Fluentd with Kafka : (2/2) Steps to run Fluentd and Kafka

In the previous post, we discussed key use cases and approaches for Fluentd and Kafka. Now, we will review step-by-step instructions on how to configure Fluentd and Kafka with some security features.

The goal of this post is to help you get started with Fluentd and Kafka with confidence by sharing a few tips.
Pay attention to the “Note” sections!! Those are not covered in the official Kafka / Fluentd documentation!!

 
Instance    Hostname (FQDN)         Role
mgmt01      mgmt01.demo.local       Kerberos Server
kafka01     kafka01.demo.local      Zookeeper, Kafka Broker
kafka02     kafka02.demo.local      Kafka Broker
fluent01    fluent01.demo.local     Kafka Client (Producer/Consumer), Fluentd Forwarder

Creating Kafka cluster

Installing Kafka packages

You need to install the Kafka packages on all broker instances. In this post, I install the Kafka packages on both “kafka01” and “kafka02”.

1. Download the Kafka package from the Kafka website.

  • kafka_2.13-2.6.0.tgz (latest)

2. Extract the package.

[root@kafka01 ~]# tar zxfv kafka_2.13-2.6.0.tgz

3. Copy the extracted Kafka directory to “/usr/share/kafka”.

[root@kafka01 ~]# cp -r kafka_2.13-2.6.0 /usr/share/kafka

4. Create a user “kafka” for the Kafka service and change the ownership of the Kafka directory.

[root@kafka01 ~]# useradd kafka
[root@kafka01 ~]# passwd kafka
Changing password for user kafka.
New password:
Retype new password:
passwd: all authentication tokens updated successfully
[root@kafka01 ~]# chown -R kafka:kafka /usr/share/kafka

5. Switch to user “kafka” and move to the Kafka directory.

[root@kafka01 ~]# su - kafka
[kafka@kafka01 ~]$ cd /usr/share/kafka

Configuring and activating Kafka servers

1. Edit the configuration files of the Kafka Brokers to enable the clustering feature.

kafka01 : “server.properties“

[kafka@kafka01 kafka]$ vim config/server.properties
----- modify the following parameters -----
broker.id=0
listeners=PLAINTEXT://kafka01.demo.local:9092
zookeeper.connect=kafka01.demo.local:2181

kafka02 : “server.properties“

[kafka@kafka02 kafka]$ vim config/server.properties
----- modify the following parameters -----
broker.id=1
listeners=PLAINTEXT://kafka02.demo.local:9092
zookeeper.connect=kafka01.demo.local:2181

2. Run the Zookeeper server on “kafka01” with the default profile.

[kafka@kafka01 kafka]$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties

3. Run Kafka Brokers.

[kafka@kafka01 kafka]$ ./bin/kafka-server-start.sh -daemon ./config/server.properties
[kafka@kafka02 kafka]$ ./bin/kafka-server-start.sh -daemon ./config/server.properties

4. Create topic “test-topic01“.

[kafka@kafka01 kafka]$ ./bin/kafka-topics.sh --create --zookeeper kafka01.demo.local:2181 --replication-factor 2 --partitions 3 --topic test-topic01
Created topic test-topic01.
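
Note : As an optional check, you can describe the topic to confirm the partition count and replication factor. The command below assumes the same topic name and Zookeeper address used above.

[kafka@kafka01 kafka]$ ./bin/kafka-topics.sh --describe --zookeeper kafka01.demo.local:2181 --topic test-topic01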

5. Publish the message “Hello World!“ to topic “test-topic01“ as a test.

[kafka@kafka01 kafka]$ ./bin/kafka-console-producer.sh --topic test-topic01 --bootstrap-server 'kafka01.demo.local:9092' --producer.config ./config/producer.properties
>Hello World!

6. Consume messages from topic “test-topic01“ to make sure you receive “Hello World!“.

[kafka@kafka02 kafka]$ ./bin/kafka-console-consumer.sh --bootstrap-server='kafka01.demo.local:9092' --topic test-topic01 --from-beginning
Hello World!

You can learn more about Kafka in Kafka Quick Start.

Installing and configuring Fluentd

You can find the installation steps for Fluentd (td-agent) in Fluentd Documentation - Installation.

Please follow the Pre-Install Guide as well.

In the following steps, I use “rsyslog“ as the incoming data source. You need to add the following line to “/etc/rsyslog.conf” and restart the rsyslog service.

[root@fluent01 ~]# vim /etc/rsyslog.conf
# Send log messages to Fluentd
*.* @127.0.0.1:5140
[root@fluent01 ~]# systemctl restart rsyslog

Then, let’s create configuration files for td-agent.

[root@fluent01 ~]# cd /etc/td-agent/
[root@fluent01 td-agent]# vim td-agent.conf
@include rsyslog_sample.conf
[root@fluent01 td-agent]# vim rsyslog_sample.conf
<source>
    @type syslog
    port 5140
    bind 0.0.0.0
    tag system
</source>
<match system.**>
    @type stdout
</match>

Once the configuration files are ready, run Fluentd and check that the rsyslog output is sent to stdout as expected.

[root@fluent01 ~]# systemctl start td-agent
[root@fluent01 ~]# tail -100f /var/log/td-agent.log

------ Sample output -----
2020-11-20 23:10:08.000000000 +0000 system.daemon.info: {"host":"fluent01","ident":"systemd","pid":"1","message":"Stopping OpenSSH server daemon..."}
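
Note : If you do not see similar output, you can generate a test syslog message manually with the “logger“ command; rsyslog forwards it to Fluentd on port 5140 based on the configuration above.

[root@fluent01 ~]# logger -p user.info "test message for Fluentd"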

Configuring SSL in Kafka cluster

1. Create your own CA certificate and key, and confirm that “ca-cert” and “ca-key” are created.

[root@mgmt01 ~]# openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
[root@mgmt01 ~]# ll
total 8
-rw-r--r--. 1 root root 1294 Nov 18 07:34 ca-cert
-rw-------. 1 root root 1854 Nov 18 07:33 ca-key

2. Copy “ca-cert” and “ca-key” to the working directory of the Kafka Brokers.

  • In this blog, I use “/usr/share/kafka/ssl” as the working directory.

3. Add the generated CA to the truststore with the Java “keytool“.

[kafka@kafka01 ssl]$ keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert

4. Generate a key for each Kafka Broker instance.

Note : When you execute the following command, you will be prompted for information such as “first and last name” and “organizational unit”. You must use the hostname (FQDN) for “first and last name”; otherwise, the SSL handshake with the Kafka Broker will fail. In this blog, I use “kafka01.demo.local” and “kafka02.demo.local” respectively for “first and last name”.

[kafka@kafka01 ssl]$ keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA

5. Export the certificate file from the keystore.

[kafka@kafka01 ssl]$ keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file

6. Sign the exported certificate with the CA files.

[kafka@kafka01 ssl]$ openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial

7. Import the CA certificate and the signed certificate into the keystore.

[kafka@kafka01 ssl]$ keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
[kafka@kafka01 ssl]$ keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
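
Note : As an optional check, you can list the keystore contents with “keytool -list“; both the “CARoot“ and “localhost“ entries should appear (you will be prompted for the keystore password).

[kafka@kafka01 ssl]$ keytool -list -keystore kafka.server.keystore.jks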

8. Edit the configuration files of the Kafka Brokers to enable the SSL feature.

[kafka@kafka01 kafka]$ cd /usr/share/kafka/
[kafka@kafka01 kafka]$ cp ./config/server.properties ./config/server-ssl.properties

[kafka@kafka01 kafka]$ vim ./config/server-ssl.properties
----- Add or Modify following parameters -----
ssl.truststore.location=/usr/share/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password={your password}
ssl.keystore.location=/usr/share/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password={your password}
ssl.key.password={your password}
security.inter.broker.protocol=SSL
listeners=PLAINTEXT://kafka01.demo.local:9092,SSL://kafka01.demo.local:9093

9. Run the Kafka Brokers with the SSL-enabled configuration.

[kafka@kafka01 kafka]$ ./bin/kafka-server-start.sh -daemon ./config/server-ssl.properties
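
Note : As a quick, optional check that the SSL listener is up, you can connect to port 9093 with “openssl s_client“ using the CA certificate created earlier; the handshake details are printed to the console.

[kafka@kafka01 ssl]$ openssl s_client -connect kafka01.demo.local:9093 -CAfile ca-cert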

You can learn the details of how SSL works in a Kafka cluster in Kafka : Encryption with SSL.

Running Fluentd Kafka plugin with SSL

Generating certificate and key files for Fluentd

1. Log in to one of the Kafka Broker instances and generate a key for the Kafka Client.

Note : When you execute the following command, you will be prompted for information such as “first and last name” and “organizational unit”. You must use the hostname (FQDN) for “first and last name”; otherwise, the SSL handshake with the Kafka Broker will fail. In this blog, I use “fluent01.demo.local” for “first and last name”.

[kafka@kafka01 ~]$ cd /usr/share/kafka/ssl/
[kafka@kafka01 ssl]$ keytool -keystore kafka.client.truststore.jks -alias CARoot -importcert -file ca-cert
[kafka@kafka01 ssl]$ keytool -keystore kafka.client.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA

2. Import the CA certificate into the keystore.

[kafka@kafka01 ssl]$ keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert

3. Export the certificate from the keystore as “client_cert.pem“.

[kafka@kafka01 ssl]$ keytool -exportcert -alias localhost -keystore kafka.client.keystore.jks -rfc -file client_cert.pem

4. Export the client key from the keystore as “client.p12“.

[kafka@kafka01 ssl]$ keytool -v -importkeystore -srckeystore kafka.client.keystore.jks -srcalias localhost -destkeystore client.p12 -deststoretype PKCS12

5. Print the client key.

Note : You can find the key information between “-----BEGIN PRIVATE KEY-----” and “-----END PRIVATE KEY-----”. Copy it (including both the “-----BEGIN PRIVATE KEY-----” and “-----END PRIVATE KEY-----” lines) into “client_key.pem”.

[kafka@kafka01 ssl]$ openssl pkcs12 -in client.p12 -nocerts -nodes
----- Key information is shown in output -----

[kafka@kafka01 ssl]$ vim client_key.pem
----- Paste key information -----

6. Export the CA certificate from the keystore as “CA_cert.pem“.

[kafka@kafka01 ssl]$ keytool -exportcert -alias CARoot -keystore kafka.client.keystore.jks -rfc -file CA_cert.pem

Now you have the three files required by the Fluentd Kafka plugin: “client_cert.pem”, “client_key.pem” and “CA_cert.pem”. Please copy them to the Fluentd instance, for example under “/etc/td-agent/ssl“.
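
For example, assuming SSH access to “fluent01“ and that the target directory already exists, the files can be copied with “scp“:

[kafka@kafka01 ssl]$ scp client_cert.pem client_key.pem CA_cert.pem root@fluent01.demo.local:/etc/td-agent/ssl/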

Configuring Kafka plugin with SSL certificate and keys

1. Update Fluentd configuration files with SSL parameters.

[root@fluent01 ~]# cd /etc/td-agent/
[root@fluent01 td-agent]# vim td-agent.conf
@include rsyslog_kafka_ssl.conf
[root@fluent01 td-agent]# vim rsyslog_kafka_ssl.conf
<source>
    @type syslog
    port 5140
    bind 0.0.0.0
    tag system
</source>
<match system.**>
    @type rdkafka2
    # list of seed brokers
    brokers kafka01.demo.local:9093, kafka02.demo.local:9093
    use_event_time true
    ssl_ca_cert "/etc/td-agent/ssl/CA_cert.pem"
    ssl_client_cert "/etc/td-agent/ssl/client_cert.pem"
    ssl_client_cert_key "/etc/td-agent/ssl/client_key.pem"

    # topic settings
    topic_key test-topic01
    default_topic test-topic01

    # producer settings
    required_acks -1
    compression_codec gzip
    @log_level trace

    # buffer settings
    <buffer>
        @type file
        path /var/log/td-agent/buffer/td
        flush_interval 60s
    </buffer>

    # data type settings
    <format>
        @type json
    </format>
</match>

You can learn more about parameter settings in kafka - Fluentd and GitHub : fluent-plugin-kafka.
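
Note : The “rdkafka2“ output plugin requires the “rdkafka“ gem in addition to “fluent-plugin-kafka“. If td-agent fails to start with this configuration, you can check which Kafka-related gems are installed in the td-agent Ruby environment, for example:

[root@fluent01 td-agent]# td-agent-gem list | grep -i kafka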

2. Restart td-agent to reload configuration files.

[root@fluent01 td-agent]# systemctl restart td-agent

3. Confirm the behavior of the plugin by generating rsyslog events, for instance by restarting the “sshd” service.

[root@fluent01 td-agent]# systemctl restart sshd

4. In the td-agent logs, you can see that the rsyslog events were written into buffers.

[root@fluent01 td-agent]# tail -100f /var/log/td-agent/td-agent.log

------ Sample output -----
2020-11-24 07:55:52 +0000 [trace]: #0 writing events into buffer instance=1880 metadata_size=1
2020-11-24 07:55:52 +0000 [trace]: #0 chunk /var/log/td-agent/buffer/td/buffer.b5b4d5a3c30205ced0cdd657565d02535.log size_added: 89 new_size: 911

5. You can also find buffer files under “/var/log/td-agent/buffer/td“.

[root@fluent01 td-agent]# ll /var/log/td-agent/buffer/td/
total 8
-rw-r--r--. 1 td-agent td-agent 911 Nov 24 07:55 buffer.b5b4d5a3c30205ced0cdd657565d02535.log
-rw-r--r--. 1 td-agent td-agent 82 Nov 24 07:55 buffer.b5b4d5a3c30205ced0cdd657565d02535.log.meta

6. Once the flush interval is reached, the messages in the buffer are sent to the Kafka Broker instances.

[root@fluent01 td-agent]# tail -100f /var/log/td-agent/td-agent.log

------ Sample output -----
2020-11-24 07:56:55 +0000 [trace]: #0 trying flush for a chunk chunk="5b4d5a3c30205ced0cdd657565d02535"
2020-11-24 07:56:55 +0000 [trace]: #0 adding write count instance=1900
2020-11-24 07:56:55 +0000 [trace]: #0 executing sync write chunk="5b4d5a3c30205ced0cdd657565d02535"
2020-11-24 07:56:56 +0000 [trace]: #0 write operation done, committing chunk="5b4d5a3c30205ced0cdd657565d02535"
2020-11-24 07:56:56 +0000 [trace]: #0 committing write operation to a chunk chunk="5b4d5a3c30205ced0cdd657565d02535" delayed=false
2020-11-24 07:56:56 +0000 [trace]: #0 purging a chunk instance=1880 chunk_id="5b4d5a3c30205ced0cdd657565d02535" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables={:topic=>nil}, seq=0>
2020-11-24 07:56:56 +0000 [trace]: #0 chunk purged instance=1880 chunk_id="5b4d5a3c30205ced0cdd657565d02535" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables={:topic=>nil}, seq=0>
2020-11-24 07:56:56 +0000 [trace]: #0 done to commit a chunk chunk="5b4d5a3c30205ced0cdd657565d02535"

7. Now, you can confirm that messages are sent as expected by using a Kafka consumer application.

Here is example code of a Kafka consumer application for your reference.

require 'kafka'  # provided by the ruby-kafka gem

topic = 'test-topic01'
kafka_client_crt = '/etc/td-agent/ssl/client_cert.pem'
kafka_client_key = '/etc/td-agent/ssl/client_key.pem'
ca_crt = '/etc/td-agent/ssl/CA_cert.pem'

def read_ssl_file(path)
    return nil if path.nil?
    if path.is_a?(Array)
        path.map { |fp| File.read(fp) }
    else
        File.read(path)
    end
end

kafka = Kafka.new(seed_brokers: 'kafka01.demo.local:9093',
    client_id: "my-application",
    ssl_ca_cert: read_ssl_file(ca_crt),
    ssl_client_cert: read_ssl_file(kafka_client_crt),
    ssl_client_cert_key: read_ssl_file(kafka_client_key)
    )
consumer = kafka.consumer(group_id: "my-consumer")
consumer.subscribe(topic)
trap("TERM") { consumer.stop }
consumer.each_message do |message|
    puts message.value
end
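
Note : If you are using td-agent, you can run this consumer with the bundled Ruby, for example by saving the code above as “kafka_consumer_ssl.rb“ (the file name is arbitrary) and executing:

[root@fluent01 td-agent]# /opt/td-agent/bin/ruby kafka_consumer_ssl.rb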

Configuring Kerberos for SASL authentication

In this step, I explain how to create and configure a Kerberos server from scratch.

Creating Kerberos server

If you already have a Kerberos server, you can skip this section.

1. Install the KDC server packages. In this step, I use the “mgmt01“ instance as the KDC server.

[root@mgmt01 ~]# yum install krb5-server krb5-devel krb5-workstation

2. Update Kerberos configuration files with your realm information.

In my environment, I create the realm “DEMO.LOCAL“ with the KDC server “mgmt01.demo.local“ and the default domain “demo.local“.

[root@mgmt01 ~]# vim /etc/krb5.conf
# To opt out of the system crypto-policies configuration of krb5, remove the
# symlink at /etc/krb5.conf.d/crypto-policies which will not be recreated.
includedir /etc/krb5.conf.d/

[logging]
        default = FILE:/var/log/krb5libs.log
        kdc = FILE:/var/log/krb5kdc.log
        admin_server = FILE:/var/log/kadmind.log

[libdefaults]
        dns_lookup_realm = false
        ticket_lifetime = 7d
        renew_lifetime = 7d
        forwardable = true
        rdns = false
        pkinit_anchors = FILE:/etc/pki/tls/certs/ca-bundle.crt
        spake_preauth_groups = edwards25519
        default_realm = DEMO.LOCAL
        default_ccache_name = KEYRING:persistent:%{uid}

[realms]
DEMO.LOCAL = {
        kdc = mgmt01.demo.local
        admin_server = mgmt01.demo.local
        default_domain = demo.local
}
[domain_realm]
.demo.local = DEMO.LOCAL
demo.local = DEMO.LOCAL

[root@mgmt01 ~]# vim /var/kerberos/krb5kdc/kdc.conf
[kdcdefaults]
        kdc_ports = 88
        kdc_tcp_ports = 88
        spake_preauth_kdc_challenge = edwards25519

[realms]
DEMO.LOCAL = {
        #master_key_type = aes256-cts
        acl_file = /var/kerberos/krb5kdc/kadm5.acl
        dict_file = /usr/share/dict/words
        admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
        supported_enctypes = aes256-cts:normal aes128-cts:normal des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal
}

[root@mgmt01 ~]# cat /var/kerberos/krb5kdc/kadm5.acl
*/admin@DEMO.LOCAL

3. Create the database for the realm “DEMO.LOCAL“.

[root@mgmt01 ~]# kdb5_util create -s -r DEMO.LOCAL

4. Start and enable KDC server services.

[root@mgmt01 ~]# systemctl start krb5kdc
[root@mgmt01 ~]# systemctl start kadmin
[root@mgmt01 ~]# systemctl enable krb5kdc
[root@mgmt01 ~]# systemctl enable kadmin

Creating principals for Kafka and Fluentd

1. Log in to the kadmin console and create principals for the Kafka and Zookeeper services.

[root@mgmt01 ~]# kadmin.local
Authenticating as principal root/admin@DEMO.LOCAL with password.
kadmin.local:

kadmin.local: add_principal -randkey zookeeper/kafka01.demo.local@DEMO.LOCAL
kadmin.local: add_principal -randkey kafka-server@DEMO.LOCAL
kadmin.local: add_principal -randkey kafka-server/kafka01.demo.local@DEMO.LOCAL
kadmin.local: add_principal -randkey kafka-server/kafka02.demo.local@DEMO.LOCAL

2. Create key table files based on the principals.

kadmin.local: ktadd -k zookeeper-kafka01.keytab zookeeper/kafka01.demo.local@DEMO.LOCAL
kadmin.local: ktadd -k kafka-server.keytab kafka-server@DEMO.LOCAL
kadmin.local: ktadd -k kafka-server-kafka01.keytab kafka-server/kafka01.demo.local@DEMO.LOCAL
kadmin.local: ktadd -k kafka-server-kafka02.keytab kafka-server/kafka02.demo.local@DEMO.LOCAL

3. Now you have four key table files: “zookeeper-kafka01.keytab”, “kafka-server.keytab“, “kafka-server-kafka01.keytab“ and “kafka-server-kafka02.keytab“. Please copy the key table files to the corresponding instances under the “/usr/share/kafka/sasl“ directory.
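
Note : As an optional check before copying, you can list the principals stored in each key table file with “klist“, for example:

[root@mgmt01 ~]# klist -kt kafka-server-kafka01.keytab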

Implementing SASL_SSL in Kafka cluster

1. Create a JAAS config file for Zookeeper with the generated principal and key table file path.

[kafka@kafka01 sasl]$ cd /usr/share/kafka/sasl/
[kafka@kafka01 sasl]$ vim zookeeper_jaas.conf
Server {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true
        keyTab="/usr/share/kafka/sasl/zookeeper-kafka01.keytab"
        storeKey=true
        useTicketCache=false
        principal="zookeeper/kafka01.demo.local@DEMO.LOCAL";
};

2. Create a JAAS config file for each Kafka Broker with the generated principal and key table file path.

kafka01 : “kafka_server_jaas.conf”

[kafka@kafka01 sasl]$ vim kafka_server_jaas.conf
KafkaServer {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true
        storeKey=true
        debug=true
        serviceName="kafka-server"
        keyTab="/usr/share/kafka/sasl/kafka-server-kafka01.keytab"
        principal="kafka-server/kafka01.demo.local@DEMO.LOCAL";
};

Client {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true
        storeKey=true
        debug=true
        serviceName="kafka-server"
        keyTab="/usr/share/kafka/sasl/kafka-server.keytab"
        principal="kafka-server/kafka01.demo.local@DEMO.LOCAL";
};

kafka02 : “kafka_server_jaas.conf”

[kafka@kafka02 sasl]$ vim kafka_server_jaas.conf
KafkaServer {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true
        storeKey=true
        debug=true
        serviceName="kafka-server"
        keyTab="/usr/share/kafka/sasl/kafka-server-kafka02.keytab"
        principal="kafka-server/kafka02.demo.local@DEMO.LOCAL";
};

Client {
        com.sun.security.auth.module.Krb5LoginModule required debug=true
        useKeyTab=true
        storeKey=true
        debug=true
        serviceName="kafka-server"
        keyTab="/usr/share/kafka/sasl/kafka-server.keytab"
        principal="kafka-server/kafka02.demo.local@DEMO.LOCAL";
};

3. Set the JAAS file path in environment variables.

Add the following lines to the “bin/zookeeper-server-start.sh“ file.

if [ -z "$KAFKA_OPTS" ]; then
        KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/share/kafka/sasl/zookeeper_jaas.conf -Dsun.security.krb5.debug=true"
fi

Add the following lines to the “bin/kafka-run-class.sh“ and “bin/kafka-server-start.sh“ files.

if [ -z "$KAFKA_OPTS" ]; then
        KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Djava.security.auth.login.config=/usr/share/kafka/sasl/kafka_server_jaas.conf -Dsun.security.krb5.debug=true"
fi

4. Edit the configuration files of Zookeeper to enable the SASL_SSL feature.

[kafka@kafka01 kafka]$ cp ./config/zookeeper.properties ./config/zookeeper-sasl_ssl.properties
[kafka@kafka01 kafka]$ vim ./config/zookeeper-sasl_ssl.properties
----- Add or Modify following parameters ----
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
jaasLoginRenew=3600000
kerberos.removeHostFromPrincipal=true
kerberos.removeRealmFromPrincipal=true

5. Edit the configuration files of the Kafka Brokers to enable the SASL_SSL feature.

kafka01 : “server-sasl_ssl.properties“

[kafka@kafka01 kafka]$ cd /usr/share/kafka/
[kafka@kafka01 kafka]$ cp ./config/server-ssl.properties ./config/server-sasl_ssl.properties

[kafka@kafka01 kafka]$ vim ./config/server-sasl_ssl.properties
----- Add or Modify following parameters -----
# SSL Parameters
ssl.truststore.location=/usr/share/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password={your password}
ssl.keystore.location=/usr/share/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password={your password}
ssl.key.password={your password}

# SASL_SSL Parameters
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka-server
listeners=SASL_SSL://kafka01.demo.local:9093

kafka02 : “server-sasl_ssl.properties“

[kafka@kafka02 kafka]$ cd /usr/share/kafka/
[kafka@kafka02 kafka]$ cp ./config/server-ssl.properties ./config/server-sasl_ssl.properties

[kafka@kafka02 kafka]$ vim ./config/server-sasl_ssl.properties
----- Add or Modify following parameters -----
# SSL Parameters
ssl.truststore.location=/usr/share/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password={your password}
ssl.keystore.location=/usr/share/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password={your password}
ssl.key.password={your password}

# SASL_SSL Parameters
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka-server
listeners=SASL_SSL://kafka02.demo.local:9093

6. Start the Zookeeper and Kafka Broker services with the new configuration files. You need to stop the services first if they are already running.

[kafka@kafka01 kafka]$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper-sasl_ssl.properties
[kafka@kafka01 kafka]$ ./bin/kafka-server-start.sh -daemon ./config/server-sasl_ssl.properties
[kafka@kafka02 kafka]$ ./bin/kafka-server-start.sh -daemon ./config/server-sasl_ssl.properties
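
Note : As an optional check, you can confirm that each broker registered the SASL_SSL listener by searching its server log, for example:

[kafka@kafka01 kafka]$ grep -i "SASL_SSL" ./logs/server.log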

Running Fluentd Kafka plugin with SASL_SSL

1. Log in to “mgmt01” and create a principal and key table file for “fluent01“.

[root@mgmt01 ~]# kadmin.local
kadmin.local:
kadmin.local: add_principal -randkey kafka-client/fluent01.demo.local@DEMO.LOCAL
kadmin.local: ktadd -k kafka-client-fluent01.keytab kafka-client/fluent01.demo.local@DEMO.LOCAL

2. Copy the generated key table file “kafka-client-fluent01.keytab“ to “fluent01“, for example under “/etc/td-agent/sasl/“.

3. Install the Kerberos client package on “fluent01”.

[root@fluent01 ~]# yum install krb5-workstation

4. Edit the Kerberos configuration file “/etc/krb5.conf” on “fluent01”, as shown in the example below.
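
A minimal client-side “/etc/krb5.conf“ could look like the following, reusing the realm and KDC settings defined on “mgmt01“ earlier (adjust it to your environment as needed).

[root@fluent01 ~]# vim /etc/krb5.conf
[libdefaults]
        default_realm = DEMO.LOCAL
        dns_lookup_realm = false
        rdns = false

[realms]
DEMO.LOCAL = {
        kdc = mgmt01.demo.local
        admin_server = mgmt01.demo.local
        default_domain = demo.local
}

[domain_realm]
.demo.local = DEMO.LOCAL
demo.local = DEMO.LOCAL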

5. Make sure the following packages are installed.

[root@fluent01 ~]# yum install cyrus-sasl-gssapi cyrus-sasl-lib
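
Note : Before configuring Fluentd, you can optionally verify that the copied keytab and principal work by obtaining a ticket manually (the keytab path below assumes it was copied to “/etc/td-agent/sasl/“).

[root@fluent01 ~]# kinit -kt /etc/td-agent/sasl/kafka-client-fluent01.keytab kafka-client/fluent01.demo.local@DEMO.LOCAL
[root@fluent01 ~]# klist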

6. Update Fluentd configuration files with SASL_SSL parameters.

Note : It is recommended to use the “rdkafka2“ plugin rather than the “kafka2“ plugin. SASL_SSL might not work with the “kafka2“ plugin due to an issue in “ruby-kafka“.

[root@fluent01 ~]# cd /etc/td-agent/
[root@fluent01 td-agent]# vim td-agent.conf
@include rsyslog_kafka_sasl_ssl.conf
[root@fluent01 td-agent]# vim rsyslog_kafka_sasl_ssl.conf
<source>
    @type syslog
    port 5140
    bind 0.0.0.0
    tag system
</source>
<match system.**>
    @type rdkafka2
    # list of seed brokers
    brokers kafka01.demo.local:9093, kafka02.demo.local:9093
    use_event_time true
    ssl_ca_cert "/etc/td-agent/ssl/CA_cert.pem"
    ssl_client_cert "/etc/td-agent/ssl/client_cert.pem"
    ssl_client_cert_key "/etc/td-agent/ssl/client_key.pem"

    sasl_over_ssl true
    principal "kafka-client/fluent01.demo.local@DEMO.LOCAL"
    keytab "/etc/td-agent/sasl/kafka-client-fluent01.keytab"
    service_name kafka-server

    # topic settings
    topic_key test-topic01
    default_topic test-topic01

    # producer settings
    required_acks -1
    compression_codec gzip
    @log_level trace

    # buffer settings
    <buffer>
        @type file
        path /var/log/td-agent/buffer/td
        flush_interval 60s
    </buffer>

    # data type settings
    <format>
        @type json
    </format>
</match>

7. You can confirm that the keytab file and principal work as expected with the following sample Kafka applications.

Note : If you are using td-agent, you can run the sample Kafka applications easily with “/opt/td-agent/bin/ruby {file name of sample Kafka application}“.

Kafka Producer Sample for SASL_SSL : “rdkafka_producer_sasl_ssl.rb”

require 'rdkafka'

kafka_client_crt = '/etc/td-agent/ssl/client_cert.pem'
kafka_client_key = '/etc/td-agent/ssl/client_key.pem'
ca_crt = '/etc/td-agent/ssl/CA_cert.pem'

config = {
        :"bootstrap.servers" => "kafka01.demo.local:9093",
        :"security.protocol" => "SASL_SSL",
        :"sasl.mechanisms" => "GSSAPI",
        :"sasl.kerberos.principal" => "kafka-client/fluent01.demo.local@DEMO.LOCAL",
        :"sasl.kerberos.keytab" => "/etc/td-agent/sasl/kafka-client-fluent01.keytab",
        :"ssl.ca.location" => ca_crt,
        :"ssl.certificate.location"=> kafka_client_crt,
        :"ssl.key.location" => kafka_client_key,
        :"sasl.kerberos.service.name" => "kafka-server"
}

rdkafka = Rdkafka::Config.new(config)
producer = rdkafka.producer
producer.produce(topic: "test-topic01", payload: "Hello World!").wait

Kafka Consumer Sample for SASL_SSL : “rdkafka_consumer_sasl_ssl.rb”

require 'rdkafka'

kafka_client_crt = '/etc/td-agent/ssl/client_cert.pem'
kafka_client_key = '/etc/td-agent/ssl/client_key.pem'
ca_crt = '/etc/td-agent/ssl/CA_cert.pem'

config = {
        :"bootstrap.servers" => "kafka01.demo.local:9093",
        :"security.protocol" => "SASL_SSL",
        :"sasl.mechanisms" => "GSSAPI",
        :"sasl.kerberos.principal" => "kafka-client/fluent01.demo.local@DEMO.LOCAL",
        :"sasl.kerberos.keytab" => "/etc/td-agent/sasl/kafka-client-fluent01.keytab",
        :"ssl.ca.location" => ca_crt,
        :"ssl.certificate.location"=> kafka_client_crt,
        :"ssl.key.location" => kafka_client_key,
        :"sasl.kerberos.service.name" => "kafka-server",
        :"group.id" => "mygroup"
}

rdkafka = Rdkafka::Config.new(config)
consumer = rdkafka.consumer
consumer.subscribe("test-topic01")
consumer.each do |message|
        puts "Message received: #{message}"
end

8. Restart td-agent daemon.

[root@fluent01 td-agent]# systemctl restart td-agent

9. Once td-agent is running, you can confirm the behavior of the plugin by generating rsyslog events, for instance by restarting the “sshd” service.

[root@fluent01 td-agent]# systemctl restart sshd

10. Now, you can consume messages from the topic with the sample Kafka consumer application.

[root@fluent01 sasl]# /opt/td-agent/bin/ruby rdkafka_consumer_sasl_ssl.rb

Message received: <Message in 'test-topic01' with key '', payload '{"host":"fluent01","ident":"sshd","pid":...', partition 0, offset 231, timestamp 2020-11-30 01:56:28 +0000>

Congratulations! You have finished the use case. Looking for more to work on? Try our Fluent Bit use case of using the ‘tail’ plugin and parsing multiline log data.

Commercial Service - We are here for you.

In the Fluentd Subscription Network, we provide consultancy and professional services to help you run Fluentd and Fluent Bit with confidence by solving your pain points. A service desk is also available for your operations, and the team is equipped with Diagtool and the know-how of running Fluentd in production. Contact us anytime if you would like to learn more about our service offerings.
