mirror of https://github.com/certusone/dc4bc.git
cli updates
This commit is contained in:
parent
2ecb9ba7c3
commit
892c22ebd8
|
@ -17,16 +17,17 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
flagUserName = "username"
|
||||
flagListenAddr = "listen_addr"
|
||||
flagStateDBDSN = "state_dbdsn"
|
||||
flagStorageDBDSN = "storage_dbdsn"
|
||||
flagStorageTopic = "storage_topic"
|
||||
flagKafkaProducerCredentials = "producer_credentials"
|
||||
flagKafkaConsumerCredentials = "consumer_credentials"
|
||||
flagStoreDBDSN = "key_store_dbdsn"
|
||||
flagFramesDelay = "frames_delay"
|
||||
flagChunkSize = "chunk_size"
|
||||
flagUserName = "username"
|
||||
flagListenAddr = "listen_addr"
|
||||
flagStateDBDSN = "state_dbdsn"
|
||||
flagStorageDBDSN = "storage_dbdsn"
|
||||
flagStorageTopic = "storage_topic"
|
||||
flagKafkaKeyStorePath = "kafka_keystore_path"
|
||||
flagKafkaTrustStorePath = "kafka_truststore_path"
|
||||
flagKafkaKeyStorePassword = "kafka_keystore_password"
|
||||
flagStoreDBDSN = "key_store_dbdsn"
|
||||
flagFramesDelay = "frames_delay"
|
||||
flagChunkSize = "chunk_size"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -35,8 +36,9 @@ func init() {
|
|||
rootCmd.PersistentFlags().String(flagStateDBDSN, "./dc4bc_client_state", "State DBDSN")
|
||||
rootCmd.PersistentFlags().String(flagStorageDBDSN, "./dc4bc_file_storage", "Storage DBDSN")
|
||||
rootCmd.PersistentFlags().String(flagStorageTopic, "messages", "Storage Topic (Kafka)")
|
||||
rootCmd.PersistentFlags().String(flagKafkaProducerCredentials, "producer:producerpass", "Producer credentials for Kafka: username:password")
|
||||
rootCmd.PersistentFlags().String(flagKafkaConsumerCredentials, "consumer:consumerpass", "Consumer credentials for Kafka: username:password")
|
||||
rootCmd.PersistentFlags().String(flagKafkaKeyStorePath, "certs/client.p12", "Path to kafka keystore")
|
||||
rootCmd.PersistentFlags().String(flagKafkaTrustStorePath, "certs/ca.pem", "Path to kafka truststore")
|
||||
rootCmd.PersistentFlags().String(flagKafkaKeyStorePassword, "test1234", "Kafka keystore's password")
|
||||
rootCmd.PersistentFlags().String(flagStoreDBDSN, "./dc4bc_key_store", "Key Store DBDSN")
|
||||
rootCmd.PersistentFlags().Int(flagFramesDelay, 10, "Delay times between frames in 100ths of a second")
|
||||
rootCmd.PersistentFlags().Int(flagChunkSize, 256, "QR-code's chunk size")
|
||||
|
@ -120,24 +122,6 @@ func startClientCommand() *cobra.Command {
|
|||
log.Fatalf("failed to read configuration: %v", err)
|
||||
}
|
||||
|
||||
producerCredentialsString, err := cmd.Flags().GetString(flagKafkaProducerCredentials)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to read configuration: %v", err)
|
||||
}
|
||||
producerCreds, err := parseKafkaAuthCredentials(producerCredentialsString)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
consumerCredentialsString, err := cmd.Flags().GetString(flagKafkaConsumerCredentials)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to read configuration: %v", err)
|
||||
}
|
||||
consumerCreds, err := parseKafkaAuthCredentials(consumerCredentialsString)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
keyStoreDBDSN, err := cmd.Flags().GetString(flagStoreDBDSN)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to read configuration: %v", err)
|
||||
|
@ -151,7 +135,23 @@ func startClientCommand() *cobra.Command {
|
|||
log.Fatalf("Failed to init state client: %v", err)
|
||||
}
|
||||
|
||||
stg, err := storage.NewKafkaStorage(ctx, storageDBDSN, storageTopic, producerCreds, consumerCreds)
|
||||
kafkaKeyStorePath, err := cmd.Flags().GetString(flagKafkaKeyStorePath)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to read configuration: %v", err)
|
||||
}
|
||||
kafkaTrustStorePath, err := cmd.Flags().GetString(flagKafkaTrustStorePath)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to read configuration: %v", err)
|
||||
}
|
||||
kafkaKeyStorePassword, err := cmd.Flags().GetString(flagKafkaKeyStorePassword)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to read configuration: %v", err)
|
||||
}
|
||||
tlsConfig, err := storage.GetTLSConfig(kafkaKeyStorePath, kafkaTrustStorePath, kafkaKeyStorePassword)
|
||||
if err != nil {
|
||||
log.Fatalf("faile to create tls config: %v", err)
|
||||
}
|
||||
stg, err := storage.NewKafkaStorage(ctx, storageDBDSN, storageTopic, tlsConfig)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to init storage client: %v", err)
|
||||
}
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
FROM centos:centos7
|
||||
MAINTAINER d.gasparina@gmail.com
|
||||
ENV container docker
|
||||
|
||||
# 1. Adding Confluent repository
|
||||
|
|
|
@ -9,7 +9,7 @@ req_extensions = v3_req
|
|||
countryName = UK
|
||||
organizationName = Confluent
|
||||
localityName = London
|
||||
commonName=Kiril-Piskunov.local
|
||||
commonName=pr0n00gler.local
|
||||
|
||||
[ v3_ca ]
|
||||
subjectKeyIdentifier=hash
|
||||
|
@ -26,4 +26,4 @@ extendedKeyUsage = clientAuth
|
|||
subjectAltName = @alt_names
|
||||
|
||||
[ alt_names ]
|
||||
DNS.1=Kiril-Piskunov.local
|
||||
DNS.1=pr0n00gler.local
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
FROM centos:centos7
|
||||
MAINTAINER d.gasparina@gmail.com
|
||||
ENV container docker
|
||||
|
||||
# 1. Adding Confluent repository
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
FROM centos:centos7
|
||||
MAINTAINER d.gasparina@gmail.com
|
||||
ENV container docker
|
||||
|
||||
# 1. Adding Confluent repository
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"fmt"
|
||||
"golang.org/x/crypto/pkcs12"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/segmentio/kafka-go"
|
||||
|
@ -29,13 +28,16 @@ type KafkaAuthCredentials struct {
|
|||
Password string
|
||||
}
|
||||
|
||||
func tlsConfig() *tls.Config {
|
||||
func GetTLSConfig(keyStorePath, trustStorePath, password string) (*tls.Config, error) {
|
||||
|
||||
// Keystore
|
||||
keys, _ := ioutil.ReadFile("../kafka-docker/certs/client.p12")
|
||||
blocks, err := pkcs12.ToPEM(keys, "test1234")
|
||||
keys, err := ioutil.ReadFile(keyStorePath)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
return nil, fmt.Errorf("failed to read keyStorePath: %w", err)
|
||||
}
|
||||
blocks, err := pkcs12.ToPEM(keys, password)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to convert PKCS12 key to PEM: %w", err)
|
||||
}
|
||||
|
||||
var pemData []byte
|
||||
|
@ -46,12 +48,12 @@ func tlsConfig() *tls.Config {
|
|||
|
||||
cert, err := tls.X509KeyPair(pemData, pemData)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
return nil, fmt.Errorf("failed to get X509KeyPair: %w", err)
|
||||
}
|
||||
//Truststore
|
||||
caCert, err := ioutil.ReadFile("../kafka-docker/certs/ca.pem")
|
||||
caCert, err := ioutil.ReadFile(trustStorePath)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
return nil, fmt.Errorf("failed to read trustStorePath: %w", err)
|
||||
}
|
||||
|
||||
caCertPool := x509.NewCertPool()
|
||||
|
@ -61,20 +63,20 @@ func tlsConfig() *tls.Config {
|
|||
Certificates: []tls.Certificate{cert},
|
||||
RootCAs: caCertPool,
|
||||
}
|
||||
return config
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func NewKafkaStorage(ctx context.Context, kafkaEndpoint string, kafkaTopic string, producerCreds, ConsumerCreds *KafkaAuthCredentials) (Storage, error) {
|
||||
func NewKafkaStorage(ctx context.Context, kafkaEndpoint string, kafkaTopic string, tlsConfig *tls.Config) (Storage, error) {
|
||||
|
||||
dialerProducer := &kafka.Dialer{
|
||||
Timeout: 10 * time.Second,
|
||||
DualStack: true,
|
||||
TLS: tlsConfig(),
|
||||
TLS: tlsConfig,
|
||||
}
|
||||
dialerConsumer := &kafka.Dialer{
|
||||
Timeout: 10 * time.Second,
|
||||
DualStack: true,
|
||||
TLS: tlsConfig(),
|
||||
TLS: tlsConfig,
|
||||
}
|
||||
conn, err := dialerProducer.DialLeader(ctx, "tcp", kafkaEndpoint, kafkaTopic, kafkaPartition)
|
||||
if err != nil {
|
||||
|
|
|
@ -16,16 +16,13 @@ func TestKafkaStorage_GetMessages(t *testing.T) {
|
|||
N := 10
|
||||
var offset uint64 = 5
|
||||
|
||||
producerCreds := &KafkaAuthCredentials{
|
||||
Username: "producer",
|
||||
Password: "producerpass",
|
||||
}
|
||||
consumerCreds := &KafkaAuthCredentials{
|
||||
Username: "consumer",
|
||||
Password: "consumerpass",
|
||||
tlsConfig, err := GetTLSConfig("../kafka-docker/certs/client.p12", "../kafka-docker/certs/ca.pem", "test1234")
|
||||
if err != nil {
|
||||
t.Fatal(err.Error())
|
||||
}
|
||||
|
||||
req := require.New(t)
|
||||
stg, err := NewKafkaStorage(context.Background(), "localhost:9093", "test", producerCreds, consumerCreds)
|
||||
stg, err := NewKafkaStorage(context.Background(), "localhost:9093", "test", tlsConfig)
|
||||
req.NoError(err)
|
||||
|
||||
msgs := make([]Message, 0, N)
|
||||
|
|
Loading…
Reference in New Issue