mirror of https://github.com/certusone/dc4bc.git
producer and consumer creds as structs
This commit is contained in:
parent
613f910df5
commit
d4e1ddfdfa
|
@ -6,6 +6,7 @@ import (
|
|||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"syscall"
|
||||
|
||||
"github.com/depools/dc4bc/client"
|
||||
|
@ -16,16 +17,16 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
flagUserName = "username"
|
||||
flagListenAddr = "listen_addr"
|
||||
flagStateDBDSN = "state_dbdsn"
|
||||
flagStorageDBDSN = "storage_dbdsn"
|
||||
flagStorageTopic = "storage_topic"
|
||||
flagKafkaProducerPass = "producer_pass"
|
||||
flagKafkaConsumerPass = "consumer_pass"
|
||||
flagStoreDBDSN = "key_store_dbdsn"
|
||||
flagFramesDelay = "frames_delay"
|
||||
flagChunkSize = "chunk_size"
|
||||
flagUserName = "username"
|
||||
flagListenAddr = "listen_addr"
|
||||
flagStateDBDSN = "state_dbdsn"
|
||||
flagStorageDBDSN = "storage_dbdsn"
|
||||
flagStorageTopic = "storage_topic"
|
||||
flagKafkaProducerCredentials = "producer_credentials"
|
||||
flagKafkaConsumerCredentials = "consumer_credentials"
|
||||
flagStoreDBDSN = "key_store_dbdsn"
|
||||
flagFramesDelay = "frames_delay"
|
||||
flagChunkSize = "chunk_size"
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -34,8 +35,8 @@ func init() {
|
|||
rootCmd.PersistentFlags().String(flagStateDBDSN, "./dc4bc_client_state", "State DBDSN")
|
||||
rootCmd.PersistentFlags().String(flagStorageDBDSN, "./dc4bc_file_storage", "Storage DBDSN")
|
||||
rootCmd.PersistentFlags().String(flagStorageTopic, "messages", "Storage Topic (Kafka)")
|
||||
rootCmd.PersistentFlags().String(flagKafkaProducerPass, "producerpass", "Producer password for Kafka")
|
||||
rootCmd.PersistentFlags().String(flagKafkaConsumerPass, "consumerpass", "Consumer password for Kafka")
|
||||
rootCmd.PersistentFlags().String(flagKafkaProducerCredentials, "producer:producerpass", "Producer credentials for Kafka: username:password")
|
||||
rootCmd.PersistentFlags().String(flagKafkaConsumerCredentials, "consumer:consumerpass", "Consumer credentials for Kafka: username:password")
|
||||
rootCmd.PersistentFlags().String(flagStoreDBDSN, "./dc4bc_key_store", "Key Store DBDSN")
|
||||
rootCmd.PersistentFlags().Int(flagFramesDelay, 10, "Delay times between frames in 100ths of a second")
|
||||
rootCmd.PersistentFlags().Int(flagChunkSize, 256, "QR-code's chunk size")
|
||||
|
@ -68,6 +69,17 @@ func genKeyPairCommand() *cobra.Command {
|
|||
}
|
||||
}
|
||||
|
||||
func parseKafkaAuthCredentials(creds string) (*storage.KafkaAuthCredentials, error) {
|
||||
credsSplited := strings.SplitN(creds, ":", 2)
|
||||
if len(credsSplited) == 1 {
|
||||
return nil, fmt.Errorf("failed to parse credentials")
|
||||
}
|
||||
return &storage.KafkaAuthCredentials{
|
||||
Username: credsSplited[0],
|
||||
Password: credsSplited[1],
|
||||
}, nil
|
||||
}
|
||||
|
||||
func startClientCommand() *cobra.Command {
|
||||
return &cobra.Command{
|
||||
Use: "start",
|
||||
|
@ -108,15 +120,23 @@ func startClientCommand() *cobra.Command {
|
|||
log.Fatalf("failed to read configuration: %v", err)
|
||||
}
|
||||
|
||||
producerPass, err := cmd.Flags().GetString(flagKafkaProducerPass)
|
||||
producerCredentialsString, err := cmd.Flags().GetString(flagKafkaProducerCredentials)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to read configuration: %v", err)
|
||||
}
|
||||
producerCreds, err := parseKafkaAuthCredentials(producerCredentialsString)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
consumerPass, err := cmd.Flags().GetString(flagKafkaConsumerPass)
|
||||
consumerCredentialsString, err := cmd.Flags().GetString(flagKafkaConsumerCredentials)
|
||||
if err != nil {
|
||||
log.Fatalf("failed to read configuration: %v", err)
|
||||
}
|
||||
consumerCreds, err := parseKafkaAuthCredentials(consumerCredentialsString)
|
||||
if err != nil {
|
||||
log.Fatal(err.Error())
|
||||
}
|
||||
|
||||
keyStoreDBDSN, err := cmd.Flags().GetString(flagStoreDBDSN)
|
||||
if err != nil {
|
||||
|
@ -131,7 +151,7 @@ func startClientCommand() *cobra.Command {
|
|||
log.Fatalf("Failed to init state client: %v", err)
|
||||
}
|
||||
|
||||
stg, err := storage.NewKafkaStorage(ctx, storageDBDSN, storageTopic, producerPass, consumerPass)
|
||||
stg, err := storage.NewKafkaStorage(ctx, storageDBDSN, storageTopic, producerCreds, consumerCreds)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to init storage client: %v", err)
|
||||
}
|
||||
|
|
|
@ -19,9 +19,14 @@ type KafkaStorage struct {
|
|||
reader *kafka.Reader
|
||||
}
|
||||
|
||||
func NewKafkaStorage(ctx context.Context, kafkaEndpoint string, kafkaTopic string, producerPass, consumerPass string) (Storage, error) {
|
||||
mechanismProducer := plain.Mechanism{"producer", producerPass}
|
||||
mechanismConsumer := plain.Mechanism{"consumer", consumerPass}
|
||||
type KafkaAuthCredentials struct {
|
||||
Username string
|
||||
Password string
|
||||
}
|
||||
|
||||
func NewKafkaStorage(ctx context.Context, kafkaEndpoint string, kafkaTopic string, producerCreds, ConsumerCreds *KafkaAuthCredentials) (Storage, error) {
|
||||
mechanismProducer := plain.Mechanism{producerCreds.Username, producerCreds.Password}
|
||||
mechanismConsumer := plain.Mechanism{ConsumerCreds.Username, ConsumerCreds.Password}
|
||||
|
||||
dialerProducer := &kafka.Dialer{
|
||||
Timeout: 10 * time.Second,
|
||||
|
|
|
@ -16,8 +16,16 @@ func TestKafkaStorage_GetMessages(t *testing.T) {
|
|||
N := 10
|
||||
var offset uint64 = 5
|
||||
|
||||
producerCreds := &KafkaAuthCredentials{
|
||||
Username: "producer",
|
||||
Password: "producerpass",
|
||||
}
|
||||
consumerCreds := &KafkaAuthCredentials{
|
||||
Username: "consumer",
|
||||
Password: "consumerpass",
|
||||
}
|
||||
req := require.New(t)
|
||||
stg, err := NewKafkaStorage(context.Background(), "localhost:9093", "test_topic", "producerpass", "consumerpass")
|
||||
stg, err := NewKafkaStorage(context.Background(), "localhost:9093", "test_topic", producerCreds, consumerCreds)
|
||||
req.NoError(err)
|
||||
|
||||
msgs := make([]Message, 0, N)
|
||||
|
|
Loading…
Reference in New Issue