diff --git a/cmd/dc4bc_d/main.go b/cmd/dc4bc_d/main.go index 48863dd..3e0ecf1 100644 --- a/cmd/dc4bc_d/main.go +++ b/cmd/dc4bc_d/main.go @@ -6,6 +6,7 @@ import ( "log" "os" "os/signal" + "strings" "syscall" "github.com/depools/dc4bc/client" @@ -16,16 +17,16 @@ import ( ) const ( - flagUserName = "username" - flagListenAddr = "listen_addr" - flagStateDBDSN = "state_dbdsn" - flagStorageDBDSN = "storage_dbdsn" - flagStorageTopic = "storage_topic" - flagKafkaProducerPass = "producer_pass" - flagKafkaConsumerPass = "consumer_pass" - flagStoreDBDSN = "key_store_dbdsn" - flagFramesDelay = "frames_delay" - flagChunkSize = "chunk_size" + flagUserName = "username" + flagListenAddr = "listen_addr" + flagStateDBDSN = "state_dbdsn" + flagStorageDBDSN = "storage_dbdsn" + flagStorageTopic = "storage_topic" + flagKafkaProducerCredentials = "producer_credentials" + flagKafkaConsumerCredentials = "consumer_credentials" + flagStoreDBDSN = "key_store_dbdsn" + flagFramesDelay = "frames_delay" + flagChunkSize = "chunk_size" ) func init() { @@ -34,8 +35,8 @@ func init() { rootCmd.PersistentFlags().String(flagStateDBDSN, "./dc4bc_client_state", "State DBDSN") rootCmd.PersistentFlags().String(flagStorageDBDSN, "./dc4bc_file_storage", "Storage DBDSN") rootCmd.PersistentFlags().String(flagStorageTopic, "messages", "Storage Topic (Kafka)") - rootCmd.PersistentFlags().String(flagKafkaProducerPass, "producerpass", "Producer password for Kafka") - rootCmd.PersistentFlags().String(flagKafkaConsumerPass, "consumerpass", "Consumer password for Kafka") + rootCmd.PersistentFlags().String(flagKafkaProducerCredentials, "producer:producerpass", "Producer credentials for Kafka: username:password") + rootCmd.PersistentFlags().String(flagKafkaConsumerCredentials, "consumer:consumerpass", "Consumer credentials for Kafka: username:password") rootCmd.PersistentFlags().String(flagStoreDBDSN, "./dc4bc_key_store", "Key Store DBDSN") rootCmd.PersistentFlags().Int(flagFramesDelay, 10, "Delay times between frames in 100ths of a second") rootCmd.PersistentFlags().Int(flagChunkSize, 256, "QR-code's chunk size") @@ -68,6 +69,17 @@ func genKeyPairCommand() *cobra.Command { } } +func parseKafkaAuthCredentials(creds string) (*storage.KafkaAuthCredentials, error) { + credsSplited := strings.SplitN(creds, ":", 2) + if len(credsSplited) == 1 { + return nil, fmt.Errorf("failed to parse credentials") + } + return &storage.KafkaAuthCredentials{ + Username: credsSplited[0], + Password: credsSplited[1], + }, nil +} + func startClientCommand() *cobra.Command { return &cobra.Command{ Use: "start", @@ -108,15 +120,23 @@ func startClientCommand() *cobra.Command { log.Fatalf("failed to read configuration: %v", err) } - producerPass, err := cmd.Flags().GetString(flagKafkaProducerPass) + producerCredentialsString, err := cmd.Flags().GetString(flagKafkaProducerCredentials) if err != nil { log.Fatalf("failed to read configuration: %v", err) } + producerCreds, err := parseKafkaAuthCredentials(producerCredentialsString) + if err != nil { + log.Fatal(err.Error()) + } - consumerPass, err := cmd.Flags().GetString(flagKafkaConsumerPass) + consumerCredentialsString, err := cmd.Flags().GetString(flagKafkaConsumerCredentials) if err != nil { log.Fatalf("failed to read configuration: %v", err) } + consumerCreds, err := parseKafkaAuthCredentials(consumerCredentialsString) + if err != nil { + log.Fatal(err.Error()) + } keyStoreDBDSN, err := cmd.Flags().GetString(flagStoreDBDSN) if err != nil { @@ -131,7 +151,7 @@ func startClientCommand() *cobra.Command { log.Fatalf("Failed to init state client: %v", err) } - stg, err := storage.NewKafkaStorage(ctx, storageDBDSN, storageTopic, producerPass, consumerPass) + stg, err := storage.NewKafkaStorage(ctx, storageDBDSN, storageTopic, producerCreds, consumerCreds) if err != nil { log.Fatalf("Failed to init storage client: %v", err) } diff --git a/storage/kafkaStorage.go b/storage/kafkaStorage.go index 5651849..6722e8d 100644 --- a/storage/kafkaStorage.go +++ b/storage/kafkaStorage.go @@ -19,9 +19,14 @@ type KafkaStorage struct { reader *kafka.Reader } -func NewKafkaStorage(ctx context.Context, kafkaEndpoint string, kafkaTopic string, producerPass, consumerPass string) (Storage, error) { - mechanismProducer := plain.Mechanism{"producer", producerPass} - mechanismConsumer := plain.Mechanism{"consumer", consumerPass} +type KafkaAuthCredentials struct { + Username string + Password string +} + +func NewKafkaStorage(ctx context.Context, kafkaEndpoint string, kafkaTopic string, producerCreds, ConsumerCreds *KafkaAuthCredentials) (Storage, error) { + mechanismProducer := plain.Mechanism{producerCreds.Username, producerCreds.Password} + mechanismConsumer := plain.Mechanism{ConsumerCreds.Username, ConsumerCreds.Password} dialerProducer := &kafka.Dialer{ Timeout: 10 * time.Second, diff --git a/storage/kafkaStorage_test.go b/storage/kafkaStorage_test.go index 7ad3d9a..542153a 100644 --- a/storage/kafkaStorage_test.go +++ b/storage/kafkaStorage_test.go @@ -16,8 +16,16 @@ func TestKafkaStorage_GetMessages(t *testing.T) { N := 10 var offset uint64 = 5 + producerCreds := &KafkaAuthCredentials{ + Username: "producer", + Password: "producerpass", + } + consumerCreds := &KafkaAuthCredentials{ + Username: "consumer", + Password: "consumerpass", + } req := require.New(t) - stg, err := NewKafkaStorage(context.Background(), "localhost:9093", "test_topic", "producerpass", "consumerpass") + stg, err := NewKafkaStorage(context.Background(), "localhost:9093", "test_topic", producerCreds, consumerCreds) req.NoError(err) msgs := make([]Message, 0, N)