Fluentd with Kafka (2/2): Steps to run Fluentd and Kafka

In the previous post, we discussed key use cases and approaches for Fluentd and Kafka. Now we will walk through step-by-step instructions for configuring Fluentd and Kafka with some security features.

The goal of this post is to help you get started with Fluentd and Kafka with confidence by sharing a few tips.
Pay attention to the “Note” sections! They contain tips that are not written in the official Kafka / Fluentd documentation!

This blog covers the following steps:

  • Test Environment
  • Creating Kafka cluster
  • Installing and configuring Fluentd
  • Configuring SSL in Kafka cluster
  • Running Fluentd Kafka plugin with SSL
  • Configuring Kerberos for SASL authentication
  • Implementing SASL_SSL in Kafka cluster
  • Running Fluentd Kafka plugin with SASL_SSL

Test Environment

The test environment consists of the following four Linux instances.

Note : Please use the output of the “hostname --fqdn” command as the hostname.

Instance   Hostname (FQDN)        Role
mgmt01     mgmt01.demo.local      Kerberos Server
kafka01    kafka01.demo.local     Zookeeper, Kafka Broker
kafka02    kafka02.demo.local     Kafka Broker
fluent01   fluent01.demo.local    Kafka Client (Producer/Consumer), Fluentd Forwarder

Creating Kafka cluster

Installing Kafka packages

You need to install the Kafka packages on all broker instances. In this example, I install them on both “kafka01” and “kafka02”.

1. Download the Kafka package from the Kafka website.

  • kafka_2.13-2.6.0.tgz (latest)

2. Extract the package.

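For example:

    $ tar -xzf kafka_2.13-2.6.0.tgz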

3. Copy Kafka packages to “/usr/share/kafka”.

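Something like:

    $ sudo cp -r kafka_2.13-2.6.0 /usr/share/kafka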

4. Create a “kafka” user for the Kafka service and change the ownership of the Kafka directory.

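A sketch (the exact useradd options are an assumption):

    $ sudo useradd -m kafka
    $ sudo chown -R kafka:kafka /usr/share/kafka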

5. Switch to the “kafka” user and move to the Kafka directory.

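For example:

    $ sudo su - kafka
    $ cd /usr/share/kafka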

Configuring and activating Kafka servers

1. Edit the Kafka Broker configuration files to enable the clustering feature.

kafka01 : “server.properties“

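The original snippet is not recoverable, but a minimal sketch of the clustering-related settings would look like this (broker.id must be unique per broker, and both brokers must point at the same Zookeeper):

    broker.id=1
    listeners=PLAINTEXT://kafka01.demo.local:9092
    log.dirs=/tmp/kafka-logs
    zookeeper.connect=kafka01.demo.local:2181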

kafka02 : “server.properties“

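On “kafka02”, only the broker id and listener differ:

    broker.id=2
    listeners=PLAINTEXT://kafka02.demo.local:9092
    log.dirs=/tmp/kafka-logs
    zookeeper.connect=kafka01.demo.local:2181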

2. Run the Zookeeper server on “kafka01” with the default profile.

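Using the default “config/zookeeper.properties” profile:

    $ bin/zookeeper-server-start.sh config/zookeeper.properties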

3. Run Kafka Brokers.

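On each broker instance:

    $ bin/kafka-server-start.sh config/server.properties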

4. Create topic “test-topic01“.

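With Kafka 2.6, the topic can be created via “--bootstrap-server”; replication factor 2 spreads the topic across both brokers (the partition count here is an assumption):

    $ bin/kafka-topics.sh --create --bootstrap-server kafka01.demo.local:9092 \
        --replication-factor 2 --partitions 1 --topic test-topic01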

5. Publish the message “Hello World!” to topic “test-topic01” as a test.

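Using the console producer shipped with Kafka:

    $ bin/kafka-console-producer.sh --bootstrap-server kafka01.demo.local:9092 --topic test-topic01
    > Hello World!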

6. Subscribe to topic “test-topic01” and make sure you receive “Hello World!”.

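Using the console consumer:

    $ bin/kafka-console-consumer.sh --bootstrap-server kafka01.demo.local:9092 \
        --topic test-topic01 --from-beginning
    Hello World!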

You can learn more about Kafka in Kafka Quick Start.


Installing and configuring Fluentd

You can find the installation steps for Fluentd (td-agent) in Fluentd Documentation - Installation.

Please follow Pre-Install Guide as well.

In the following steps, I use “rsyslog” as the incoming data source. You need to add the following lines to “/etc/rsyslog.conf” and restart the rsyslog service.

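The Fluentd documentation uses a forwarding rule like the following; the port must match the Fluentd syslog source shown below:

    # Forward all logs to the local Fluentd syslog input
    *.* @127.0.0.1:5140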

Then, let’s create configuration files for td-agent.

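A minimal “/etc/td-agent/td-agent.conf” for this test, receiving syslog on port 5140 and printing events to stdout (the “system” tag is an assumption):

    <source>
      @type syslog
      port 5140
      tag system
    </source>

    <match system.**>
      @type stdout
    </match>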

Once the configuration files are ready, run Fluentd and check that the rsyslog output is sent to stdout as expected.

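For example:

    $ sudo systemctl restart td-agent
    $ tail -f /var/log/td-agent/td-agent.log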

Configuring SSL in Kafka cluster

1. Create your own CA certificate and key, and confirm that “ca-cert” and “ca-key” are created.

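The standard Kafka SSL setup generates the CA with openssl; something like:

    $ openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
    $ ls
    ca-cert  ca-key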

2. Copy “ca-cert” and “ca-key” into the work directory of the Kafka Brokers.

  • In this blog, I use “/usr/share/kafka/ssl” as the work directory.

3. Add the generated CA to the truststore with the Java “keytool” utility.

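Following the standard Kafka SSL setup (the truststore file name is an assumption; you will be prompted for a password):

    $ keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert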

4. Generate keys for the Kafka Broker instances.

Note : When you execute the following command, you are required to enter information such as “first and last name” and “organizational unit”. You should use the “Hostname(FQDN)” as the “first and last name”. Otherwise, the Kafka Broker will fail the SSL handshake. In this blog, I use “kafka01.demo.local” and “kafka02.demo.local” respectively for “first and last name”.

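A sketch, run on each broker (the keystore name and alias are assumptions; answer the prompts as described in the note above):

    $ keytool -keystore kafka.server.keystore.jks -alias kafka01 -validity 365 -genkey -keyalg RSA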

5. Export the certificate file from the keystore.

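For example, on “kafka01”:

    $ keytool -keystore kafka.server.keystore.jks -alias kafka01 -certreq -file cert-file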

6. Sign the exported certificate file with the CA files.

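Using the CA files created in step 1:

    $ openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed \
        -days 365 -CAcreateserial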

7. Import the CA certificate and the signed certificate into the keystore.

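Both certificates go into the broker keystore:

    $ keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
    $ keytool -keystore kafka.server.keystore.jks -alias kafka01 -import -file cert-signed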

8. Edit the Kafka Broker configuration files to enable the SSL feature.

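A sketch of the SSL-related lines in “server.properties” (port 9093 for SSL and the placeholder passwords are assumptions):

    listeners=PLAINTEXT://kafka01.demo.local:9092,SSL://kafka01.demo.local:9093
    ssl.keystore.location=/usr/share/kafka/ssl/kafka.server.keystore.jks
    ssl.keystore.password={keystore-password}
    ssl.key.password={key-password}
    ssl.truststore.location=/usr/share/kafka/ssl/kafka.server.truststore.jks
    ssl.truststore.password={truststore-password}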

9. Run the Kafka Brokers with the SSL-enabled configuration.

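Assuming the SSL settings were saved to a dedicated profile (the file name is an assumption):

    $ bin/kafka-server-start.sh config/server-ssl.properties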

You can learn details of how SSL works in Kafka cluster in Kafka : Encryption with SSL.


Running Fluentd Kafka plugin with SSL

Generating certificate and key files for Fluentd

1. Log in to one of the Kafka Broker instances and generate a key for the Kafka Client.

Note : When you execute the following command, you are required to enter information such as “first and last name” and “organizational unit”. You should use the “Hostname(FQDN)” as the “first and last name”. Otherwise, the Kafka Broker will fail the SSL handshake. In this blog, I use “fluent01.demo.local” for “first and last name”.

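Similar to the broker keys, with the client FQDN (keystore name and alias are assumptions):

    $ keytool -keystore kafka.client.keystore.jks -alias fluent01 -validity 365 -genkey -keyalg RSA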

2. Import CA certificate into keystore.

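For example:

    $ keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert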

3. Export certificate as “client_cert.pem“ from keystore.

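keytool can export the certificate in PEM format with “-rfc”:

    $ keytool -exportcert -keystore kafka.client.keystore.jks -alias fluent01 -rfc -file client_cert.pem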

4. Export client key as “client.p12“ from keystore.

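JKS keys cannot be exported directly, so convert the keystore entry to PKCS12 first:

    $ keytool -importkeystore -srckeystore kafka.client.keystore.jks -destkeystore client.p12 \
        -deststoretype PKCS12 -srcalias fluent01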

5. Print client key.

Note : You can find the key information between “-----BEGIN PRIVATE KEY-----” and “-----END PRIVATE KEY-----”. Copy those lines (including both the BEGIN and END markers) into “client_key.pem”.

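openssl can print the private key from the PKCS12 file:

    $ openssl pkcs12 -in client.p12 -nodes -nocerts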

6. Export CA certificate as “CA_cert.pem“ from keystore.

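Again with “-rfc” for PEM output:

    $ keytool -exportcert -keystore kafka.client.keystore.jks -alias CARoot -rfc -file CA_cert.pem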

Now you have the three files required by the Fluentd Kafka plugin: “client_cert.pem”, “client_key.pem”, and “CA_cert.pem”. Please copy these files to the Fluentd instance, for example under “/etc/td-agent/ssl”.

Configuring Kafka plugin with SSL certificate and keys

1. Update Fluentd configuration files with SSL parameters.

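A sketch of the match section using the “kafka2” plugin; the ssl_* parameter names come from fluent-plugin-kafka, while the topic, format, and buffer settings are assumptions (the buffer path matches the directory mentioned in step 5 below):

    <match system.**>
      @type kafka2
      brokers kafka01.demo.local:9093,kafka02.demo.local:9093
      default_topic test-topic01
      ssl_ca_cert /etc/td-agent/ssl/CA_cert.pem
      ssl_client_cert /etc/td-agent/ssl/client_cert.pem
      ssl_client_cert_key /etc/td-agent/ssl/client_key.pem
      <format>
        @type json
      </format>
      <buffer topic>
        @type file
        path /var/log/td-agent/buffer/td
        flush_interval 60s
      </buffer>
    </match>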

You can learn more about parameter settings in kafka - Fluentd and GitHub : fluent-plugin-kafka.

2. Restart td-agent to reload configuration files.

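    $ sudo systemctl restart td-agent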

3. Confirm the behavior of the plugin by generating rsyslog events, for instance by restarting the “sshd” service.

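    $ sudo systemctl restart sshd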

4. In the td-agent logs, you can find that the rsyslog events were written into buffers.

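For example:

    $ tail -f /var/log/td-agent/td-agent.log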

5. You can also find buffer files under “/var/log/td-agent/buffer/td“.

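    $ ls /var/log/td-agent/buffer/td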

6. Once the flush interval is reached, the messages in the buffer will be sent to the Kafka Broker instances.


7. Now you can confirm that the messages arrive as expected by using a Kafka consumer application.

Here is example code of a Kafka consumer application for your reference.

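The original sample is not recoverable; here is a minimal sketch with the rdkafka gem, reusing the PEM files generated above (the broker port and group id are assumptions):

    require "rdkafka"

    config = {
      :"bootstrap.servers"        => "kafka01.demo.local:9093",
      :"group.id"                 => "test-consumer01",
      :"security.protocol"        => "ssl",
      :"ssl.ca.location"          => "/etc/td-agent/ssl/CA_cert.pem",
      :"ssl.certificate.location" => "/etc/td-agent/ssl/client_cert.pem",
      :"ssl.key.location"         => "/etc/td-agent/ssl/client_key.pem"
    }

    consumer = Rdkafka::Config.new(config).consumer
    consumer.subscribe("test-topic01")

    # Poll and print messages forever
    consumer.each do |message|
      puts "Received: #{message.payload}"
    end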

Configuring Kerberos for SASL authentication

In this step, I explain how to create and configure a Kerberos server from scratch.

Creating Kerberos server

If you already have a Kerberos server, you can skip this section.

1. Install the KDC server packages. In this step, I use the “mgmt01” instance as the KDC server.

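Assuming a RHEL/CentOS-family distribution:

    $ sudo yum install -y krb5-server krb5-libs krb5-workstation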

2. Update Kerberos configuration files with your realm information.

In my environment, I create the realm “DEMO.LOCAL” with the KDC server “mgmt01.demo.local” and the default domain “demo.local”.

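A sketch of the relevant parts of “/etc/krb5.conf”:

    [libdefaults]
      default_realm = DEMO.LOCAL

    [realms]
      DEMO.LOCAL = {
        kdc = mgmt01.demo.local
        admin_server = mgmt01.demo.local
      }

    [domain_realm]
      .demo.local = DEMO.LOCAL
      demo.local = DEMO.LOCAL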

3. Create the database for realm “DEMO.LOCAL”.

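    $ sudo kdb5_util create -s -r DEMO.LOCAL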

4. Start and enable the KDC server services.

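    $ sudo systemctl enable --now krb5kdc kadmin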

Create principals for Kafka and Fluentd

1. Log in to the kadmin console and create principals for the Kafka and Zookeeper services.

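A sketch; the exact principal names are assumptions derived from the keytab file names in step 3:

    $ sudo kadmin.local
    kadmin.local: addprinc -randkey zookeeper/kafka01.demo.local@DEMO.LOCAL
    kadmin.local: addprinc -randkey kafka/kafka01.demo.local@DEMO.LOCAL
    kadmin.local: addprinc -randkey kafka/kafka02.demo.local@DEMO.LOCAL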

2. Create key table files based on the principals.

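With “ktadd” inside the kadmin console (the mapping of “kafka-server.keytab” to a principal is not recoverable from the post):

    kadmin.local: ktadd -k /root/zookeeper-kafka01.keytab zookeeper/kafka01.demo.local@DEMO.LOCAL
    kadmin.local: ktadd -k /root/kafka-server-kafka01.keytab kafka/kafka01.demo.local@DEMO.LOCAL
    kadmin.local: ktadd -k /root/kafka-server-kafka02.keytab kafka/kafka02.demo.local@DEMO.LOCAL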

3. Now you have four key table files: “zookeeper-kafka01.keytab”, “kafka-server.keytab”, “kafka-server-kafka01.keytab”, and “kafka-server-kafka02.keytab”. Please copy the key table files to each instance under the “/usr/share/kafka/sasl” directory.

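For example, with scp:

    $ scp /root/zookeeper-kafka01.keytab /root/kafka-server-kafka01.keytab \
        root@kafka01.demo.local:/usr/share/kafka/sasl/
    $ scp /root/kafka-server-kafka02.keytab root@kafka02.demo.local:/usr/share/kafka/sasl/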

Implementing SASL_SSL in Kafka cluster

1. Create a JAAS config file for Zookeeper with the generated principal and the key table file path.

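A sketch of “zookeeper_jaas.conf” (the file name is an assumption):

    Server {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="/usr/share/kafka/sasl/zookeeper-kafka01.keytab"
      principal="zookeeper/kafka01.demo.local@DEMO.LOCAL";
    };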

2. Create a JAAS config file for each Kafka Broker with the generated principal and the key table file path.

kafka01 : “kafka_server_jaas.conf”

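A sketch; the “Client” section is what the broker uses when it authenticates to Zookeeper:

    KafkaServer {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="/usr/share/kafka/sasl/kafka-server-kafka01.keytab"
      principal="kafka/kafka01.demo.local@DEMO.LOCAL";
    };

    Client {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      storeKey=true
      keyTab="/usr/share/kafka/sasl/kafka-server-kafka01.keytab"
      principal="kafka/kafka01.demo.local@DEMO.LOCAL";
    };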

kafka02 : “kafka_server_jaas.conf”

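The same structure, with the kafka02 keytab and principal substituted:

    keyTab="/usr/share/kafka/sasl/kafka-server-kafka02.keytab"
    principal="kafka/kafka02.demo.local@DEMO.LOCAL";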

3. Set the JAAS file path in environment variables.

Add the following lines to the “bin/zookeeper-server-start.sh” file.

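Something like the following (the JAAS file path is the assumption from above):

    export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf \
      -Djava.security.auth.login.config=/usr/share/kafka/sasl/zookeeper_jaas.conf"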

Add the following lines to the “bin/kafka-run-class.sh” and “bin/kafka-server-start.sh” files.

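And for the broker scripts:

    export KAFKA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf \
      -Djava.security.auth.login.config=/usr/share/kafka/sasl/kafka_server_jaas.conf"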

4. Edit the configuration files of Zookeeper to enable the SASL feature.

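The commonly used SASL settings in “config/zookeeper.properties” are:

    authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
    requireClientAuthScheme=sasl
    jaasLoginRenew=3600000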

5. Edit the configuration files of the Kafka Brokers to enable the SASL_SSL feature.

kafka01 : “server-sasl_ssl.properties“

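A sketch of the SASL_SSL-related lines (the port is an assumption; the ssl.keystore.* and ssl.truststore.* settings from the SSL section still apply):

    listeners=SASL_SSL://kafka01.demo.local:9093
    security.inter.broker.protocol=SASL_SSL
    sasl.mechanism.inter.broker.protocol=GSSAPI
    sasl.enabled.mechanisms=GSSAPI
    sasl.kerberos.service.name=kafka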

kafka02 : “server-sasl_ssl.properties“

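On “kafka02”, only the listener host differs:

    listeners=SASL_SSL://kafka02.demo.local:9093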

6. Start the Zookeeper and Kafka Broker services with the new configuration files. If the services are already running, you need to stop them before starting.

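For example, on “kafka01”:

    $ bin/zookeeper-server-start.sh config/zookeeper.properties
    $ bin/kafka-server-start.sh config/server-sasl_ssl.properties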

Running Fluentd Kafka plugin with SASL_SSL

1. Log in to “mgmt01” and create a principal and key table file for “fluent01”.

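A sketch; the principal name is an assumption derived from the keytab file name:

    $ sudo kadmin.local
    kadmin.local: addprinc -randkey kafka-client/fluent01.demo.local@DEMO.LOCAL
    kadmin.local: ktadd -k /root/kafka-client-fluent01.keytab kafka-client/fluent01.demo.local@DEMO.LOCAL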

2. Copy the generated key table file “kafka-client-fluent01.keytab” to “fluent01”.

3. Install the Kerberos client packages on “fluent01”.

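    $ sudo yum install -y krb5-workstation krb5-libs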

4. Edit the Kerberos configuration file “krb5.conf” with the same realm information as on the server side.

5. Make sure the following packages are installed.

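The original list is not recoverable; my assumption is that it covered the Cyrus SASL GSSAPI module, which librdkafka needs for Kerberos:

    $ rpm -q cyrus-sasl cyrus-sasl-gssapi
    $ sudo yum install -y cyrus-sasl-gssapi    # install if missing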

6. Update Fluentd configuration files with SASL_SSL parameters.

Note : It is recommended to use the “rdkafka2” plugin rather than the “kafka2” plugin. SASL_SSL might not work with the “kafka2” plugin due to an issue in “ruby-kafka”.

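A sketch using “rdkafka2”, passing the Kerberos and SSL settings through librdkafka properties via “rdkafka_options” (the keytab path, topic, and buffer settings are assumptions):

    <match system.**>
      @type rdkafka2
      brokers kafka01.demo.local:9093,kafka02.demo.local:9093
      default_topic test-topic01
      rdkafka_options {
        "security.protocol": "sasl_ssl",
        "sasl.kerberos.service.name": "kafka",
        "sasl.kerberos.principal": "kafka-client/fluent01.demo.local@DEMO.LOCAL",
        "sasl.kerberos.keytab": "/etc/td-agent/sasl/kafka-client-fluent01.keytab",
        "ssl.ca.location": "/etc/td-agent/ssl/CA_cert.pem",
        "ssl.certificate.location": "/etc/td-agent/ssl/client_cert.pem",
        "ssl.key.location": "/etc/td-agent/ssl/client_key.pem"
      }
      <format>
        @type json
      </format>
      <buffer topic>
        @type file
        path /var/log/td-agent/buffer/td
      </buffer>
    </match>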

7. You can confirm that the keytab file and principal work as expected with the following sample Kafka applications.

Note : If you are using td-agent, you can run the sample Kafka applications easily with “/opt/td-agent/bin/ruby {file name of sample Kafka application}”.

Kafka Producer Sample for SASL_SSL : “rdkafka_producer_sasl_ssl.rb”

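A minimal sketch of what “rdkafka_producer_sasl_ssl.rb” might look like (principal and file paths as assumed above):

    require "rdkafka"

    config = {
      :"bootstrap.servers"          => "kafka01.demo.local:9093",
      :"security.protocol"          => "sasl_ssl",
      :"sasl.kerberos.service.name" => "kafka",
      :"sasl.kerberos.principal"    => "kafka-client/fluent01.demo.local@DEMO.LOCAL",
      :"sasl.kerberos.keytab"       => "/etc/td-agent/sasl/kafka-client-fluent01.keytab",
      :"ssl.ca.location"            => "/etc/td-agent/ssl/CA_cert.pem"
    }

    producer = Rdkafka::Config.new(config).producer
    # Publish one test message and block until delivery is confirmed
    producer.produce(topic: "test-topic01", payload: "Hello World!").wait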

Kafka Consumer Sample for SASL_SSL : “rdkafka_consumer_sasl_ssl.rb”

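And a matching sketch of “rdkafka_consumer_sasl_ssl.rb”:

    require "rdkafka"

    config = {
      :"bootstrap.servers"          => "kafka01.demo.local:9093",
      :"group.id"                   => "test-consumer01",
      :"security.protocol"          => "sasl_ssl",
      :"sasl.kerberos.service.name" => "kafka",
      :"sasl.kerberos.principal"    => "kafka-client/fluent01.demo.local@DEMO.LOCAL",
      :"sasl.kerberos.keytab"       => "/etc/td-agent/sasl/kafka-client-fluent01.keytab",
      :"ssl.ca.location"            => "/etc/td-agent/ssl/CA_cert.pem"
    }

    consumer = Rdkafka::Config.new(config).consumer
    consumer.subscribe("test-topic01")
    consumer.each do |message|
      puts "Received: #{message.payload}"
    end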

8. Restart the td-agent daemon.

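    $ sudo systemctl restart td-agent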

9. Once td-agent is active, you can confirm the behavior of the plugin by generating rsyslog events, for instance by restarting the “sshd” service.

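    $ sudo systemctl restart sshd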

10. Now you can subscribe to messages from the topic with the sample Kafka consumer application.

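Using the td-agent bundled Ruby mentioned in the note above:

    $ /opt/td-agent/bin/ruby rdkafka_consumer_sasl_ssl.rb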

Congratulations! You have finished the use case. Looking for more to work on? Try our Fluent Bit use case of using the ‘tail’ plugin and parsing multiline log data.

Commercial Service - We are here for you.

In the Fluentd Subscription Network, we provide consultancy and professional services to help you run Fluentd and Fluent Bit with confidence by solving your pain points. A service desk is also available for your operations, and the team is equipped with Diagtool and tips for running Fluentd in production. Contact us anytime if you would like to learn more about our service offerings.