Kafka – Using Authorization/ACL (without Kerberos) with SSL Configuration in a Docker container

Kafka is an open source distributed streaming platform, mainly used for producing and consuming records in real time (similar to a messaging system), and it is fault tolerant when configured as a cluster.

More info can be found on the Apache Kafka website.

This guide shows how to configure Kafka with SSL inside a Docker container and how to use its built-in Authorizer to allow or restrict access by topic, user and host IP.


Service configuration

We will use a CentOS Docker image for this guide and go through the necessary Kafka installation and configuration along the way.

  • Run the following commands wherever you have docker installed
    docker run -itd -p 9093:9093 -h kafka.mintopsblog.local centos:latest bash
  • Once run, this should pull the latest centos image from the public repositories and echo back a container id.
  • You can then access the container by doing the following
    docker ps | grep -i centos:latest   # shows the id of the container created from the centos:latest image
    docker exec -it *your-container-id* bash
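
Since `docker run -d` prints the id of the new container on standard output, the id can also be captured directly instead of being looked up with `docker ps`. The following is a minimal sketch, assuming the same image, port and hostname as above; the helper name `start_kafka_container` and the variable `CID` are only illustrative.

```shell
#!/bin/bash
# Start the CentOS container used in this guide and print its container id.
# (start_kafka_container is a hypothetical helper name, not a docker command.)
start_kafka_container() {
  docker run -itd -p 9093:9093 -h kafka.mintopsblog.local centos:latest bash
}

# Usage (uncomment on a machine where docker is installed):
# CID=$(start_kafka_container)
# docker exec -it "$CID" bash
```

Keeping the id in a variable avoids picking the wrong container when several were started from the same image.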

  • When inside the container we would then need to get a clean installation of Kafka so that we can then configure it with SSL and ACLs.
  • We’ll start by creating a kafka directory where the installation will be
    mkdir -p /opt/kafka
    cd /opt/kafka
  • Downloading the latest Apache Kafka version 1.1.0 with Scala version 2.11
    yum -y install wget
    wget http://www-eu.apache.org/dist/kafka/1.1.0/kafka_2.11-1.1.0.tgz
    tar xvf kafka_2.11-1.1.0.tgz
    cd kafka_2.11-1.1.0/config
  • We will now configure the server.properties for the Kafka Broker. We will leave most of the file default since we are only configuring 1 test broker
    vi server.properties
  • You will need to find the following configs
    #listeners=PLAINTEXT://:9092
    #advertised.listeners=PLAINTEXT://your.host.name:9092
    #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    log.dirs=/tmp/kafka-logs
    zookeeper.connect=localhost:2181
  • And change them as follows
    listeners=SSL://:9093
    advertised.listeners=SSL://kafka.mintopsblog.local:9093
    listener.security.protocol.map=SSL:SSL
    log.dirs=/opt/kafka-logs
    zookeeper.connect=localhost:2181
  • The above config is not enough to make Kafka work with SSL, so we need to add some further properties to the server.properties file and, later on, create the SSL certificates. Add the following at the end of the file.
    security.protocol=SSL
    security.inter.broker.protocol=SSL
    ssl.keystore.location=/opt/ssl/kafka.server.keystore.jks
    ssl.keystore.password=sslpassphrase
    ssl.key.password=sslpassphrase
    ssl.keystore.type=JKS
    ssl.truststore.location=/opt/ssl/kafka.server.truststore.jks
    ssl.truststore.password=sslpassphrase
    ssl.truststore.type=JKS
    authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
    allow.everyone.if.no.acl.found=false
    super.users=User:CN=kafka.mintopsblog.local,OU=kafka,O=kafka,L=kafka,ST=kafka,C=XX
    ssl.client.auth=required
  • All the necessary configurations are now done for the Kafka Broker. Next, we will create the directories and SSL certificates needed to make this work
    mkdir -p /opt/ssl
    cd /opt/ssl
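
The listener and log directory changes described above can also be applied without opening an editor. The sketch below uses `sed` for that, assuming the property names as they appear in the stock server.properties shipped with Kafka (`advertised.listeners`, `log.dirs`) and the install path used in this guide; `configure_broker`, `CONFIG` and `BROKER_HOST` are illustrative names.

```shell
#!/bin/bash
# Sketch: apply the listener and log-dir changes with sed instead of vi.
# CONFIG defaults to the path used in this guide; pass another file as $1.
CONFIG="${1:-/opt/kafka/kafka_2.11-1.1.0/config/server.properties}"
BROKER_HOST="kafka.mintopsblog.local"

configure_broker() {
  local file="$1"
  # Each expression matches the property whether or not it is commented out.
  sed -i \
    -e 's|^#\{0,1\}listeners=.*|listeners=SSL://:9093|' \
    -e "s|^#\{0,1\}advertised\.listeners=.*|advertised.listeners=SSL://${BROKER_HOST}:9093|" \
    -e 's|^#\{0,1\}listener\.security\.protocol\.map=.*|listener.security.protocol.map=SSL:SSL|' \
    -e 's|^log\.dirs=.*|log.dirs=/opt/kafka-logs|' \
    "$file"
}

# Only touch the file when it actually exists.
if [ -f "$CONFIG" ]; then
  configure_broker "$CONFIG"
fi
```

The SSL and authorizer properties still have to be appended to the end of the file as shown above.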

  • Install Java 1.8 on the docker container, since we will make use of the “keytool” command and also “openssl”
    yum -y install java-1.8.0-openjdk openssl
  • You can then use the following script to create the necessary server and client certificates/keystores.
  • The script prompts for some information as it runs (the Kafka broker hostname, the passphrases and the user you would like to give access to)
    #!/bin/bash
    
    ###Set Environment variable
    VALIDITY=1825
    
    read -r -p "Please specify the Kafka broker hostname without the port: " KAFKA_HOST
    echo -n "Please specify a passphrase for the server ssl certificate (at least 6 characters): "
    read -r -s SSLPASSPHRASE
    echo -e "\n"
    CERTIFICATE_INFO="CN=$KAFKA_HOST,OU=kafka,O=kafka,L=kafka,ST=kafka,C=XX"
    CA_INFO="/C=XX/ST=kafka/L=kafka/O=kafka/OU=kafka/CN=$KAFKA_HOST/"
    KAFKA_SSL=/opt/ssl
    
    mkdir -p "$KAFKA_SSL"
    cd "$KAFKA_SSL" || exit 1
    
    ###Create CA and server keystore/truststore###
    # Self-signed CA, then the broker key pair, then a CA-signed broker certificate
    openssl req -new -x509 -keyout ca-key -out ca-cert -days "$VALIDITY" -subj "$CA_INFO" -passout "pass:$SSLPASSPHRASE"
    keytool -noprompt -keystore kafka.server.keystore.jks -alias "$KAFKA_HOST" -validity "$VALIDITY" -genkey -dname "$CERTIFICATE_INFO" -keypass "$SSLPASSPHRASE" -storepass "$SSLPASSPHRASE"
    keytool -noprompt -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert -storepass "$SSLPASSPHRASE"
    keytool -noprompt -keystore kafka.server.keystore.jks -alias "$KAFKA_HOST" -certreq -file "cert-file-$KAFKA_HOST" -storepass "$SSLPASSPHRASE"
    openssl x509 -req -CA ca-cert -CAkey ca-key -in "cert-file-$KAFKA_HOST" -out "cert-signed-$KAFKA_HOST" -days "$VALIDITY" -CAcreateserial -passin "pass:$SSLPASSPHRASE"
    keytool -noprompt -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert -storepass "$SSLPASSPHRASE"
    keytool -noprompt -keystore kafka.server.keystore.jks -alias "$KAFKA_HOST" -import -file "cert-signed-$KAFKA_HOST" -storepass "$SSLPASSPHRASE"
    
    ###Create client keystore and truststore###
    read -r -p "Please specify a username to give Kafka ACL permissions: " USERNAME
    echo -n "Please specify a passphrase for the client ssl certificate (at least 6 characters): "
    read -r -s CLIENT_SSLPASSPHRASE
    echo -e "\n"
    CLIENT_CERTIFICATE_INFO="CN=$USERNAME,OU=kafka,O=kafka,L=kafka,ST=kafka,C=XX"
    
    # Client key pair, client truststore holding the CA, and a CA-signed client certificate
    keytool -noprompt -keystore kafka.client.keystore.jks -alias "$USERNAME" -validity "$VALIDITY" -genkey -dname "$CLIENT_CERTIFICATE_INFO" -keypass "$CLIENT_SSLPASSPHRASE" -storepass "$CLIENT_SSLPASSPHRASE"
    keytool -noprompt -keystore kafka.client.truststore.jks -alias CARoot -import -file ca-cert -storepass "$CLIENT_SSLPASSPHRASE"
    keytool -noprompt -keystore kafka.client.keystore.jks -alias "$USERNAME" -certreq -file "cert-file-client-$USERNAME" -storepass "$CLIENT_SSLPASSPHRASE"
    openssl x509 -req -CA ca-cert -CAkey ca-key -in "cert-file-client-$USERNAME" -out "cert-signed-client-$USERNAME" -days "$VALIDITY" -CAcreateserial -passin "pass:$SSLPASSPHRASE"
    
    ###Add client certificate to server truststore###
    keytool -keystore kafka.server.truststore.jks -alias "$USERNAME" -import -file "cert-signed-client-$USERNAME" -storepass "$SSLPASSPHRASE"
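
The certificate script depends on both `keytool` and `openssl` being installed, and `keytool` rejects keystore passwords shorter than six characters. A small pre-flight check can surface both problems before any file is written. This is only a sketch; the function names `check_tools` and `check_passphrase` are illustrative and not part of the script above.

```shell
#!/bin/bash
# Fail early when a required command is missing from the PATH.
check_tools() {
  local tool missing=0
  for tool in "$@"; do
    if ! command -v "$tool" >/dev/null 2>&1; then
      echo "missing required tool: $tool" >&2
      missing=1
    fi
  done
  return "$missing"
}

# keytool refuses keystore passwords shorter than 6 characters.
check_passphrase() {
  local pass="$1"
  if [ "${#pass}" -lt 6 ]; then
    echo "passphrase must be at least 6 characters" >&2
    return 1
  fi
}

# Usage inside the certificate script, after the passphrase prompts:
# check_tools keytool openssl || exit 1
# check_passphrase "$SSLPASSPHRASE" || exit 1
```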

  • Once the certificates and keystores are created, we can start the Kafka Broker and the embedded/local zookeeper which comes with the installation.
    cd /opt/kafka/kafka_2.11-1.1.0/bin/
  • First you would need to start the zookeeper service
    nohup ./zookeeper-server-start.sh ../config/zookeeper.properties > zookeeper.out &
  • After zookeeper has started, start the kafka broker
    nohup ./kafka-server-start.sh ../config/server.properties > kafka.out &
  • Next, we will create a kafka topic so that we can then configure the ACLs.
    ./kafka-topics.sh --zookeeper localhost:2181 --create --topic mintopsblog --partitions 1 --replication-factor 1
  • Now we will configure the ACLs on this newly created topic for the user that was created in the certificate (in my case the user is “mintopsblog”).
  • Manually (Producer)
    ./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"CN=mintopsblog,OU=kafka,O=kafka,L=kafka,ST=kafka,C=XX" --producer --topic mintopsblog
  • Manually (Consumer)
    ./kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:"CN=mintopsblog,OU=kafka,O=kafka,L=kafka,ST=kafka,C=XX" --consumer --topic mintopsblog --group mintopsblog-consumer
  • Or use the following script if you wish.
    #!/bin/bash
    set -x
    read -p "Please specify the username for the Kafka ACLs permissions: " USERNAME
    CLIENT_CERTIFICATE_INFO="CN=$USERNAME,OU=kafka,O=kafka,L=kafka,ST=kafka,C=XX"
    echo -n "Please specify the passphrase for the client ssl certificate: "
    read -s CLIENT_SSLPASSPHRASE
    echo -e "\n"
    
    read -p "Please specify the kafka /bin directory: " KAFKA_DIRECTORY
    read -p "Please specify a kafka topic for Kafka ACL: " TOPIC
    read -p "Specify zookeeper hosts (host:port): " ZKHOST
    read -p "Please specify permissions (producer or consumer): " PERMISSION
    
    if [[ "$PERMISSION" == "consumer" ]]; then
    
     ###Consumer permissions###
     read -p "Please specify consumer group id: " CONSUMERID
     "$KAFKA_DIRECTORY/kafka-acls.sh" --authorizer-properties "zookeeper.connect=$ZKHOST" --add --allow-principal "User:$CLIENT_CERTIFICATE_INFO" "--$PERMISSION" --topic "$TOPIC" --group "$CONSUMERID"
    
    else
     ###Producer permissions###
     "$KAFKA_DIRECTORY/kafka-acls.sh" --authorizer-properties "zookeeper.connect=$ZKHOST" --add --allow-principal "User:$CLIENT_CERTIFICATE_INFO" "--$PERMISSION" --topic "$TOPIC"
    
    fi
    KAFKA_SSL=/opt/ssl
    cat > $KAFKA_SSL/client-ssl.properties << EOF
    security.protocol=SSL
    ssl.truststore.location=$KAFKA_SSL/kafka.client.truststore.jks
    ssl.truststore.password=$CLIENT_SSLPASSPHRASE
    ssl.keystore.location=$KAFKA_SSL/kafka.client.keystore.jks
    ssl.keystore.password=$CLIENT_SSLPASSPHRASE
    ssl.key.password=$CLIENT_SSLPASSPHRASE
    EOF
  • If you added the ACLs manually instead of using the script, you need to create the client properties file yourself so that the client can use the client keystore and client truststore that were previously created. Change the passwords and paths to match how you set them up
    cat > /opt/ssl/client-ssl.properties << EOF
    security.protocol=SSL
    ssl.truststore.location=/opt/ssl/kafka.client.truststore.jks
    ssl.truststore.password=sslpassphrase1
    ssl.keystore.location=/opt/ssl/kafka.client.keystore.jks
    ssl.keystore.password=sslpassphrase1
    ssl.key.password=sslpassphrase1
    EOF
  • You can then check whether you can produce or consume using the following commands
    • Consumer
      ./kafka-console-consumer.sh --bootstrap-server kafka.mintopsblog.local:9093 --topic mintopsblog --consumer.config /opt/ssl/client-ssl.properties --group mintopsblog-consumer --from-beginning
    • Producer
      ./kafka-console-producer.sh --broker-list kafka.mintopsblog.local:9093 --topic mintopsblog --producer.config /opt/ssl/client-ssl.properties
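
The two console commands above are interactive. For a scripted check, one message can be produced and then searched for in the consumer output. The sketch below assumes the paths, broker address, topic and group used in this guide; `round_trip` and the environment variable names are illustrative, and the 15 second consumer timeout is an arbitrary choice.

```shell
#!/bin/bash
# Defaults mirror this guide; override them through the environment if needed.
KAFKA_BIN="${KAFKA_BIN:-/opt/kafka/kafka_2.11-1.1.0/bin}"
BROKER="${BROKER:-kafka.mintopsblog.local:9093}"
CLIENT_PROPS="${CLIENT_PROPS:-/opt/ssl/client-ssl.properties}"

# Produce one marker message, then succeed only if the consumer reads it back.
round_trip() {
  local topic="$1" group="$2"
  local msg="acl-check-$$"
  echo "$msg" | "$KAFKA_BIN/kafka-console-producer.sh" \
    --broker-list "$BROKER" --topic "$topic" \
    --producer.config "$CLIENT_PROPS" >/dev/null || return 1
  # The consumer exits once no message has arrived for --timeout-ms.
  "$KAFKA_BIN/kafka-console-consumer.sh" \
    --bootstrap-server "$BROKER" --topic "$topic" \
    --consumer.config "$CLIENT_PROPS" --group "$group" \
    --from-beginning --timeout-ms 15000 2>/dev/null | grep -qx "$msg"
}

# Usage (uncomment inside the container once the broker is running):
# round_trip mintopsblog mintopsblog-consumer && echo "produce and consume OK"
```

Checking for the marker in the consumer output, rather than trusting the producer's exit status, means the check fails when either the produce or the consume permission is missing.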

Should you not have ACLs on a specific topic, the client is refused and reports an authorization error in its log (for example TOPIC_AUTHORIZATION_FAILED).
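
To check which ACLs are actually stored for a topic, `kafka-acls.sh` can list them with `--list`. A minimal sketch, assuming the install path and ZooKeeper address used in this guide; `list_topic_acls`, `KAFKA_BIN` and `ZKHOST` are illustrative names.

```shell
#!/bin/bash
# Defaults mirror this guide; override them through the environment if needed.
KAFKA_BIN="${KAFKA_BIN:-/opt/kafka/kafka_2.11-1.1.0/bin}"
ZKHOST="${ZKHOST:-localhost:2181}"

# Print the ACLs currently stored for one topic.
list_topic_acls() {
  "$KAFKA_BIN/kafka-acls.sh" \
    --authorizer-properties "zookeeper.connect=$ZKHOST" \
    --list --topic "$1"
}

# Usage (uncomment on the broker host):
# list_topic_acls mintopsblog
```

The principal shown in the listing has to match the subject of the client certificate exactly for the rule to apply.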

N.B. For the ACLs to work properly, you would need to create a different certificate for each user you will have. This certificate would then need to be added to the Kafka server truststore and then the Kafka broker would need to be restarted so that the SSL handshake can happen.
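
Because each ACL is tied to the subject of a user's certificate, it can help to print that subject from the signed certificate and compare it with the principal used in the ACL. The sketch below does this with `openssl`, assuming the PEM file produced by the certificate script (for example `cert-signed-client-mintopsblog`) and that the broker presents the subject with CN first, as in the principals used throughout this guide; `principal_from_cert` is an illustrative name.

```shell
#!/bin/bash
# Print "User:<subject DN>" for a PEM certificate, with the DN in
# RFC 2253 order (CN first, comma separated, no spaces).
principal_from_cert() {
  openssl x509 -in "$1" -noout -subject -nameopt RFC2253 \
    | sed -e 's/^subject= *//' -e 's/^/User:/'
}

# Usage (uncomment in the directory holding the certificates):
# principal_from_cert /opt/ssl/cert-signed-client-mintopsblog
```

If the printed value differs from the principal given to `kafka-acls.sh`, even only in the order of the fields, the ACL will not match that user.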


Hope this guide helps you out. If you have any difficulties, don’t hesitate to post a comment. Feel free to point out any mistakes or needed improvements in the guides as well.

15 comments

  1. Yes, this has to be done on each node for it to function properly. The kafka cluster will communicate with each other through SSL, just make sure that the truststore is configured on each node so that the verification can be done properly and each node can verify each other

  2. Hi, thanks for posting this. In the first script, you create a CA, but if I have multiple servers I want one CA correct? How do you sign those remotely or do you sign them for all brokers and distribute the relevant keystores somehow?

  3. Hi David,

    You’re correct, one CA is all that is needed to sign the certificates (change the necessary info of the CA however you like). What you can do is, to have all the broker keystores signed by the same CA, so that all the kafka brokers will have the same CA info in their keystores/certificates. You can either do this on one server and then distribute the relevant keystores to their specific brokers, or have a mounted network drive on all brokers, put the CA certificate in this drive, make sure that all brokers will have central access to it, and run the necessary script as needed on each broker.

    N.B: If Kafka SSL is all you need (without Kafka Authorizer), you could make it work by creating a wildcard certificate (e.g. *.mintopsblog.com). Basically creating one certificate/keystore/truststore and all brokers will use the same one.

    Hope that helps…

  4. Thanks Elton. That’s what I thought. I think I’m going to create dummy stores, create the signing requests then distribute the certs to the relevant hosts individually, importing to the actual stores. That way I can sign them all once every so often with a script and update them (semi) easily without having to keep the ca-key available all the time.

    I’m just finding my feet with the authentication bit so this was really helpful.

    D

  5. Hi Elton,
    thanks for nice writeup. I followed same and used your scripts. But, i keep getting
    [2019-07-06 11:43:01,759] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 4 : {kafka-acl-topic=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)

    I verified the following:

    * Topic exists and ACL shows proper definitions for user
    * signed certificate of user has proper ownership name
    * Broker trust store has imported client cert, reflects the owner name
    * server.properties are intact with the list you have given
    * client properties file reflects what you have provided above
    * hostnames are reachable

    I am clueless

    -K

  6. Hi,

    I am getting below error while executing

    echo "Hello, World" | ./kafka-console-producer.sh --broker-list smartsensor.coe001.net:9093 --topic coetest --producer.config /opt/ssl/client-ssl.properties > /dev/null

    [2019-07-12 18:10:49,720] ERROR Error when sending message to topic coetest with key: null, value: 12 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Topic coetest not present in metadata after 60000 ms.

  7. Hi Kiran & Guru, sorry for the late reply. Best way to debug this issue is to enable some more logging on the authorizer itself. Add the following inside log4j.properties

    log4j.logger.kafka.authorizer.logger=DEBUG, authorizerAppender

    This should give you some more info on what might be happening.

  8. Pretty good tutorial man thanks… one suggestion… maybe avoid redirecting std. error to /dev/null in your Bash script. For example… I was being lazy and made a four-letter password (for testing only)… The script failed with no error message. Also I thought I had installed openssl in my docker container (it had failed without me realizing – it was late at night :-)… Therefore the script failed again with no warning as to why. I ended up just going through each command one by one to find out what was failing.

    Other than that thanks for the tutorial…

  9. Thanks. The /dev/null redirect was mostly set in the script so that the user would not get a lot of log messages on the screen during certificate creation, but you’re right that this hides any errors the user might experience. Will remove them accordingly.
