plaintext + ssl

This commit is contained in:
programmer10110 2020-10-15 17:16:19 +03:00
parent 884c808e3f
commit 6711717866
22 changed files with 96 additions and 398 deletions

1
kafka-docker/.env Normal file
View File

@ -0,0 +1 @@
KEYSTORE_PASSWORD=test1234

View File

@ -1,29 +0,0 @@
[req]
prompt = no
distinguished_name = dn
default_md = sha256
default_bits = 4096
req_extensions = v3_req
[ dn ]
countryName = UK
organizationName = Confluent
localityName = London
commonName=kafka.confluent.local
[ v3_ca ]
subjectKeyIdentifier=hash
basicConstraints = critical,CA:true
authorityKeyIdentifier=keyid:always,issuer:always
keyUsage = critical,keyCertSign,cRLSign
[ v3_req ]
subjectKeyIdentifier = hash
basicConstraints = CA:FALSE
nsComment = "OpenSSL Generated Certificate"
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth
subjectAltName = @alt_names
[ alt_names ]
DNS.1=kafka.confluent.local

View File

@ -1,58 +1,58 @@
version: '3'
version: '3.4'
services:
zookeeper:
build: zookeeper/
image: confluentinc/cp-zookeeper:5.3.1
container_name: zookeeper
hostname: zookeeper
domainname: confluent.local
restart: on-failure
volumes:
- ./certs/:/var/lib/secret
networks:
default:
aliases:
- zookeeper.confluent.local
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
build: kafka/
image: confluentinc/cp-kafka:5.3.1
container_name: kafka
hostname: kafka
domainname: confluent.local
depends_on:
- zookeeper
restart: on-failure
volumes:
- ./certs/:/var/lib/secret
networks:
default:
aliases:
- kafka.confluent.local
depends_on:
- zookeeper
ports:
- "9092:9092"
- 9093:9093
environment:
SCHEMA_REGISTRY_OPTS: '-Djavax.net.ssl.keyStore=/var/lib/secret/client.keystore.jks -Djavax.net.ssl.trustStore=/var/lib/secret/truststore.jks -Djavax.net.ssl.keyStorePassword=test1234 -Djavax.net.ssl.trustStorePassword=test1234'
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER: INTERNAL://kafka.confluent.local:9092,OUTSIDE://localhost:9093
KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka.confluent.local:9092,OUTSIDE://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:SASL_SSL,OUTSIDE:SASL_SSL
KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
KAFKA_LISTENER_NAME_INTERNAL_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_LISTENER_NAME_OUTSIDE_SASL_ENABLED_MECHANISMS: PLAIN
schema-registry:
build: schema-registry/
container_name: schema-registry
hostname: schema-registry
domainname: confluent.local
depends_on:
- kafka
restart: on-failure
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "HTTPS"
KAFKA_SSL_CLIENT_AUTH: requested
KAFKA_SSL_KEYSTORE_LOCATION: /var/lib/secret/server.keystore.jks
KAFKA_SSL_KEYSTORE_PASSWORD: ${KEYSTORE_PASSWORD}
KAFKA_SSL_TRUSTSTORE_LOCATION: /var/lib/secret/truststore.jks
KAFKA_SSL_TRUSTSTORE_PASSWORD: ${KEYSTORE_PASSWORD}
KAFKA_LISTENER_NAME_INTERNAL_PLAIN_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" \
user_admin="admin-secret" ;
KAFKA_LISTENER_NAME_OUTSIDE_PLAIN_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" \
user_admin="admin-secret" \
user_producer="producerpass" \
user_consumer="consumerpass" ;
KAFKA_SASL_JAAS_CONFIG: org.apache.kafka.common.security.plain.PlainLoginModule required \
username="admin" \
password="admin-secret" ;
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
volumes:
- ./certs/:/var/lib/secret
- ./schema-registry/schema-registry.properties:/etc/schema-registry/schema-registry.properties
networks:
default:
aliases:
- schema-registry.confluent.local
ports:
- "8443:8443"
volumes:
secret: {}
networks:
default:
- ./certs/:/var/lib/secret

View File

@ -1,26 +0,0 @@
FROM centos:centos7
ENV container docker
# 1. Adding Confluent repository
RUN rpm --import https://packages.confluent.io/rpm/5.4/archive.key
COPY confluent.repo /etc/yum.repos.d/confluent.repo
RUN yum clean all
# 2. Install zookeeper and kafka
RUN yum install -y java-1.8.0-openjdk
RUN yum install -y confluent-platform-2.12
#schema-registry package is required to run kafka-avro-console-producer
RUN yum install -y confluent-schema-registry
# 3. Configure Kafka
COPY server.properties /etc/kafka/server.properties
COPY consumer.properties /etc/kafka/consumer.properties
# 4. Add kafkacat
COPY kafkacat /usr/local/bin
RUN chmod +x /usr/local/bin/kafkacat
COPY kafkacat.conf /etc/kafka/kafkacat.conf
EXPOSE 9093
CMD kafka-server-start /etc/kafka/server.properties

View File

@ -1,13 +0,0 @@
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.4/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.4/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.4
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.4/archive.key
enabled=1

View File

@ -1,6 +0,0 @@
bootstrap.servers=kafka.confluent.local:9093
security.protocol=SSL
ssl.truststore.location=/var/lib/secret/truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/lib/secret/client.keystore.jks
ssl.keystore.password=test1234

Binary file not shown.

View File

@ -1,5 +0,0 @@
security.protocol=SSL
ssl.key.location=/var/lib/secret/client.pem
ssl.key.password=test1234
ssl.certificate.location=/var/lib/secret/client.pem
ssl.ca.location=/var/lib/secret/ca.pem

View File

@ -1,18 +0,0 @@
broker.id=0
listeners=INSIDE://kafka.confluent.local:9093,OUTSIDE://kafka.confluent.local:9092
advertised.listeners=INSIDE://kafka.confluent.local:9093,OUTSIDE://localhost:9092
log.dirs=/var/lib/kafka
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
zookeeper.connect=zookeeper.confluent.local:2181
# TLS Configuration
inter.broker.listener.name=INSIDE
listener.security.protocol.map=INSIDE:SSL,OUTSIDE:SSL
ssl.truststore.location=/var/lib/secret/truststore.jks
ssl.truststore.password=test1234
ssl.keystore.location=/var/lib/secret/server.keystore.jks
ssl.keystore.password=test1234
ssl.client.auth=required
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:CN=kafka.confluent.local,L=London,O=Confluent,C=UK;User:CN=schema-registry.confluent.local,L=London,O=Confluent,C=UK

View File

@ -1,5 +0,0 @@
security.protocol=SSL
ssl.key.location=certs/client.pem
ssl.key.password=test1234
ssl.certificate.location=certs/client.pem
ssl.ca.location=certs/ca.pem

View File

@ -1,29 +0,0 @@
[req]
prompt = no
distinguished_name = dn
default_md = sha256
default_bits = 4096
req_extensions = v3_req
[ dn ]
countryName = UK
organizationName = Confluent
localityName = London
commonName=MacBook-Pro-Mihail-2.local
[ v3_ca ]
subjectKeyIdentifier=hash
basicConstraints = critical,CA:true
authorityKeyIdentifier=keyid:always,issuer:always
keyUsage = critical,keyCertSign,cRLSign
[ v3_req ]
subjectKeyIdentifier = hash
basicConstraints = CA:FALSE
nsComment = "OpenSSL Generated Certificate"
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth
subjectAltName = @alt_names
[ alt_names ]
DNS.1=MacBook-Pro-Mihail-2.local

View File

@ -1,29 +0,0 @@
[req]
prompt = no
distinguished_name = dn
default_md = sha256
default_bits = 4096
req_extensions = v3_req
[ dn ]
countryName = UK
organizationName = Confluent
localityName = London
commonName=schema-registry.confluent.local
[ v3_ca ]
subjectKeyIdentifier=hash
basicConstraints = critical,CA:true
authorityKeyIdentifier=keyid:always,issuer:always
keyUsage = critical,keyCertSign,cRLSign
[ v3_req ]
subjectKeyIdentifier = hash
basicConstraints = CA:FALSE
nsComment = "OpenSSL Generated Certificate"
keyUsage = critical, digitalSignature, keyEncipherment
extendedKeyUsage = clientAuth, serverAuth
subjectAltName = @alt_names
[ alt_names ]
DNS.1=schema-registry.confluent.local

View File

@ -1,18 +0,0 @@
FROM centos:centos7
ENV container docker
# 1. Adding Confluent repository
RUN rpm --import https://packages.confluent.io/rpm/5.4/archive.key
COPY confluent.repo /etc/yum.repos.d/confluent.repo
RUN yum clean all
# 2. Install zookeeper and kafka
RUN yum install -y java-1.8.0-openjdk
RUN yum install -y confluent-schema-registry confluent-security
# 3. Configure Kafka
COPY schema-registry.properties /etc/schema-registry/schema-registry.properties
EXPOSE 8443
CMD schema-registry-start /etc/schema-registry/schema-registry.properties

View File

@ -1,13 +0,0 @@
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.4/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.4/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.4
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.4/archive.key
enabled=1

View File

@ -1,20 +0,0 @@
listeners=https://schema-registry.confluent.local:8443
inter.instance.protocol=https
ssl.keystore.location=/var/lib/secret/schema-registry-client.keystore.jks
ssl.keystore.password=test1234
ssl.key.password=test1234
kafkastore.topic=_schemas
debug=false
#SSL settings for communication with Kafka Broker
kafkastore.bootstrap.servers=SSL://kafka.confluent.local:9093
kafkastore.security.protocol=SSL
#SSL trust store to verify cert presented by the broker
kafkastore.ssl.truststore.location=/var/lib/secret/truststore.jks
kafkastore.ssl.truststore.password=test1234
#SSL key store to provide a cert for the broker
kafkastore.ssl.keystore.location=/var/lib/secret/schema-registry-client.keystore.jks
kafkastore.ssl.keystore.password=test1234
kafkastore.ssl.key.password=test1234

View File

@ -1,77 +0,0 @@
#!/bin/sh
set -e
# Starting kerberos,
# Avoiding starting up all services at the beginning to generate the keytab first
# Creating TLS CA, Certificates and keystore / truststore
rm -rf certs
mkdir -p certs
# Generate CA certificates
openssl req -new -nodes -x509 -days 3650 -newkey rsa:2048 -keyout certs/ca.key -out certs/ca.crt -config ca.cnf
cat certs/ca.crt certs/ca.key > certs/ca.pem
# Generate kafka server certificates
openssl req -new -newkey rsa:2048 -keyout certs/server.key -out certs/server.csr -config server.cnf -nodes
openssl x509 -req -days 3650 -in certs/server.csr -CA certs/ca.crt -CAkey certs/ca.key -CAcreateserial -out certs/server.crt -extfile server.cnf -extensions v3_req
openssl pkcs12 -export -in certs/server.crt -inkey certs/server.key -chain -CAfile certs/ca.pem -name "kafka.confluent.local" -out certs/server.p12 -password pass:test1234
# Generate client certificates
openssl req -new -newkey rsa:2048 -keyout certs/client.key -out certs/client.csr -config client.cnf -nodes
openssl x509 -req -days 3650 -in certs/client.csr -CA certs/ca.crt -CAkey certs/ca.key -CAcreateserial -out certs/client.crt -extfile client.cnf -extensions v3_req
openssl pkcs12 -export -in certs/client.crt -inkey certs/client.key -chain -CAfile certs/ca.pem -name "kafka.confluent.local" -out certs/client.p12 -password pass:test1234
# Generate schema registry client certificate
openssl req -new -newkey rsa:2048 -keyout certs/schema-registry-client.key -out certs/schema-registry-client.csr -config schema-registry-client.cnf -nodes
openssl x509 -req -days 3650 -in certs/schema-registry-client.csr -CA certs/ca.crt -CAkey certs/ca.key -CAcreateserial -out certs/schema-registry-client.crt -extfile schema-registry-client.cnf -extensions v3_req
openssl pkcs12 -export -in certs/schema-registry-client.crt -inkey certs/schema-registry-client.key -chain -CAfile certs/ca.pem -name "schema-registry.confluent.local" -out certs/schema-registry-client.p12 -password pass:test1234
# Generate local client certificate
openssl req -new -newkey rsa:2048 -keyout certs/local-client.key -out certs/local-client.csr -config local-client.cnf -nodes
openssl x509 -req -days 3650 -in certs/local-client.csr -CA certs/ca.crt -CAkey certs/ca.key -CAcreateserial -out certs/local-client.crt -extfile local-client.cnf -extensions v3_req
openssl pkcs12 -export -in certs/local-client.crt -inkey certs/local-client.key -chain -CAfile certs/ca.pem -name `hostname` -out certs/local-client.p12 -password pass:test1234
# Import server certificate to keystore and CA to truststore
keytool -importkeystore -deststorepass test1234 -destkeystore certs/server.keystore.jks \
-srckeystore certs/server.p12 \
-deststoretype PKCS12 \
-srcstoretype PKCS12 \
-noprompt \
-srcstorepass test1234
keytool -importkeystore -deststorepass test1234 -destkeystore certs/client.keystore.jks \
-srckeystore certs/client.p12 \
-deststoretype PKCS12 \
-srcstoretype PKCS12 \
-noprompt \
-srcstorepass test1234
keytool -importkeystore -deststorepass test1234 -destkeystore certs/schema-registry-client.keystore.jks \
-srckeystore certs/schema-registry-client.p12 \
-deststoretype PKCS12 \
-srcstoretype PKCS12 \
-noprompt \
-srcstorepass test1234
keytool -importkeystore -deststorepass test1234 -destkeystore certs/local-client.keystore.jks \
-srckeystore certs/local-client.p12 \
-deststoretype PKCS12 \
-srcstoretype PKCS12 \
-noprompt \
-srcstorepass test1234
keytool -keystore certs/truststore.jks -alias CARoot -import -file certs/ca.crt -storepass test1234 -noprompt -storetype PKCS12
# generate client PEM file for kafkacat
openssl pkcs12 -in certs/client.p12 -out certs/client.pem -passin pass:test1234 -passout pass:test1234
# Starting docker-compose services
docker-compose up -d --build
echo "Example configuration to access kafka:"
echo "-> docker-compose exec kafka kafka-console-producer --broker-list kafka.confluent.local:9093 --topic test --producer.config /etc/kafka/consumer.properties"
echo "-> docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka.confluent.local:9093 --topic test --consumer.config /etc/kafka/consumer.properties --from-beginning"
echo "-> docker-compose exec kafka kafkacat -L -b kafka.confluent.local:9093 -F /etc/kafka/kafkacat.conf -C -t test"

29
kafka-docker/up.sh Executable file
View File

@ -0,0 +1,29 @@
#!/bin/sh
set -e
PASSWORD=test1234
# Creating TLS CA, Certificates and keystore / truststore
rm -rf certs
mkdir -p certs
# Generate CA certificates
openssl req -new -nodes -x509 -days 3650 -newkey rsa:2048 -keyout certs/ca.key -out certs/ca.crt -config ca.cnf
cat certs/ca.crt certs/ca.key > certs/ca.pem
# Generate kafka server certificates
openssl req -new -newkey rsa:2048 -keyout certs/server.key -out certs/server.csr -config server.cnf -nodes
openssl x509 -req -days 3650 -in certs/server.csr -CA certs/ca.crt -CAkey certs/ca.key -CAcreateserial -out certs/server.crt -extfile server.cnf -extensions v3_req
openssl pkcs12 -export -in certs/server.crt -inkey certs/server.key -chain -CAfile certs/ca.pem -name "kafka.confluent.local" -out certs/server.p12 -password pass:$PASSWORD
# Import server certificate to keystore and CA to truststore
keytool -importkeystore -deststorepass $PASSWORD -destkeystore certs/server.keystore.jks \
-srckeystore certs/server.p12 \
-deststoretype PKCS12 \
-srcstoretype PKCS12 \
-noprompt \
-srcstorepass $PASSWORD
keytool -keystore certs/truststore.jks -alias CARoot -import -file certs/ca.crt -storepass $PASSWORD -noprompt -storetype PKCS12
# Starting docker-compose services
docker-compose up -d --build

View File

@ -1,18 +0,0 @@
FROM centos:centos7
ENV container docker
# 1. Adding Confluent repository
RUN rpm --import https://packages.confluent.io/rpm/5.4/archive.key
COPY confluent.repo /etc/yum.repos.d/confluent.repo
RUN yum clean all
# 2. Install zookeeper and kafka
RUN yum install -y java-1.8.0-openjdk
RUN yum install -y confluent-platform-2.12
# 3. Configure zookeeper
COPY zookeeper.properties /etc/kafka/zookeeper.properties
EXPOSE 2181
CMD zookeeper-server-start /etc/kafka/zookeeper.properties

View File

@ -1,13 +0,0 @@
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.4/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.4/archive.key
enabled=1
[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.4
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.4/archive.key
enabled=1

View File

@ -1,3 +0,0 @@
dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0

View File

@ -5,9 +5,8 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"encoding/pem"
"fmt"
"golang.org/x/crypto/pkcs12"
"github.com/segmentio/kafka-go/sasl/plain"
"io/ioutil"
"time"
@ -28,29 +27,7 @@ type KafkaAuthCredentials struct {
Password string
}
func GetTLSConfig(keyStorePath, trustStorePath, password string) (*tls.Config, error) {
// Keystore
keys, err := ioutil.ReadFile(keyStorePath)
if err != nil {
return nil, fmt.Errorf("failed to read keyStorePath: %w", err)
}
blocks, err := pkcs12.ToPEM(keys, password)
if err != nil {
return nil, fmt.Errorf("failed to convert PKCS12 key to PEM: %w", err)
}
var pemData []byte
for test, b := range blocks {
_ = test
pemData = append(pemData, pem.EncodeToMemory(b)...)
}
cert, err := tls.X509KeyPair(pemData, pemData)
if err != nil {
return nil, fmt.Errorf("failed to get X509KeyPair: %w", err)
}
//Truststore
func GetTLSConfig(trustStorePath string) (*tls.Config, error) {
caCert, err := ioutil.ReadFile(trustStorePath)
if err != nil {
return nil, fmt.Errorf("failed to read trustStorePath: %w", err)
@ -60,25 +37,29 @@ func GetTLSConfig(keyStorePath, trustStorePath, password string) (*tls.Config, e
caCertPool.AppendCertsFromPEM(caCert)
config := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: true,
}
return config, nil
}
func NewKafkaStorage(ctx context.Context, kafkaEndpoint string, kafkaTopic string, tlsConfig *tls.Config) (Storage, error) {
func NewKafkaStorage(ctx context.Context, kafkaEndpoint string, kafkaTopic string, tlsConfig *tls.Config, producerCreds, consumerCreds *KafkaAuthCredentials) (Storage, error) {
mechanismProducer := plain.Mechanism{producerCreds.Username, producerCreds.Password}
mechanismConsumer := plain.Mechanism{consumerCreds.Username, consumerCreds.Password}
dialerProducer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: tlsConfig,
Timeout: 10 * time.Second,
DualStack: true,
TLS: tlsConfig,
SASLMechanism: mechanismProducer,
}
dialerConsumer := &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: tlsConfig,
Timeout: 10 * time.Second,
DualStack: true,
TLS: tlsConfig,
SASLMechanism: mechanismConsumer,
}
conn, err := dialerProducer.DialLeader(ctx, "tcp", kafkaEndpoint, kafkaTopic, kafkaPartition)
if err != nil {
return nil, fmt.Errorf("failed to init Kafka client: %w", err)

View File

@ -16,13 +16,22 @@ func TestKafkaStorage_GetMessages(t *testing.T) {
N := 10
var offset uint64 = 5
tlsConfig, err := GetTLSConfig("../kafka-docker/certs/local-client.p12", "../kafka-docker/certs/ca.pem", "test1234")
producerCreds := &KafkaAuthCredentials{
Username: "producer",
Password: "producerpass",
}
consumerCreds := &KafkaAuthCredentials{
Username: "consumer",
Password: "consumerpass",
}
tlsConfig, err := GetTLSConfig("../kafka-docker/certs/ca.crt")
if err != nil {
t.Fatal(err.Error())
}
req := require.New(t)
stg, err := NewKafkaStorage(context.Background(), "localhost:9092", "test", tlsConfig)
stg, err := NewKafkaStorage(context.Background(), "localhost:9093", "test", tlsConfig, producerCreds, consumerCreds)
req.NoError(err)
msgs := make([]Message, 0, N)