Fluentd with Kafka : (2/2) Steps to run Fluentd and Kafka
In the previous post, we discussed key use cases and approaches for Fluentd and Kafka. Now, we will walk through step-by-step instructions for configuring Fluentd and Kafka with some security features.
The goal of this post is to help you get started with Fluentd and Kafka with confidence by touching on a few tips.
Pay attention to the “Note” sections!! They are not covered in the official Kafka / Fluentd documentation!!
This blog covers the following steps:
Test Environment
This walkthrough uses the four Linux instances listed below.
Note : Please use the output of the “hostname --fqdn” command as the hostname.
| Instance | Hostname (FQDN) | Role |
|---|---|---|
| mgmt01 | mgmt01.demo.local | Kerberos Server |
| kafka01 | kafka01.demo.local | Zookeeper, Kafka Broker |
| kafka02 | kafka02.demo.local | Kafka Broker |
| fluent01 | fluent01.demo.local | Kafka Client (Producer/Consumer), Fluentd Forwarder |
OS : CentOS 8
Kafka version : 2.6 (Kafka Quick Start)
Fluentd version : td-agent v4 (Fluentd Quick Start)
Creating Kafka cluster
Installing Kafka packages
You need to install the Kafka packages on all broker instances. In this setup, I install the Kafka packages on both “kafka01” and “kafka02”.
1. Download the Kafka package from the Kafka website.
kafka_2.13-2.6.0.tgz (latest)
2. Extract the package.
3. Copy the Kafka package to “/usr/share/kafka”.
4. Create the user “kafka” for the Kafka service and change the ownership of the Kafka directory.
5. Switch to the user “kafka” and move to the Kafka directory. (A command sketch for steps 1–5 follows.)
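As a sketch, steps 1–5 could look like the following (the download mirror and user-creation options are assumptions; adjust them to your environment):

```bash
# 1-2. Download and extract the Kafka package
curl -O https://archive.apache.org/dist/kafka/2.6.0/kafka_2.13-2.6.0.tgz
tar -xzf kafka_2.13-2.6.0.tgz

# 3. Copy the package to /usr/share/kafka
sudo mkdir -p /usr/share/kafka
sudo cp -r kafka_2.13-2.6.0/* /usr/share/kafka/

# 4. Create the "kafka" user and change the ownership of the Kafka directory
sudo useradd kafka
sudo chown -R kafka:kafka /usr/share/kafka

# 5. Switch to the "kafka" user and move to the Kafka directory
sudo su - kafka
cd /usr/share/kafka
```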
Configuring and activating Kafka servers
1. Edit the configuration files of the Kafka Brokers to enable the clustering feature.
kafka01 : “server.properties“
kafka02 : “server.properties“
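As a sketch, the cluster-related settings might look like this (the broker IDs and the PLAINTEXT port 9092 are assumptions):

```properties
# kafka01 : server.properties (key settings)
broker.id=1
listeners=PLAINTEXT://kafka01.demo.local:9092
zookeeper.connect=kafka01.demo.local:2181

# kafka02 : server.properties (key settings)
broker.id=2
listeners=PLAINTEXT://kafka02.demo.local:9092
zookeeper.connect=kafka01.demo.local:2181
```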
2. Run the Zookeeper server on “kafka01” with the default profile.
3. Run the Kafka Brokers.
4. Create the topic “test-topic01”.
5. Publish the message “Hello World!” to the topic “test-topic01” as a test.
6. Subscribe to the topic “test-topic01” to make sure you can get “Hello World!”. (A command sketch for steps 2–6 follows.)
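As a sketch, the commands for steps 2–6 could look like the following (run from the Kafka directory; the partition and replication settings are assumptions):

```bash
# 2. On kafka01: run the Zookeeper server with the default profile
bin/zookeeper-server-start.sh config/zookeeper.properties

# 3. On kafka01 and kafka02: run the Kafka Brokers
bin/kafka-server-start.sh config/server.properties

# 4. Create the topic "test-topic01"
bin/kafka-topics.sh --create --topic test-topic01 \
  --bootstrap-server kafka01.demo.local:9092 --partitions 1 --replication-factor 2

# 5. Publish "Hello World!" to the topic
echo "Hello World!" | bin/kafka-console-producer.sh \
  --topic test-topic01 --bootstrap-server kafka01.demo.local:9092

# 6. Subscribe and confirm that "Hello World!" arrives
bin/kafka-console-consumer.sh --topic test-topic01 --from-beginning \
  --bootstrap-server kafka01.demo.local:9092
```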
You can learn more about Kafka in Kafka Quick Start.
Installing and configuring Fluentd
You can find installation steps of Fluentd (td-agent) in Fluentd Documentation - Installation.
Please follow Pre-Install Guide as well.
In the following steps, I use “rsyslog” as the incoming data source. You need to add the following lines to “/etc/rsyslog.conf” and restart the rsyslog service.
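For example (a sketch; the forwarding port 5140 is an assumption and must match the Fluentd syslog source below):

```bash
# /etc/rsyslog.conf : forward all logs to the local Fluentd syslog input
echo '*.* @127.0.0.1:5140' | sudo tee -a /etc/rsyslog.conf

# Restart the rsyslog service
sudo systemctl restart rsyslog
```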
Then, let’s create configuration files for td-agent.
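A minimal sketch of “/etc/td-agent/td-agent.conf” for this test, assuming the syslog port 5140 above:

```
<source>
  @type syslog
  port 5140
  bind 127.0.0.1
  tag rsyslog
</source>

# Print all incoming events to stdout (the td-agent log) for verification
<match rsyslog.**>
  @type stdout
</match>
```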
Once the configuration files are ready, run Fluentd and check that the rsyslog output is sent to stdout as expected.
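For instance:

```bash
sudo systemctl restart td-agent
# Generate a test syslog event and watch the td-agent log
logger "Hello Fluentd!"
sudo tail -f /var/log/td-agent/td-agent.log
```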
Configuring SSL in Kafka cluster
1. Create your own CA certificate and key, and confirm that “ca-cert” and “ca-key” are created.
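A sketch based on the standard OpenSSL procedure from the Kafka security documentation:

```bash
# Generate your own CA (produces ca-key and ca-cert)
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365

# Confirm that both files are created
ls ca-cert ca-key
```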
2. Copy “ca-cert” and “ca-key” into the work directory of the Kafka Brokers.
In this blog, I use “/usr/share/kafka/ssl” as the work directory.
3. Add the generated CA to the truststore with the Java “keytool” command.
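Following the standard Kafka SSL procedure, this could look like (run in “/usr/share/kafka/ssl” on each broker; the keystore file names are assumptions):

```bash
# Import the CA certificate into the broker truststore
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
```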
4. Generate key for Kafka Broker instances.
Note : When you execute the following command, you are asked to enter information such as “first and last name” and “organizational unit”. You must use the “Hostname(FQDN)” as the “first and last name”; otherwise, the SSL handshake with the Kafka Broker will fail. In this blog, I use “kafka01.demo.local” and “kafka02.demo.local” respectively as the “first and last name”.
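A sketch of the key-generation command (the keystore file name and alias follow the Kafka documentation and are assumptions):

```bash
# On each broker: generate the broker key in the keystore
# (enter the broker's FQDN, e.g. kafka01.demo.local, as "first and last name")
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey -keyalg RSA
```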
5. Export the certificate file from the keystore.
6. Sign the exported certificate file with the CA files.
7. Import the CA certificate and the signed certificate into the keystore. (A command sketch for steps 5–7 follows.)
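A sketch of steps 5–7, again following the standard Kafka SSL procedure:

```bash
# 5. Export the certificate file from the keystore
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file

# 6. Sign the exported certificate with the CA files
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial

# 7. Import the CA certificate and the signed certificate into the keystore
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
```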
8. Edit the configuration files of the Kafka Brokers to enable the SSL feature.
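A sketch of the SSL additions (the SSL port 9093, passwords, and file paths are assumptions):

```properties
# server.properties : SSL additions (shown for kafka01; use kafka02's hostname on kafka02)
listeners=PLAINTEXT://kafka01.demo.local:9092,SSL://kafka01.demo.local:9093
ssl.keystore.location=/usr/share/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=<keystore-password>
ssl.key.password=<key-password>
ssl.truststore.location=/usr/share/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=<truststore-password>
```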
9. Run the Kafka Brokers with the SSL-enabled configuration.
You can learn the details of how SSL works in a Kafka cluster in Kafka : Encryption with SSL.
Running Fluentd Kafka plugin with SSL
Generating certificate and key files for Fluentd
1. Log in to one of the Kafka Broker instances and generate a key for the Kafka client.
Note : When you execute the following command, you are asked to enter information such as “first and last name” and “organizational unit”. You must use the “Hostname(FQDN)” as the “first and last name”; otherwise, the SSL handshake with the Kafka Broker will fail. In this blog, I use “fluent01.demo.local” as the “first and last name”.
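A sketch of the command (the keystore file name and the alias “fluent01” are assumptions):

```bash
# Generate the client key
# (enter fluent01.demo.local as "first and last name")
keytool -keystore kafka.client.keystore.jks -alias fluent01 -validity 365 -genkey -keyalg RSA
```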
2. Import the CA certificate into the keystore.
3. Export the certificate as “client_cert.pem” from the keystore.
4. Export the client key as “client.p12” from the keystore.
5. Print the client key. (A command sketch for steps 2–5 follows.)
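A sketch of steps 2–5 (the alias “fluent01” is an assumption carried over from step 1):

```bash
# 2. Import the CA certificate into the client keystore
keytool -keystore kafka.client.keystore.jks -alias CARoot -import -file ca-cert

# 3. Export the certificate as client_cert.pem (-rfc produces PEM format)
keytool -exportcert -keystore kafka.client.keystore.jks -alias fluent01 -rfc -file client_cert.pem

# 4. Export the client key as client.p12 (PKCS12 format)
keytool -importkeystore -srckeystore kafka.client.keystore.jks -srcalias fluent01 \
  -destkeystore client.p12 -deststoretype PKCS12

# 5. Print the client key
openssl pkcs12 -in client.p12 -nodes -nocerts
```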
Note : You can find the key between “-----BEGIN PRIVATE KEY-----” and “-----END PRIVATE KEY-----”; copy those lines (including both the “-----BEGIN PRIVATE KEY-----” and “-----END PRIVATE KEY-----” markers) into “client_key.pem”.
6. Export the CA certificate as “CA_cert.pem” from the keystore.
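For example:

```bash
# 6. Export the CA certificate as CA_cert.pem
keytool -exportcert -keystore kafka.client.keystore.jks -alias CARoot -rfc -file CA_cert.pem
```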
Now, you have the three files required for the Fluentd Kafka plugin: “client_cert.pem”, “client_key.pem”, and “CA_cert.pem”. Please copy these files to the Fluentd instance, for example under “/etc/td-agent/ssl”.
Configuring Kafka plugin with SSL certificate and keys
1. Update Fluentd configuration files with SSL parameters.
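As a sketch, the match section could look like the following (the SSL listener port 9093, topic, and buffer settings are assumptions; the ssl_* parameters are documented in fluent-plugin-kafka):

```
<match rsyslog.**>
  @type kafka2
  brokers kafka01.demo.local:9093,kafka02.demo.local:9093
  default_topic test-topic01

  # SSL certificate and key files created in the previous section
  ssl_ca_cert /etc/td-agent/ssl/CA_cert.pem
  ssl_client_cert /etc/td-agent/ssl/client_cert.pem
  ssl_client_cert_key /etc/td-agent/ssl/client_key.pem

  <format>
    @type json
  </format>
  <buffer topic>
    @type file
    path /var/log/td-agent/buffer/td
    flush_interval 10s
  </buffer>
</match>
```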
You can learn more about parameter settings in kafka - Fluentd and GitHub : fluent-plugin-kafka.
2. Restart td-agent to reload configuration files.
3. Confirm the behavior of the plugin by generating rsyslog events; for instance, restart the “sshd” service.
4. In the td-agent logs, you can confirm that the rsyslog events were written into buffers.
5. You can also find buffer files under “/var/log/td-agent/buffer/td“.
6. Once the flush interval is reached, the messages in the buffer are sent to the Kafka Broker instances.
7. Now, you can confirm whether the messages were delivered as expected with a Kafka consumer application.
Here is example code of a Kafka consumer application for your reference.
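A minimal consumer sketch using the “rdkafka” gem (the broker port 9093, group ID, and file paths are assumptions carried over from this post's setup):

```ruby
require "rdkafka"

config = Rdkafka::Config.new(
  "bootstrap.servers"        => "kafka01.demo.local:9093",
  "group.id"                 => "test-consumer-group",
  "auto.offset.reset"        => "earliest",
  "security.protocol"        => "ssl",
  "ssl.ca.location"          => "/etc/td-agent/ssl/CA_cert.pem",
  "ssl.certificate.location" => "/etc/td-agent/ssl/client_cert.pem",
  "ssl.key.location"         => "/etc/td-agent/ssl/client_key.pem"
)

consumer = config.consumer
consumer.subscribe("test-topic01")

# Block and print each message as it arrives
consumer.each do |message|
  puts "topic=#{message.topic} payload=#{message.payload}"
end
```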
Configuring Kerberos for SASL authentication
In this step, I explain how to create and configure a Kerberos server from scratch.
Creating Kerberos server
If you already have a Kerberos server, you can skip this section.
1. Install the KDC server packages. In this step, I use the “mgmt01” instance as the KDC server.
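For example, on CentOS 8:

```bash
# Install the KDC server packages
sudo dnf install krb5-server krb5-workstation
```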
2. Update Kerberos configuration files with your realm information.
In my environment, I create the realm “DEMO.LOCAL” with the KDC server “mgmt01.demo.local” and the default domain “demo.local”.
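An excerpt sketch of “/etc/krb5.conf” for this realm:

```
[libdefaults]
 default_realm = DEMO.LOCAL

[realms]
 DEMO.LOCAL = {
  kdc = mgmt01.demo.local
  admin_server = mgmt01.demo.local
 }

[domain_realm]
 .demo.local = DEMO.LOCAL
 demo.local = DEMO.LOCAL
```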
3. Create a database for the realm “DEMO.LOCAL”.
4. Start and enable KDC server services.
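A sketch of steps 3–4:

```bash
# 3. Create the database for the realm DEMO.LOCAL
sudo kdb5_util create -s -r DEMO.LOCAL

# 4. Start and enable the KDC services
sudo systemctl enable --now krb5kdc kadmin
```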
Creating principals for Kafka and Fluentd
1. Log in to the kadmin console and create principals for the Kafka and Zookeeper services.
2. Create key table files based on the principals. (A command sketch for steps 1–2 follows.)
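A sketch of steps 1–2 (the exact principal names are assumptions derived from the key table file names in the next step):

```bash
# 1. Create the principals for Zookeeper and the Kafka Brokers
sudo kadmin.local -q "addprinc -randkey zookeeper/kafka01.demo.local@DEMO.LOCAL"
sudo kadmin.local -q "addprinc -randkey kafka/kafka01.demo.local@DEMO.LOCAL"
sudo kadmin.local -q "addprinc -randkey kafka/kafka02.demo.local@DEMO.LOCAL"

# 2. Export them into key table files
sudo kadmin.local -q "ktadd -k zookeeper-kafka01.keytab zookeeper/kafka01.demo.local@DEMO.LOCAL"
sudo kadmin.local -q "ktadd -k kafka-server-kafka01.keytab kafka/kafka01.demo.local@DEMO.LOCAL"
sudo kadmin.local -q "ktadd -k kafka-server-kafka02.keytab kafka/kafka02.demo.local@DEMO.LOCAL"
```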
3. Now you have four key table files: “zookeeper-kafka01.keytab”, “kafka-server.keytab”, “kafka-server-kafka01.keytab”, and “kafka-server-kafka02.keytab”. Please copy the key table files to each instance under the “/usr/share/kafka/sasl” directory.
Implementing SASL_SSL in Kafka cluster
1. Create JAAS config file for Zookeeper with generated principal and key table file path.
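A sketch of the Zookeeper JAAS file (the file path and principal name are assumptions consistent with the keytab created above):

```
// /usr/share/kafka/sasl/zookeeper_jaas.conf
Server {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    useTicketCache=false
    keyTab="/usr/share/kafka/sasl/zookeeper-kafka01.keytab"
    principal="zookeeper/kafka01.demo.local@DEMO.LOCAL";
};
```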
2. Create JAAS config file for Kafka Broker with generated principal and key table file path.
kafka01 : “kafka_server_jaas.conf”
kafka02 : “kafka_server_jaas.conf”
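A sketch of the broker JAAS file (principal and keytab names are assumptions; on kafka02, use kafka02's keytab and principal):

```
// kafka01 : kafka_server_jaas.conf
KafkaServer {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/usr/share/kafka/sasl/kafka-server-kafka01.keytab"
    principal="kafka/kafka01.demo.local@DEMO.LOCAL";
};

// The Client section is used by the broker when it connects to Zookeeper
Client {
    com.sun.security.auth.module.Krb5LoginModule required
    useKeyTab=true
    storeKey=true
    keyTab="/usr/share/kafka/sasl/kafka-server-kafka01.keytab"
    principal="kafka/kafka01.demo.local@DEMO.LOCAL";
};
```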
3. Set the JAAS file paths via environment variables.
Add the following lines to the “bin/zookeeper-server-start.sh” file.
Add the following lines to the “bin/kafka-run-class.sh” and “bin/kafka-server-start.sh” files.
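A sketch of the lines to add (the JAAS file paths are the assumptions used above):

```bash
# bin/zookeeper-server-start.sh : near the top
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/share/kafka/sasl/zookeeper_jaas.conf"

# bin/kafka-run-class.sh and bin/kafka-server-start.sh : near the top
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/share/kafka/sasl/kafka_server_jaas.conf"
```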
4. Edit the configuration file of Zookeeper to enable the SASL_SSL feature.
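A sketch of the commonly used Zookeeper SASL settings:

```properties
# config/zookeeper.properties : SASL additions
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
```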
5. Edit the configuration files of the Kafka Brokers to enable the SASL_SSL feature.
kafka01 : “server-sasl_ssl.properties“
kafka02 : “server-sasl_ssl.properties“
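A sketch of the key settings (the SASL_SSL port 9094 is an assumption; on kafka02, use kafka02's hostname):

```properties
# kafka01 : server-sasl_ssl.properties
listeners=SASL_SSL://kafka01.demo.local:9094
security.inter.broker.protocol=SASL_SSL
sasl.mechanism.inter.broker.protocol=GSSAPI
sasl.enabled.mechanisms=GSSAPI
sasl.kerberos.service.name=kafka

# SSL settings carried over from the previous section
ssl.keystore.location=/usr/share/kafka/ssl/kafka.server.keystore.jks
ssl.keystore.password=<keystore-password>
ssl.key.password=<key-password>
ssl.truststore.location=/usr/share/kafka/ssl/kafka.server.truststore.jks
ssl.truststore.password=<truststore-password>
```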
6. Start the Zookeeper and Kafka Broker services with the new configuration files. If the services are already running, you need to stop them before starting.
Running Fluentd Kafka plugin with SASL_SSL
1. Log in to “mgmt01” and create a principal and key table file for “fluent01”.
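For example (the principal name is an assumption derived from the keytab file name):

```bash
# On mgmt01 : create the client principal and export its key table file
sudo kadmin.local -q "addprinc -randkey kafka-client/fluent01.demo.local@DEMO.LOCAL"
sudo kadmin.local -q "ktadd -k kafka-client-fluent01.keytab kafka-client/fluent01.demo.local@DEMO.LOCAL"
```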
2. Copy the generated key table file “kafka-client-fluent01.keytab” to “fluent01”.
3. Install the Kerberos client package on “fluent01”.
4. Edit the Kerberos configuration file, i.e. “krb5.conf”.
5. Make sure the following packages are installed.
6. Update Fluentd configuration files with SASL_SSL parameters.
Note : It is recommended to use the “rdkafka2” plugin rather than the “kafka2” plugin. SASL_SSL might not work with the “kafka2” plugin due to an issue in “ruby-kafka”.
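A sketch of the match section with rdkafka2 (the port 9094, principal, and keytab path are assumptions; rdkafka_options passes settings straight through to librdkafka):

```
<match rsyslog.**>
  @type rdkafka2
  brokers kafka01.demo.local:9094,kafka02.demo.local:9094
  default_topic test-topic01

  rdkafka_options {
    "security.protocol": "sasl_ssl",
    "sasl.kerberos.service.name": "kafka",
    "sasl.kerberos.principal": "kafka-client/fluent01.demo.local@DEMO.LOCAL",
    "sasl.kerberos.keytab": "/etc/td-agent/sasl/kafka-client-fluent01.keytab",
    "ssl.ca.location": "/etc/td-agent/ssl/CA_cert.pem"
  }

  <format>
    @type json
  </format>
  <buffer topic>
    @type file
    path /var/log/td-agent/buffer/td
    flush_interval 10s
  </buffer>
</match>
```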
7. You can confirm that the keytab file and principal work as expected with the following sample Kafka applications.
Note : If you are using td-agent, you can run the sample Kafka applications easily with “/opt/td-agent/bin/ruby {file name of sample Kafka application}”.
Kafka Producer Sample for SASL_SSL : “rdkafka_producer_sasl_ssl.rb”
Kafka Consumer Sample for SASL_SSL : “rdkafka_consumer_sasl_ssl.rb”
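As a reference, a minimal sketch of what the producer sample might look like with the “rdkafka” gem (the port, principal, and file paths are assumptions consistent with this post's setup):

```ruby
# rdkafka_producer_sasl_ssl.rb : a minimal SASL_SSL producer sketch
require "rdkafka"

config = Rdkafka::Config.new(
  "bootstrap.servers"          => "kafka01.demo.local:9094",
  "security.protocol"          => "sasl_ssl",
  "sasl.kerberos.service.name" => "kafka",
  "sasl.kerberos.principal"    => "kafka-client/fluent01.demo.local@DEMO.LOCAL",
  "sasl.kerberos.keytab"       => "/etc/td-agent/sasl/kafka-client-fluent01.keytab",
  "ssl.ca.location"            => "/etc/td-agent/ssl/CA_cert.pem"
)

producer = config.producer
# Produce a test message and wait for the delivery report
producer.produce(topic: "test-topic01", payload: "Hello World!").wait
```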
8. Restart td-agent daemon.
9. Once td-agent is activated, you can confirm the behavior of the plugin by generating rsyslog events; for instance, restart the “sshd” service.
10. Now, you can subscribe to messages from the topic with the sample Kafka consumer application.
Congratulations! You finished the use case. Looking for more to work on? Try our Fluent Bit use case of using the ‘tail’ plugin to parse multiline log data.
Commercial Service - We are here for you.
In the Fluentd Subscription Network, we provide consultancy and professional services to help you run Fluentd and Fluent Bit with confidence by solving your pains. A service desk is also available for your operations, and the team is equipped with Diagtool and the knowledge and tips for running Fluentd in production. Contact us anytime if you would like to learn more about our service offerings.