Kafka is an open-source distributed streaming platform mainly used for producing and consuming records in real time (similar to a messaging system), and it is fault tolerant when configured as a cluster.

More info can be found on the Apache Kafka website.

This guide shows how to configure Kafka with SSL and use its built-in Authorizer to grant or restrict access by topic, user and host IP, all inside a docker container.


Service configuration

We will be using a CentOS docker image for this guide and will go through the necessary Kafka installation and configuration steps as we go along.

  • Run the following commands wherever you have docker installed
    docker run -itd -p 9093:9093 -h kafka.mintopsblog.local centos:latest bash
  • Once run, this should pull the latest centos image from the public repositories and echo back a container id.
  • You can then access the container by doing the following
    docker ps | grep -i centos:latest (to get the container id that was created with the centos:latest image)
    docker exec -it *your-container-id* bash

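  • Since the SSL certificate and the advertised listener later in this guide both rely on the kafka.mintopsblog.local hostname set with -h, it doesn’t hurt to confirm it from inside the container (a minimal check; docker also writes a matching entry into /etc/hosts):
    cat /etc/hostname     # should print kafka.mintopsblog.local
    grep kafka /etc/hosts # docker maps the container IP to this hostname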

  • Once inside the container, we need a clean installation of Kafka that we can then configure with SSL and ACLs.
  • We’ll start by creating a kafka directory where the installation will be
    mkdir -p /opt/kafka
    cd /opt/kafka
  • Downloading the latest Apache Kafka version 1.1.0 with Scala version 2.11
    yum -y install wget
    wget http://www-eu.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
    tar xvf kafka_2.11-1.1.0.tgz
    cd kafka_2.11-1.1.0/config
  • We will now configure server.properties for the Kafka broker. Most of the file can be left at its defaults since we are only configuring a single test broker
    vi server.properties
  • You will need to find the following configs
    #listeners=PLAINTEXT://:9092
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    log.dirs=/tmp/kafka-logs
    zookeeper.connect=localhost:2181
  • And change them as follows
    listeners=SSL://:9093
    advertised.listeners=SSL://kafka.mintopsblog.local:9093
    listener.security.protocol.map=SSL:SSL
    log.dirs=/opt/kafka-logs
    zookeeper.connect=localhost:2181
  • The above config is not enough to make Kafka work with SSL, so we also need to add some custom properties to the server.properties file and, later on, create the SSL certificates. Add the following at the end of the file
    security.protocol=SSL
    security.inter.broker.protocol=SSL
    ssl.keystore.location=/opt/ssl/kafka.server.keystore.jks
    ssl.keystore.password=sslpassphrase
    ssl.key.password=sslpassphrase
    ssl.keystore.type=JKS
    ssl.truststore.location=/opt/ssl/kafka.server.truststore.jks
    ssl.truststore.password=sslpassphrase
    ssl.truststore.type=JKS
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    allow.everyone.if.no.acl.found=false
    super.users=User:CN=kafka.mintopsblog.local,OU=kafka,O=kafka,L=kafka,ST=kafka,C=XX
    ssl.client.auth=required
  • All the basic configuration for the Kafka broker is now done. Next we will create the directories and SSL certificates needed to make everything work
    mkdir -p /opt/ssl
    cd /opt/ssl

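  • Before moving on, it is worth double-checking that the edits above actually landed in the file (a quick sanity check, assuming the paths used in this guide):
    cd /opt/kafka/kafka_2.11-1.1.0/config
    # show the non-comment lines; the SSL listener, log dir, keystore and ACL settings should all be visible
    grep -v '^#' server.properties | grep -E 'listeners|log.dirs|ssl|authorizer|super.users'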

  • Install Java 1.8 and openssl in the docker container, since we will make use of the “keytool” and “openssl” commands
    yum -y install java-1.8.0-openjdk openssl
  • You can then use the following script to create the necessary server and client certificates/keystores.
  • The script will prompt you for some information (e.g. the Kafka broker hostname, the passphrases and the user you would like to give access to)
    #!/bin/bash
    
    ###Set Environment variable
    VALIDITY=1825
    
    read -p "Please specify the Kafka broker hostname without the port: " KAFKA_HOST
    echo -n "Please specify a passphrase for the server ssl certificate: " 
    read -s SSLPASSPHRASE
    echo -e "\n"
    CERTIFICATE_INFO="CN=$KAFKA_HOST,OU=kafka,O=kafka,L=kafka,ST=kafka,C=XX"
    CA_INFO="/C=XX/ST=kafka/L=kafka/O=kafka/OU=kafka/CN=$KAFKA_HOST/"
    KAFKA_SSL=/opt/ssl
    
    mkdir -p $KAFKA_SSL
    cd $KAFKA_SSL
    
    ###Create CA and server keystore/truststore###
    openssl req -new -x509 -keyout ca-key -out ca-cert -days $VALIDITY -subj $CA_INFO -passout pass:$SSLPASSPHRASE &> /dev/null
    keytool -noprompt -keystore kafka.server.keystore.jks -alias $KAFKA_HOST -validity $VALIDITY -genkey -dname $CERTIFICATE_INFO -keypass $SSLPASSPHRASE -storepass $SSLPASSPHRASE &> /dev/null
    keytool -noprompt -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert -storepass $SSLPASSPHRASE &> /dev/null
    keytool -noprompt -keystore kafka.server.keystore.jks -alias $KAFKA_HOST -certreq -file cert-file-$KAFKA_HOST -storepass $SSLPASSPHRASE &> /dev/null
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-$KAFKA_HOST -out cert-signed-$KAFKA_HOST -days $VALIDITY -CAcreateserial -passin pass:$SSLPASSPHRASE &> /dev/null
    keytool -noprompt -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert -storepass $SSLPASSPHRASE &> /dev/null
    keytool -noprompt -keystore kafka.server.keystore.jks -alias $KAFKA_HOST -import -file cert-signed-$KAFKA_HOST -storepass $SSLPASSPHRASE &> /dev/null
    
    ###Create client keystore and truststore###
    read -p "Please specify a username to give Kafka ACL permissions: " USERNAME
    echo -n "Please specify a passphrase for the client ssl certificate: "
    read -s CLIENT_SSLPASSPHRASE
    echo -e "\n"
    CLIENT_CERTIFICATE_INFO="CN=$USERNAME,OU=kafka,O=kafka,L=kafka,ST=kafka,C=XX"
    
    keytool -noprompt -keystore kafka.client.keystore.jks -alias $USERNAME -validity $VALIDITY -genkey -dname $CLIENT_CERTIFICATE_INFO -keypass $CLIENT_SSLPASSPHRASE -storepass $CLIENT_SSLPASSPHRASE &> /dev/null
    keytool -noprompt -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass $CLIENT_SSLPASSPHRASE &> /dev/null
    keytool -noprompt -keystore kafka.client.keystore.jks -alias $USERNAME -certreq -file cert-file-client-$USERNAME -storepass $CLIENT_SSLPASSPHRASE &> /dev/null
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file-client-$USERNAME -out cert-signed-client-$USERNAME -days $VALIDITY -CAcreateserial -passin pass:$SSLPASSPHRASE &>/dev/null
    
    ###Add client certificate to server truststore###
    keytool -noprompt -keystore kafka.server.truststore.jks -alias $USERNAME -import -file cert-signed-client-$USERNAME -storepass $SSLPASSPHRASE &> /dev/null

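  • If you saved the script to a file (for example /opt/ssl/create-certs.sh, a name picked purely for illustration), run it and then confirm what ended up in each store before starting the broker. The commands below assume the example broker hostname, a client user called “mintopsblog” and the passphrase used earlier in server.properties:
    chmod +x /opt/ssl/create-certs.sh
    /opt/ssl/create-certs.sh
    cd /opt/ssl
    # the server keystore should now hold the CARoot and broker certificate entries
    keytool -list -keystore kafka.server.keystore.jks -storepass sslpassphrase
    # the server truststore should hold CARoot plus one entry per client user
    keytool -list -keystore kafka.server.truststore.jks -storepass sslpassphrase
    # inspect the subject of the signed client certificate
    openssl x509 -in cert-signed-client-mintopsblog -noout -subject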

  • Once the certificates and keystores are created, we can start the Kafka broker and the ZooKeeper that comes bundled with the installation.
    cd /opt/kafka/kafka_2.11-1.1.0/bin/
  • First you would need to start the zookeeper service
    nohup ./zookeeper-server-start.sh ../config/zookeeper.properties > zookeeper.out &
  • After zookeeper has started, start the kafka broker
    nohup ./kafka-server-start.sh ../config/server.properties > kafka.out &
  • Next, we will create a kafka topic so that we can then configure the ACLs.
    ./kafka-topics.sh --zookeeper localhost:2181 --create --topic mintopsblog --partitions 1 --replication-factor 1
  • Now we will configure the ACLs on this newly created topic for the user that was specified in the client certificate (in my case the user is “mintopsblog”).
  • Manually (Producer)
    ./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"CN=mintopsblog,OU=kafka,O=kafka,L=kafka,ST=kafka,C=XX" --producer --topic mintopsblog
  • Manually (Consumer)
    ./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"CN=mintopsblog,OU=kafka,O=kafka,L=kafka,ST=kafka,C=XX" --consumer --topic mintopsblog --group mintopsblog-consumer
  • Or use the following script if you wish.
    #!/bin/bash
    set -x
    read -p "Please specify the username for the Kafka ACLs permissions: " USERNAME
    CLIENT_CERTIFICATE_INFO="CN=$USERNAME,OU=kafka,O=kafka,L=kafka,ST=kafka,C=XX"
    echo -n "Please specify the passphrase for the client ssl certificate: "
    read -s CLIENT_SSLPASSPHRASE
    echo -e "\n"
    
    read -p "Please specify the kafka /bin directory: " KAFKA_DIRECTORY
    read -p "Please specify a kafka topic for Kafka ACL: " TOPIC
    read -p "Specify zookeeper hosts (host:port): " ZKHOST
    read -p "Please specify permissions (producer or consumer): " PERMISSION
    
    if [[ "$PERMISSION" == "consumer" ]]; 
     then
    
    ###Consumer permissions###
     read -p "Please specify consumer group id: " CONSUMERID
     $KAFKA_DIRECTORY/kafka-acls.sh --authorizer-properties zookeeper.connect=$ZKHOST --add --allow-principal User:$CLIENT_CERTIFICATE_INFO --$PERMISSION --topic $TOPIC --group $CONSUMERID
    
    else
     ###Producer permissions###
     $KAFKA_DIRECTORY/kafka-acls.sh --authorizer-properties zookeeper.connect=$ZKHOST --add --allow-principal User:$CLIENT_CERTIFICATE_INFO --$PERMISSION --topic $TOPIC
    
    fi
    KAFKA_SSL=/opt/ssl
    cat > $KAFKA_SSL/client-ssl.properties << EOF
    security.protocol=SSL
    ssl.truststore.location=$KAFKA_SSL/kafka.client.truststore.jks
    ssl.truststore.password=$CLIENT_SSLPASSPHRASE
    ssl.keystore.location=$KAFKA_SSL/kafka.client.keystore.jks
    ssl.keystore.password=$CLIENT_SSLPASSPHRASE
    ssl.key.password=$CLIENT_SSLPASSPHRASE
    EOF
  • If you did not use the script above (which already writes this file), you will need to create a properties file for the client so that it can use the client keystore and truststore that were created earlier. Change the passwords and paths depending on how you set them up
    cat > /opt/ssl/client-ssl.properties << EOF
    security.protocol=SSL
    ssl.truststore.location=/opt/ssl/kafka.client.truststore.jks
    ssl.truststore.password=sslpassphrase1
    ssl.keystore.location=/opt/ssl/kafka.client.keystore.jks
    ssl.keystore.password=sslpassphrase1
    ssl.key.password=sslpassphrase1
    EOF
  • You can then check whether you can produce or consume using the following commands (a couple of additional verification commands are sketched right after this list)
    • Consumer
      ./kafka-console-consumer.sh --bootstrap-server kafka.mintopsblog.local:9093 --topic mintopsblog --consumer.config /opt/ssl/client-ssl.properties --group mintopsblog-consumer --from-beginning
    • Producer
      ./kafka-console-producer.sh --broker-list kafka.mintopsblog.local:9093 --topic mintopsblog --producer.config /opt/ssl/client-ssl.properties
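  • Beyond producing and consuming, a couple of extra checks can confirm the setup end to end (a minimal sketch, assuming the hostname, paths and topic used above):
    cd /opt/kafka/kafka_2.11-1.1.0/bin
    # confirm the broker answers TLS on port 9093 and presents a certificate signed by our CA
    openssl s_client -connect kafka.mintopsblog.local:9093 -CAfile /opt/ssl/ca-cert </dev/null
    # list the ACLs currently stored for the topic
    ./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic mintopsblog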

Should you not have ACLs set up for your user on a specific topic, the client will fail with an authorization error (typically a TopicAuthorizationException stating that you are not authorized to access the topic).


N.B. For the ACLs to work properly, you need a separate certificate for each user you will have. Each client certificate must be added to the Kafka server truststore, and the Kafka broker restarted, before the SSL handshake for that user can succeed.
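The ACL commands above allow the user from any host. If you also want to restrict access by host IP, as mentioned in the introduction, kafka-acls.sh accepts an --allow-host flag. Here is a rough sketch that limits the producer ACL for the same user to a single, purely illustrative client IP (10.0.0.5):

    ./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"CN=mintopsblog,OU=kafka,O=kafka,L=kafka,ST=kafka,C=XX" --allow-host 10.0.0.5 --producer --topic mintopsblog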


Hope this guide helps you out; if you have any difficulties, don’t hesitate to post a comment. If you spot any mistakes or possible improvements in the guides, feel free to point them out as well.
