diff --git a/kafka-docker/.env b/kafka-docker/.env new file mode 100644 index 0000000..0d883ad --- /dev/null +++ b/kafka-docker/.env @@ -0,0 +1 @@ +KEYSTORE_PASSWORD=test1234 \ No newline at end of file diff --git a/kafka-docker/client.cnf b/kafka-docker/client.cnf deleted file mode 100644 index 2e278a8..0000000 --- a/kafka-docker/client.cnf +++ /dev/null @@ -1,29 +0,0 @@ -[req] -prompt = no -distinguished_name = dn -default_md = sha256 -default_bits = 4096 -req_extensions = v3_req - -[ dn ] -countryName = UK -organizationName = Confluent -localityName = London -commonName=kafka.confluent.local - -[ v3_ca ] -subjectKeyIdentifier=hash -basicConstraints = critical,CA:true -authorityKeyIdentifier=keyid:always,issuer:always -keyUsage = critical,keyCertSign,cRLSign - -[ v3_req ] -subjectKeyIdentifier = hash -basicConstraints = CA:FALSE -nsComment = "OpenSSL Generated Certificate" -keyUsage = critical, digitalSignature, keyEncipherment -extendedKeyUsage = clientAuth -subjectAltName = @alt_names - -[ alt_names ] -DNS.1=kafka.confluent.local diff --git a/kafka-docker/docker-compose.yml b/kafka-docker/docker-compose.yml index a3810f5..5a6612e 100644 --- a/kafka-docker/docker-compose.yml +++ b/kafka-docker/docker-compose.yml @@ -1,58 +1,58 @@ -version: '3' +version: '3.4' services: + zookeeper: - build: zookeeper/ + image: confluentinc/cp-zookeeper:5.3.1 container_name: zookeeper hostname: zookeeper - domainname: confluent.local - restart: on-failure - volumes: - - ./certs/:/var/lib/secret - networks: - default: - aliases: - - zookeeper.confluent.local - + environment: + ZOOKEEPER_CLIENT_PORT: 2181 + ZOOKEEPER_TICK_TIME: 2000 kafka: - build: kafka/ + image: confluentinc/cp-kafka:5.3.1 container_name: kafka hostname: kafka domainname: confluent.local - depends_on: - - zookeeper - restart: on-failure - volumes: - - ./certs/:/var/lib/secret networks: default: aliases: - kafka.confluent.local + depends_on: + - zookeeper ports: - - "9092:9092" + - 9093:9093 environment: - SCHEMA_REGISTRY_OPTS: '-Djavax.net.ssl.keyStore=/var/lib/secret/client.keystore.jks -Djavax.net.ssl.trustStore=/var/lib/secret/truststore.jks -Djavax.net.ssl.keyStorePassword=test1234 -Djavax.net.ssl.trustStorePassword=test1234' + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' + KAFKA_LISTENER: INTERNAL://kafka.confluent.local:9092,OUTSIDE://localhost:9093 + KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka.confluent.local:9092,OUTSIDE://localhost:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:SASL_SSL,OUTSIDE:SASL_SSL + KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN + KAFKA_LISTENER_NAME_INTERNAL_SASL_ENABLED_MECHANISMS: PLAIN + KAFKA_LISTENER_NAME_OUTSIDE_SASL_ENABLED_MECHANISMS: PLAIN - schema-registry: - build: schema-registry/ - container_name: schema-registry - hostname: schema-registry - domainname: confluent.local - depends_on: - - kafka - restart: on-failure + KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS" + KAFKA_SSL_CLIENT_AUTH: requested + KAFKA_SSL_KEYSTORE_LOCATION: /var/lib/secret/server.keystore.jks + KAFKA_SSL_KEYSTORE_PASSWORD: ${KEYSTORE_PASSWORD} + KAFKA_SSL_TRUSTSTORE_LOCATION: /var/lib/secret/truststore.jks + KAFKA_SSL_TRUSTSTORE_PASSWORD: ${KEYSTORE_PASSWORD} + + KAFKA_LISTENER_NAME_INTERNAL_PLAIN_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required \ + username="admin" \ + password="admin-secret" \ + user_admin="admin-secret" ; + KAFKA_LISTENER_NAME_OUTSIDE_PLAIN_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required \ + username="admin" \ + password="admin-secret" \ + user_admin="admin-secret" \ + user_producer="producerpass" \ + user_consumer="consumerpass" ; + KAFKA_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required \ + username="admin" \ + password="admin-secret" ; + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 volumes: - - ./certs/:/var/lib/secret - - ./schema-registry/schema-registry.properties:/etc/schema-registry/schema-registry.properties - networks: - default: - aliases: - - schema-registry.confluent.local - ports: - - "8443:8443" - -volumes: - secret: {} - -networks: - default: + - ./certs/:/var/lib/secret \ No newline at end of file diff --git a/kafka-docker/kafka/Dockerfile b/kafka-docker/kafka/Dockerfile deleted file mode 100644 index e9121b5..0000000 --- a/kafka-docker/kafka/Dockerfile +++ /dev/null @@ -1,26 +0,0 @@ -FROM centos:centos7 -ENV container docker - -# 1. Adding Confluent repository -RUN rpm --import https://packages.confluent.io/rpm/5.4/archive.key -COPY confluent.repo /etc/yum.repos.d/confluent.repo -RUN yum clean all - -# 2. Install zookeeper and kafka -RUN yum install -y java-1.8.0-openjdk -RUN yum install -y confluent-platform-2.12 -#schema-registry package is rquiterd to run kafka-avro-console-producer -RUN yum install -y confluent-schema-registry - -# 3. Configure Kafka -COPY server.properties /etc/kafka/server.properties -COPY consumer.properties /etc/kafka/consumer.properties - -# 4. Add kafkacat -COPY kafkacat /usr/local/bin -RUN chmod +x /usr/local/bin/kafkacat -COPY kafkacat.conf /etc/kafka/kafkacat.conf - -EXPOSE 9093 - -CMD kafka-server-start /etc/kafka/server.properties diff --git a/kafka-docker/kafka/confluent.repo b/kafka-docker/kafka/confluent.repo deleted file mode 100644 index 4369e53..0000000 --- a/kafka-docker/kafka/confluent.repo +++ /dev/null @@ -1,13 +0,0 @@ -[Confluent.dist] -name=Confluent repository (dist) -baseurl=https://packages.confluent.io/rpm/5.4/7 -gpgcheck=1 -gpgkey=https://packages.confluent.io/rpm/5.4/archive.key -enabled=1 - -[Confluent] -name=Confluent repository -baseurl=https://packages.confluent.io/rpm/5.4 -gpgcheck=1 -gpgkey=https://packages.confluent.io/rpm/5.4/archive.key -enabled=1 diff --git a/kafka-docker/kafka/consumer.properties b/kafka-docker/kafka/consumer.properties deleted file mode 100644 index 98840bc..0000000 --- a/kafka-docker/kafka/consumer.properties +++ /dev/null @@ -1,6 +0,0 @@ -bootstrap.servers=kafka.confluent.local:9093 -security.protocol=SSL -ssl.truststore.location=/var/lib/secret/truststore.jks -ssl.truststore.password=test1234 -ssl.keystore.location=/var/lib/secret/client.keystore.jks -ssl.keystore.password=test1234 diff --git a/kafka-docker/kafka/kafkacat b/kafka-docker/kafka/kafkacat deleted file mode 100644 index 03e40e7..0000000 Binary files a/kafka-docker/kafka/kafkacat and /dev/null differ diff --git a/kafka-docker/kafka/kafkacat.conf b/kafka-docker/kafka/kafkacat.conf deleted file mode 100644 index 81a87f2..0000000 --- a/kafka-docker/kafka/kafkacat.conf +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol=SSL -ssl.key.location=/var/lib/secret/client.pem -ssl.key.password=test1234 -ssl.certificate.location=/var/lib/secret/client.pem -ssl.ca.location=/var/lib/secret/ca.pem diff --git a/kafka-docker/kafka/server.properties b/kafka-docker/kafka/server.properties deleted file mode 100644 index 941964c..0000000 --- a/kafka-docker/kafka/server.properties +++ /dev/null @@ -1,18 +0,0 @@ -broker.id=0 -listeners=INSIDE://kafka.confluent.local:9093,OUTSIDE://kafka.confluent.local:9092 -advertised.listeners=INSIDE://kafka.confluent.local:9093,OUTSIDE://localhost:9092 -log.dirs=/var/lib/kafka -offsets.topic.replication.factor=1 -transaction.state.log.replication.factor=1 -zookeeper.connect=zookeeper.confluent.local:2181 - -# TLS Configuration -inter.broker.listener.name=INSIDE -listener.security.protocol.map=INSIDE:SSL,OUTSIDE:SSL -ssl.truststore.location=/var/lib/secret/truststore.jks -ssl.truststore.password=test1234 -ssl.keystore.location=/var/lib/secret/server.keystore.jks -ssl.keystore.password=test1234 -ssl.client.auth=required -authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer -super.users=User:CN=kafka.confluent.local,L=London,O=Confluent,C=UK;User:CN=schema-registry.confluent.local,L=London,O=Confluent,C=UK diff --git a/kafka-docker/kafkacat.conf b/kafka-docker/kafkacat.conf deleted file mode 100644 index 699de2f..0000000 --- a/kafka-docker/kafkacat.conf +++ /dev/null @@ -1,5 +0,0 @@ -security.protocol=SSL -ssl.key.location=certs/client.pem -ssl.key.password=test1234 -ssl.certificate.location=certs/client.pem -ssl.ca.location=certs/ca.pem diff --git a/kafka-docker/local-client.cnf b/kafka-docker/local-client.cnf deleted file mode 100644 index 9059a83..0000000 --- a/kafka-docker/local-client.cnf +++ /dev/null @@ -1,29 +0,0 @@ -[req] -prompt = no -distinguished_name = dn -default_md = sha256 -default_bits = 4096 -req_extensions = v3_req - -[ dn ] -countryName = UK -organizationName = Confluent -localityName = London -commonName=MacBook-Pro-Mihail-2.local - -[ v3_ca ] -subjectKeyIdentifier=hash -basicConstraints = critical,CA:true -authorityKeyIdentifier=keyid:always,issuer:always -keyUsage = critical,keyCertSign,cRLSign - -[ v3_req ] -subjectKeyIdentifier = hash -basicConstraints = CA:FALSE -nsComment = "OpenSSL Generated Certificate" -keyUsage = critical, digitalSignature, keyEncipherment -extendedKeyUsage = clientAuth -subjectAltName = @alt_names - -[ alt_names ] -DNS.1=MacBook-Pro-Mihail-2.local diff --git a/kafka-docker/schema-registry-client.cnf b/kafka-docker/schema-registry-client.cnf deleted file mode 100644 index c88f33d..0000000 --- a/kafka-docker/schema-registry-client.cnf +++ /dev/null @@ -1,29 +0,0 @@ -[req] -prompt = no -distinguished_name = dn -default_md = sha256 -default_bits = 4096 -req_extensions = v3_req - -[ dn ] -countryName = UK -organizationName = Confluent -localityName = London -commonName=schema-registry.confluent.local - -[ v3_ca ] -subjectKeyIdentifier=hash -basicConstraints = critical,CA:true -authorityKeyIdentifier=keyid:always,issuer:always -keyUsage = critical,keyCertSign,cRLSign - -[ v3_req ] -subjectKeyIdentifier = hash -basicConstraints = CA:FALSE -nsComment = "OpenSSL Generated Certificate" -keyUsage = critical, digitalSignature, keyEncipherment -extendedKeyUsage = clientAuth, serverAuth -subjectAltName = @alt_names - -[ alt_names ] -DNS.1=schema-registry.confluent.local diff --git a/kafka-docker/schema-registry/Dockerfile b/kafka-docker/schema-registry/Dockerfile deleted file mode 100644 index 05603f8..0000000 --- a/kafka-docker/schema-registry/Dockerfile +++ /dev/null @@ -1,18 +0,0 @@ -FROM centos:centos7 -ENV container docker - -# 1. Adding Confluent repository -RUN rpm --import https://packages.confluent.io/rpm/5.4/archive.key -COPY confluent.repo /etc/yum.repos.d/confluent.repo -RUN yum clean all - -# 2. Install zookeeper and kafka -RUN yum install -y java-1.8.0-openjdk -RUN yum install -y confluent-schema-registry confluent-security - -# 3. Configure Kafka -COPY schema-registry.properties /etc/schema-registry/schema-registry.properties - -EXPOSE 8443 - -CMD schema-registry-start /etc/schema-registry/schema-registry.properties diff --git a/kafka-docker/schema-registry/confluent.repo b/kafka-docker/schema-registry/confluent.repo deleted file mode 100644 index 4369e53..0000000 --- a/kafka-docker/schema-registry/confluent.repo +++ /dev/null @@ -1,13 +0,0 @@ -[Confluent.dist] -name=Confluent repository (dist) -baseurl=https://packages.confluent.io/rpm/5.4/7 -gpgcheck=1 -gpgkey=https://packages.confluent.io/rpm/5.4/archive.key -enabled=1 - -[Confluent] -name=Confluent repository -baseurl=https://packages.confluent.io/rpm/5.4 -gpgcheck=1 -gpgkey=https://packages.confluent.io/rpm/5.4/archive.key -enabled=1 diff --git a/kafka-docker/schema-registry/schema-registry.properties b/kafka-docker/schema-registry/schema-registry.properties deleted file mode 100644 index da997cc..0000000 --- a/kafka-docker/schema-registry/schema-registry.properties +++ /dev/null @@ -1,20 +0,0 @@ -listeners=https://schema-registry.confluent.local:8443 -inter.instance.protocol=https -ssl.keystore.location=/var/lib/secret/schema-registry-client.keystore.jks -ssl.keystore.password=test1234 -ssl.key.password=test1234 -kafkastore.topic=_schemas -debug=false - -#SSL settings for communication with Kafka Broker -kafkastore.bootstrap.servers=SSL://kafka.confluent.local:9093 -kafkastore.security.protocol=SSL - -#SSL trust store to verify cert presented by the broker -kafkastore.ssl.truststore.location=/var/lib/secret/truststore.jks -kafkastore.ssl.truststore.password=test1234 - -#SSL key store to provide a cert for the broker -kafkastore.ssl.keystore.location=/var/lib/secret/schema-registry-client.keystore.jks -kafkastore.ssl.keystore.password=test1234 -kafkastore.ssl.key.password=test1234 diff --git a/kafka-docker/up b/kafka-docker/up deleted file mode 100755 index e2a971b..0000000 --- a/kafka-docker/up +++ /dev/null @@ -1,77 +0,0 @@ -#!/bin/sh -set -e - -# Starting kerberos, -# Avoiding starting up all services at the begining to generate the keytab first - - -# Creating TLS CA, Certificates and keystore / truststore -rm -rf certs -mkdir -p certs -# Generate CA certificates -openssl req -new -nodes -x509 -days 3650 -newkey rsa:2048 -keyout certs/ca.key -out certs/ca.crt -config ca.cnf -cat certs/ca.crt certs/ca.key > certs/ca.pem - -# Generate kafka server certificates -openssl req -new -newkey rsa:2048 -keyout certs/server.key -out certs/server.csr -config server.cnf -nodes -openssl x509 -req -days 3650 -in certs/server.csr -CA certs/ca.crt -CAkey certs/ca.key -CAcreateserial -out certs/server.crt -extfile server.cnf -extensions v3_req -openssl pkcs12 -export -in certs/server.crt -inkey certs/server.key -chain -CAfile certs/ca.pem -name "kafka.confluent.local" -out certs/server.p12 -password pass:test1234 - -# Generate client certificates -openssl req -new -newkey rsa:2048 -keyout certs/client.key -out certs/client.csr -config client.cnf -nodes -openssl x509 -req -days 3650 -in certs/client.csr -CA certs/ca.crt -CAkey certs/ca.key -CAcreateserial -out certs/client.crt -extfile client.cnf -extensions v3_req -openssl pkcs12 -export -in certs/client.crt -inkey certs/client.key -chain -CAfile certs/ca.pem -name "kafka.confluent.local" -out certs/client.p12 -password pass:test1234 - -# Generate schema registry client certificate -openssl req -new -newkey rsa:2048 -keyout certs/schema-registry-client.key -out certs/schema-registry-client.csr -config schema-registry-client.cnf -nodes -openssl x509 -req -days 3650 -in certs/schema-registry-client.csr -CA certs/ca.crt -CAkey certs/ca.key -CAcreateserial -out certs/schema-registry-client.crt -extfile schema-registry-client.cnf -extensions v3_req -openssl pkcs12 -export -in certs/schema-registry-client.crt -inkey certs/schema-registry-client.key -chain -CAfile certs/ca.pem -name "schema-registry.confluent.local" -out certs/schema-registry-client.p12 -password pass:test1234 - -# Generate local client certificate -openssl req -new -newkey rsa:2048 -keyout certs/local-client.key -out certs/local-client.csr -config local-client.cnf -nodes -openssl x509 -req -days 3650 -in certs/local-client.csr -CA certs/ca.crt -CAkey certs/ca.key -CAcreateserial -out certs/local-client.crt -extfile local-client.cnf -extensions v3_req -openssl pkcs12 -export -in certs/local-client.crt -inkey certs/local-client.key -chain -CAfile certs/ca.pem -name `hostname` -out certs/local-client.p12 -password pass:test1234 - - -# Import server certificate to keystore and CA to truststore -keytool -importkeystore -deststorepass test1234 -destkeystore certs/server.keystore.jks \ - -srckeystore certs/server.p12 \ - -deststoretype PKCS12 \ - -srcstoretype PKCS12 \ - -noprompt \ - -srcstorepass test1234 - -keytool -importkeystore -deststorepass test1234 -destkeystore certs/client.keystore.jks \ - -srckeystore certs/client.p12 \ - -deststoretype PKCS12 \ - -srcstoretype PKCS12 \ - -noprompt \ - -srcstorepass test1234 - -keytool -importkeystore -deststorepass test1234 -destkeystore certs/schema-registry-client.keystore.jks \ - -srckeystore certs/schema-registry-client.p12 \ - -deststoretype PKCS12 \ - -srcstoretype PKCS12 \ - -noprompt \ - -srcstorepass test1234 - -keytool -importkeystore -deststorepass test1234 -destkeystore certs/local-client.keystore.jks \ - -srckeystore certs/local-client.p12 \ - -deststoretype PKCS12 \ - -srcstoretype PKCS12 \ - -noprompt \ - -srcstorepass test1234 - -keytool -keystore certs/truststore.jks -alias CARoot -import -file certs/ca.crt -storepass test1234 -noprompt -storetype PKCS12 - -# generate client PEM file for kafkacat - -openssl pkcs12 -in certs/client.p12 -out certs/client.pem -passin pass:test1234 -passout pass:test1234 - -# Starting docker-compose services -docker-compose up -d --build - -echo "Example configuration to access kafka:" -echo "-> docker-compose exec kafka kafka-console-producer --broker-list kafka.confluent.local:9093 --topic test --producer.config /etc/kafka/consumer.properties" -echo "-> docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka.confluent.local:9093 --topic test --consumer.config /etc/kafka/consumer.properties --from-beginning" -echo "-> docker-compose exec kafka kafkacat -L -b kafka.confluent.local:9093 -F /etc/kafka/kafkacat.conf -C -t test" diff --git a/kafka-docker/up.sh b/kafka-docker/up.sh new file mode 100755 index 0000000..ae41bd4 --- /dev/null +++ b/kafka-docker/up.sh @@ -0,0 +1,29 @@ +#!/bin/sh +set -e + +PASSWORD=test1234 + +# Creating TLS CA, Certificates and keystore / truststore +rm -rf certs +mkdir -p certs +# Generate CA certificates +openssl req -new -nodes -x509 -days 3650 -newkey rsa:2048 -keyout certs/ca.key -out certs/ca.crt -config ca.cnf +cat certs/ca.crt certs/ca.key > certs/ca.pem + +# Generate kafka server certificates +openssl req -new -newkey rsa:2048 -keyout certs/server.key -out certs/server.csr -config server.cnf -nodes +openssl x509 -req -days 3650 -in certs/server.csr -CA certs/ca.crt -CAkey certs/ca.key -CAcreateserial -out certs/server.crt -extfile server.cnf -extensions v3_req +openssl pkcs12 -export -in certs/server.crt -inkey certs/server.key -chain -CAfile certs/ca.pem -name "kafka.confluent.local" -out certs/server.p12 -password pass:$PASSWORD + +# Import server certificate to keystore and CA to truststore +keytool -importkeystore -deststorepass $PASSWORD -destkeystore certs/server.keystore.jks \ + -srckeystore certs/server.p12 \ + -deststoretype PKCS12 \ + -srcstoretype PKCS12 \ + -noprompt \ + -srcstorepass $PASSWORD + +keytool -keystore certs/truststore.jks -alias CARoot -import -file certs/ca.crt -storepass $PASSWORD -noprompt -storetype PKCS12 + +# Starting docker-compose services +docker-compose up -d --build \ No newline at end of file diff --git a/kafka-docker/zookeeper/Dockerfile b/kafka-docker/zookeeper/Dockerfile deleted file mode 100644 index f58b175..0000000 --- a/kafka-docker/zookeeper/Dockerfile +++ /dev/null @@ -1,18 +0,0 @@ -FROM centos:centos7 -ENV container docker - -# 1. Adding Confluent repository -RUN rpm --import https://packages.confluent.io/rpm/5.4/archive.key -COPY confluent.repo /etc/yum.repos.d/confluent.repo -RUN yum clean all - -# 2. Install zookeeper and kafka -RUN yum install -y java-1.8.0-openjdk -RUN yum install -y confluent-platform-2.12 - -# 3. Configure zookeeper -COPY zookeeper.properties /etc/kafka/zookeeper.properties - -EXPOSE 2181 - -CMD zookeeper-server-start /etc/kafka/zookeeper.properties diff --git a/kafka-docker/zookeeper/confluent.repo b/kafka-docker/zookeeper/confluent.repo deleted file mode 100644 index 4369e53..0000000 --- a/kafka-docker/zookeeper/confluent.repo +++ /dev/null @@ -1,13 +0,0 @@ -[Confluent.dist] -name=Confluent repository (dist) -baseurl=https://packages.confluent.io/rpm/5.4/7 -gpgcheck=1 -gpgkey=https://packages.confluent.io/rpm/5.4/archive.key -enabled=1 - -[Confluent] -name=Confluent repository -baseurl=https://packages.confluent.io/rpm/5.4 -gpgcheck=1 -gpgkey=https://packages.confluent.io/rpm/5.4/archive.key -enabled=1 diff --git a/kafka-docker/zookeeper/zookeeper.properties b/kafka-docker/zookeeper/zookeeper.properties deleted file mode 100644 index f9c1c59..0000000 --- a/kafka-docker/zookeeper/zookeeper.properties +++ /dev/null @@ -1,3 +0,0 @@ -dataDir=/var/lib/zookeeper -clientPort=2181 -maxClientCnxns=0 diff --git a/storage/kafkaStorage.go b/storage/kafkaStorage.go index 27e70fe..04217af 100644 --- a/storage/kafkaStorage.go +++ b/storage/kafkaStorage.go @@ -5,9 +5,8 @@ import ( "crypto/tls" "crypto/x509" "encoding/json" - "encoding/pem" "fmt" - "golang.org/x/crypto/pkcs12" + "github.com/segmentio/kafka-go/sasl/plain" "io/ioutil" "time" @@ -28,29 +27,7 @@ type KafkaAuthCredentials struct { Password string } -func GetTLSConfig(keyStorePath, trustStorePath, password string) (*tls.Config, error) { - - // Keystore - keys, err := ioutil.ReadFile(keyStorePath) - if err != nil { - return nil, fmt.Errorf("failed to read keyStorePath: %w", err) - } - blocks, err := pkcs12.ToPEM(keys, password) - if err != nil { - return nil, fmt.Errorf("failed to convert PKCS12 key to PEM: %w", err) - } - - var pemData []byte - for test, b := range blocks { - _ = test - pemData = append(pemData, pem.EncodeToMemory(b)...) - } - - cert, err := tls.X509KeyPair(pemData, pemData) - if err != nil { - return nil, fmt.Errorf("failed to get X509KeyPair: %w", err) - } - //Truststore +func GetTLSConfig(trustStorePath string) (*tls.Config, error) { caCert, err := ioutil.ReadFile(trustStorePath) if err != nil { return nil, fmt.Errorf("failed to read trustStorePath: %w", err) @@ -60,25 +37,29 @@ func GetTLSConfig(keyStorePath, trustStorePath, password string) (*tls.Config, e caCertPool.AppendCertsFromPEM(caCert) config := &tls.Config{ - Certificates: []tls.Certificate{cert}, RootCAs: caCertPool, InsecureSkipVerify: true, } return config, nil } -func NewKafkaStorage(ctx context.Context, kafkaEndpoint string, kafkaTopic string, tlsConfig *tls.Config) (Storage, error) { +func NewKafkaStorage(ctx context.Context, kafkaEndpoint string, kafkaTopic string, tlsConfig *tls.Config, producerCreds, consumerCreds *KafkaAuthCredentials) (Storage, error) { + mechanismProducer := plain.Mechanism{producerCreds.Username, producerCreds.Password} + mechanismConsumer := plain.Mechanism{consumerCreds.Username, consumerCreds.Password} dialerProducer := &kafka.Dialer{ - Timeout: 10 * time.Second, - DualStack: true, - TLS: tlsConfig, + Timeout: 10 * time.Second, + DualStack: true, + TLS: tlsConfig, + SASLMechanism: mechanismProducer, } dialerConsumer := &kafka.Dialer{ - Timeout: 10 * time.Second, - DualStack: true, - TLS: tlsConfig, + Timeout: 10 * time.Second, + DualStack: true, + TLS: tlsConfig, + SASLMechanism: mechanismConsumer, } + conn, err := dialerProducer.DialLeader(ctx, "tcp", kafkaEndpoint, kafkaTopic, kafkaPartition) if err != nil { return nil, fmt.Errorf("failed to init Kafka client: %w", err) diff --git a/storage/kafkaStorage_test.go b/storage/kafkaStorage_test.go index 68c0ee8..ff168dc 100644 --- a/storage/kafkaStorage_test.go +++ b/storage/kafkaStorage_test.go @@ -16,13 +16,22 @@ func TestKafkaStorage_GetMessages(t *testing.T) { N := 10 var offset uint64 = 5 - tlsConfig, err := GetTLSConfig("../kafka-docker/certs/local-client.p12", "../kafka-docker/certs/ca.pem", "test1234") + producerCreds := &KafkaAuthCredentials{ + Username: "producer", + Password: "producerpass", + } + consumerCreds := &KafkaAuthCredentials{ + Username: "consumer", + Password: "consumerpass", + } + + tlsConfig, err := GetTLSConfig("../kafka-docker/certs/ca.crt") if err != nil { t.Fatal(err.Error()) } req := require.New(t) - stg, err := NewKafkaStorage(context.Background(), "localhost:9092", "test", tlsConfig) + stg, err := NewKafkaStorage(context.Background(), "localhost:9093", "test", tlsConfig, producerCreds, consumerCreds) req.NoError(err) msgs := make([]Message, 0, N)