feat: added topics configuration

This commit is contained in:
Andrej Zavgorodnij 2020-10-05 20:07:02 +03:00
parent 0e8a9b11b3
commit acda112838
5 changed files with 15 additions and 5 deletions

View File

@ -20,6 +20,7 @@ const (
flagListenAddr = "listen_addr"
flagStateDBDSN = "state_dbdsn"
flagStorageDBDSN = "storage_dbdsn"
flagStorageTopic = "storage_topic"
flagStoreDBDSN = "key_store_dbdsn"
flagFramesDelay = "frames_delay"
flagChunkSize = "chunk_size"
@ -30,6 +31,7 @@ func init() {
rootCmd.PersistentFlags().String(flagListenAddr, "localhost:8080", "Listen Address")
rootCmd.PersistentFlags().String(flagStateDBDSN, "./dc4bc_client_state", "State DBDSN")
rootCmd.PersistentFlags().String(flagStorageDBDSN, "./dc4bc_file_storage", "Storage DBDSN")
rootCmd.PersistentFlags().String(flagStorageTopic, "messages", "Storage Topic (Kafka)")
rootCmd.PersistentFlags().String(flagStoreDBDSN, "./dc4bc_key_store", "Key Store DBDSN")
rootCmd.PersistentFlags().Int(flagFramesDelay, 10, "Delay times between frames in 100ths of a second")
rootCmd.PersistentFlags().Int(flagChunkSize, 256, "QR-code's chunk size")
@ -97,6 +99,11 @@ func startClientCommand() *cobra.Command {
log.Fatalf("failed to read configuration: %v", err)
}
storageTopic, err := cmd.Flags().GetString(flagStorageTopic)
if err != nil {
log.Fatalf("failed to read configuration: %v", err)
}
keyStoreDBDSN, err := cmd.Flags().GetString(flagStoreDBDSN)
if err != nil {
log.Fatalf("failed to read configuration: %v", err)
@ -110,7 +117,7 @@ func startClientCommand() *cobra.Command {
log.Fatalf("Failed to init state client: %v", err)
}
stg, err := storage.NewKafkaStorage(ctx, storageDBDSN)
stg, err := storage.NewKafkaStorage(ctx, storageDBDSN, storageTopic)
if err != nil {
log.Fatalf("Failed to init storage client: %v", err)
}

2
go.mod
View File

@ -3,7 +3,7 @@ module github.com/depools/dc4bc
go 1.13
require (
github.com/corestario/kyber v1.4.0
github.com/corestario/kyber v1.5.0
github.com/depools/kyber-bls12381 v0.0.0-20200929134032-c24859b7d890
github.com/golang/mock v1.4.4
github.com/google/go-cmp v0.2.0

4
go.sum
View File

@ -18,6 +18,8 @@ github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfc
github.com/corestario/kyber v1.3.0/go.mod h1:kIWfWekm8kSJNti3Fo3DCV0GHEH050MWQrdvZdefbkk=
github.com/corestario/kyber v1.4.0 h1:jSB8P5vBvRDiFESJHxlx9BzH1+E1FDQSuu7xfiCy3HY=
github.com/corestario/kyber v1.4.0/go.mod h1:kIWfWekm8kSJNti3Fo3DCV0GHEH050MWQrdvZdefbkk=
github.com/corestario/kyber v1.5.0 h1:wNkoKD6yYAJV8p8JmJYF0jdJzCx4LlDJQT6wobWPl+I=
github.com/corestario/kyber v1.5.0/go.mod h1:mzxQ0SX6j2O1bH1EbCDcXxnEZx2pDskatkkSaINGKVA=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@ -63,6 +65,8 @@ github.com/juju/fslock v0.0.0-20160525022230-4d5c94c67b4b/go.mod h1:HMcgvsgd0Fjj
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/kilic/bls12-381 v0.0.0-20200607163746-32e1441c8a9f h1:qET3Wx0v8tMtoTOQnsJXVvqvCopSf48qobR6tcJuDHo=
github.com/kilic/bls12-381 v0.0.0-20200607163746-32e1441c8a9f/go.mod h1:XXfR6YFCRSrkEXbNlIyDsgXVNJWVUV30m/ebkVy9n6s=
github.com/kilic/bls12-381 v0.0.0-20200820230200-6b2c19996391 h1:51kHw7l/dUDdOdW06AlUGT5jnpj6nqQSILebcsikSjA=
github.com/kilic/bls12-381 v0.0.0-20200820230200-6b2c19996391/go.mod h1:XXfR6YFCRSrkEXbNlIyDsgXVNJWVUV30m/ebkVy9n6s=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=

View File

@ -10,7 +10,6 @@ import (
)
const (
kafkaTopic = "messages"
kafkaPartition = 0
)
@ -19,7 +18,7 @@ type KafkaStorage struct {
reader *kafka.Reader
}
func NewKafkaStorage(ctx context.Context, kafkaEndpoint string) (Storage, error) {
func NewKafkaStorage(ctx context.Context, kafkaEndpoint string, kafkaTopic string) (Storage, error) {
conn, err := kafka.DialLeader(ctx, "tcp", kafkaEndpoint, kafkaTopic, kafkaPartition)
if err != nil {
return nil, fmt.Errorf("failed to init Kafka client: %w", err)

View File

@ -17,7 +17,7 @@ func TestKafkaStorage_GetMessages(t *testing.T) {
var offset uint64 = 5
req := require.New(t)
stg, err := NewKafkaStorage(context.Background(), "localhost:9092")
stg, err := NewKafkaStorage(context.Background(), "localhost:9092", "test_topic")
req.NoError(err)
msgs := make([]Message, 0, N)