storage_type

This commit is contained in:
programmer10110 2020-11-23 12:26:10 +03:00
parent 9af79197f3
commit fb278c7cf1
2 changed files with 62 additions and 18 deletions

View File

@ -1,13 +1,16 @@
{
"username": "node_0",
"listen_address": "localhost:8080",
"listen_address": "localhost:8085",
"state_dbdsn": "/tmp/dc4bc_node_0_state/",
"storage_dbdsn": "94.130.57.249:9092",
"storage_dbdsn": "94.130.57.249:9093",
"storage_topic": "test_topic",
"key_store_dbdsn": "/tmp/dc4bc_node_0_key_store",
"frames_delay": 10,
"chunk_size": 512,
"producer_credentials": "producer:producerpass",
"consumer_credentials": "consumer:consumerpass",
"kafka_truststore_path": "../../kafka-docker/certs/ca.crt"
"kafka_truststore_path": "../../kafka-docker/certs/ca.crt",
"chain_id": "bulletin",
"mnemonic": "",
"storage_type": "kafka"
}

View File

@ -32,6 +32,9 @@ const (
flagFramesDelay = "frames_delay"
flagChunkSize = "chunk_size"
flagConfigPath = "config_path"
flagCosmosAccountMnemonic = "mnemonic"
flagCosmosChainID = "chain_id"
flagStorageType = "storage_type"
)
func init() {
@ -39,13 +42,16 @@ func init() {
rootCmd.PersistentFlags().String(flagListenAddr, "localhost:8080", "Listen Address")
rootCmd.PersistentFlags().String(flagStateDBDSN, "./dc4bc_client_state", "State DBDSN")
rootCmd.PersistentFlags().String(flagStorageDBDSN, "./dc4bc_file_storage", "Storage DBDSN")
rootCmd.PersistentFlags().String(flagStorageTopic, "messages", "Storage Topic (Kafka)")
rootCmd.PersistentFlags().String(flagStorageTopic, "messages", "Storage Topic")
rootCmd.PersistentFlags().String(flagKafkaProducerCredentials, "producer:producerpass", "Producer credentials for Kafka: username:password")
rootCmd.PersistentFlags().String(flagKafkaConsumerCredentials, "consumer:consumerpass", "Consumer credentials for Kafka: username:password")
rootCmd.PersistentFlags().String(flagKafkaTrustStorePath, "certs/ca.pem", "Path to kafka truststore")
rootCmd.PersistentFlags().String(flagStoreDBDSN, "./dc4bc_key_store", "Key Store DBDSN")
rootCmd.PersistentFlags().Int(flagFramesDelay, 10, "Delay times between frames in 100ths of a second")
rootCmd.PersistentFlags().Int(flagChunkSize, 256, "QR-code's chunk size")
rootCmd.PersistentFlags().String(flagCosmosAccountMnemonic, "", "Mnemonic of your cosmos-app account")
rootCmd.PersistentFlags().String(flagCosmosChainID, "bulletin", "Chain ID of your tendermint based storage")
rootCmd.PersistentFlags().String(flagStorageType, "kafka", "Type of the storage to connect (kafka or bulletin)")
rootCmd.PersistentFlags().String(flagConfigPath, "", "Path to a config file")
}
@ -61,6 +67,9 @@ type config struct {
ProducerCredentials string `json:"producer_credentials"`
ConsumerCredentials string `json:"consumer_credentials"`
KafkaTrustStorePath string `json:"kafka_truststore_path"`
ChainID string `json:"chain_id"`
AccountMnemonic string `json:"mnemonic"`
StorageType string `json:"storage_type"`
}
func readConfig(path string) (config, error) {
@ -145,6 +154,21 @@ func loadConfig(cmd *cobra.Command) (*config, error) {
return nil, fmt.Errorf("failed to read configuration: %v", err)
}
cfg.AccountMnemonic, err = cmd.Flags().GetString(flagCosmosAccountMnemonic)
if err != nil {
return nil, fmt.Errorf("failed to read configuration: %v", err)
}
cfg.ChainID, err = cmd.Flags().GetString(flagCosmosChainID)
if err != nil {
return nil, fmt.Errorf("failed to read configuration: %v", err)
}
cfg.StorageType, err = cmd.Flags().GetString(flagStorageType)
if err != nil {
return nil, fmt.Errorf("failed to read configuration: %v", err)
}
cfg.KafkaTrustStorePath, err = cmd.Flags().GetString(flagKafkaTrustStorePath)
if err != nil {
log.Fatalf("failed to read configuration: %v", err)
@ -198,6 +222,36 @@ func parseKafkaAuthCredentials(creds string) (*storage.KafkaAuthCredentials, err
}, nil
}
func getStorage(ctx context.Context, cfg *config) (storage.Storage, error) {
switch cfg.StorageType {
case "kafka":
tlsConfig, err := storage.GetTLSConfig(cfg.KafkaTrustStorePath)
if err != nil {
return nil, fmt.Errorf("failed to create tls config: %w", err)
}
producerCreds, err := parseKafkaAuthCredentials(cfg.ProducerCredentials)
if err != nil {
return nil, fmt.Errorf("failed to parse kafka credentials: %w", err)
}
consumerCreds, err := parseKafkaAuthCredentials(cfg.ConsumerCredentials)
if err != nil {
return nil, fmt.Errorf("failed to parse kafka credentials: %w", err)
}
stg, err := storage.NewKafkaStorage(ctx, cfg.StorageDBDSN, cfg.StorageTopic, tlsConfig, producerCreds, consumerCreds)
if err != nil {
return nil, fmt.Errorf("failed to init storage client: %w", err)
}
return stg, nil
case "bulletin":
stg, err := storage.NewTendermintStorage(cfg.StorageDBDSN, cfg.Username, cfg.ChainID, cfg.StorageTopic, cfg.AccountMnemonic)
if err != nil {
return nil, fmt.Errorf("failed to init storage client: %w", err)
}
return stg, nil
}
return nil, fmt.Errorf("unknown storage type")
}
func startClientCommand() *cobra.Command {
return &cobra.Command{
Use: "start",
@ -216,20 +270,7 @@ func startClientCommand() *cobra.Command {
log.Fatalf("Failed to init state client: %v", err)
}
tlsConfig, err := storage.GetTLSConfig(cfg.KafkaTrustStorePath)
if err != nil {
log.Fatalf("faile to create tls config: %v", err)
}
producerCreds, err := parseKafkaAuthCredentials(cfg.ProducerCredentials)
if err != nil {
log.Fatal(err.Error())
}
consumerCreds, err := parseKafkaAuthCredentials(cfg.ProducerCredentials)
if err != nil {
log.Fatal(err.Error())
}
stg, err := storage.NewKafkaStorage(ctx, cfg.StorageDBDSN, cfg.StorageTopic, tlsConfig, producerCreds, consumerCreds)
stg, err := getStorage(ctx, cfg)
if err != nil {
log.Fatalf("Failed to init storage client: %v", err)
}