Practical Tips for Data Volume Reduction with Fluentd

Introduction

Observability has become a crucial aspect of managing complex systems. It empowers organizations to gain valuable insights into their applications, infrastructure, and user experiences. However, as businesses grow and data volumes increase, managing the costs associated with observability tools and infrastructure becomes a key challenge. In this blog post, we'll explore approaches to leverage the advantages of observability while keeping a close eye on running costs with Fluentd.

Approaches

1. Discarding non-essential log events

The simplest way to reduce data volume is to discard unnecessary events. Systems and applications emit highly granular data for in-depth analysis and troubleshooting, and this data sometimes includes patterns that appear periodically and are mere noise for the backend systems. By omitting these events, you not only reduce the amount of data but also prevent important data from being buried in noise.

2. Categorizing log events and routing to the proper backend

In today's dynamic, data-driven landscape, a single log event may be consumed by multiple backends. The most popular choices are log management platforms such as Splunk and OpenSearch, while object storage such as S3 is used for archival purposes. Streaming platforms such as Kafka are also widely used as Fluentd destinations since they offer great flexibility in data consumption. For archival purposes, whole log events should be kept; however, applications such as trend analytics do not always require the full data set. In this case, categorizing log events by content and routing them to the appropriate backend helps you take control of the data pipeline.
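As a sketch of this idea, Fluentd's built-in copy output can fan a single stream out to multiple backends, keeping the whole events for the archive while sending the same stream to an analytics platform. The tag, bucket name, and host below are hypothetical placeholders:

### sketch: one stream, two backends (placeholder names)
<match app.**>
  @type copy
  <store>
    @type s3                        ### full events kept for archival
    s3_bucket my-archive-bucket     ### placeholder bucket
    path logs/
  </store>
  <store>
    @type splunk_hec                ### analytics backend
    hec_host splunk.example.local   ### placeholder host
    hec_token <please use your hec token>
    index app-logs
  </store>
</match>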

3. Optimizing message size by cutting down unnecessary content

A log event includes a variety of information, such as time, log_level, category, and so on. In certain situations, some of this information is non-essential or duplicated. Here is a sample log message from the Windows Event Log. As you can see, "time_generated" and "time_written" carry the same value. In this case, you can easily shrink the log message by almost 12% simply by removing the "time_written" field.

{
        "channel":"system",
        "record_number":"40432",
        "time_generated":"2020-03-07 09:15:39 +0000",
        "time_written":"2020-03-07 09:15:39 +0000",
        "event_id":"7036",
        "event_type":"information",
        "event_category":"0",
        "source_name":"Service Control Manager",
        "computer_name":"WIN-7IMHK7EQ5T3",
        "user":"",
        "description":"The Windows Installer service entered the stopped state.\r\n"
}

Taking control of the amount of data with Fluentd

In this section, we’ll show you how to configure Fluentd for data volume reduction with some use cases.

1. Discarding non-essential log events

Fluentd has a variety of filter plugins which allow you to process data before shipping it to destinations. The grep filter plugin filters log events against given regular expression patterns, allowing you to keep only the events that are needed. We’ll explain how the grep filter works with the following sample log file.

2023-07-26 18:25:29 +0000 [info]: #0 starting fluentd worker pid=947900 ppid=947895 worker=0
2023-07-26 18:25:29 +0000 [debug]: #0 buffer started instance=2000 stage_size=0 queue_size=0
2023-07-26 18:25:29 +0000 [debug]: #0 flush_thread actually running
2023-07-26 18:25:29 +0000 [debug]: #0 enqueue_thread actually running
2023-07-26 18:25:29 +0000 [info]: #0 following tail of /var/log/nginx/access.log
2023-07-26 18:25:29 +0000 [info]: #0 fluentd worker is now running worker=0
2023-07-26 18:25:33 +0000 [debug]: #0 Created new chunk chunk_id="60167f91ec7fda76836c05bf4d09ab88" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables=nil, seq=0>
2023-07-26 18:25:38 +0000 [debug]: #0 [Sending] Chunk: 60167f91ec7fda76836c05bf4d09ab88(843B).
2023-07-26 18:25:38 +0000 [debug]: #0 [Response] Chunk: 60167f91ec7fda76836c05bf4d09ab88 Size: 843 Response: #<Net::HTTPOK 200 OK readbody=true> Duration: 0.020717207
2023-07-26 18:25:48 +0000 [debug]: #0 Created new chunk chunk_id="60167fa0dc37ea826882b7da60623711" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables=nil, seq=0>
2023-07-26 18:25:53 +0000 [debug]: #0 [Sending] Chunk: 60167fa0dc37ea826882b7da60623711(1686B).
2023-07-26 18:25:53 +0000 [debug]: #0 [Response] Chunk: 60167fa0dc37ea826882b7da60623711 Size: 1686 Response: #<Net::HTTPOK 200 OK readbody=true> Duration: 0.009840038
2023-07-26 18:26:16 +0000 [error]: Worker 0 exited unexpectedly with signal SIGKILL
2023-07-26 18:26:17 +0000 [info]: #0 init worker0 logger path=nil rotate_age=nil rotate_size=nil
2023-07-26 18:26:17 +0000 [info]: adding match in @splunk pattern="nginx.access" type="splunk_hec"
2023-07-26 18:26:17 +0000 [info]: adding match in @kafka pattern="nginx.access" type="stdout"
2023-07-26 18:26:17 +0000 [info]: adding match pattern="nginx.access" type="copy"
2023-07-26 18:26:17 +0000 [info]: adding match pattern="nginx.access" type="stdout"
2023-07-26 18:26:17 +0000 [info]: adding source type="tail"
2023-07-26 18:26:17 +0000 [warn]: #0 'pos_file PATH' parameter is not set to a 'tail' source.
2023-07-26 18:26:17 +0000 [warn]: #0 this parameter is highly recommended to save the position to resume tailing.
2023-07-26 18:26:17 +0000 [info]: #0 starting fluentd worker pid=947952 ppid=947895 worker=0
2023-07-26 18:26:17 +0000 [debug]: #0 buffer started instance=2000 stage_size=0 queue_size=0
2023-07-26 18:26:17 +0000 [debug]: #0 flush_thread actually running
2023-07-26 18:26:17 +0000 [debug]: #0 enqueue_thread actually running
2023-07-26 18:26:17 +0000 [info]: #0 following tail of /var/log/nginx/access.log
2023-07-26 18:26:17 +0000 [info]: #0 fluentd worker is now running worker=0
2023-07-26 18:26:23 +0000 [debug]: #0 Created new chunk chunk_id="60167fc1a209dc4a373c02572b3de4e0" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables=nil, seq=0>

As you can see, each log event in the sample follows a common structure.

2023-07-26 18:25:29 +0000 [debug]: #0 buffer started instance=2000 stage_size=0 queue_size=0
—
<timestamp> [<log level>]: <message>

We need to parse the raw text into key-value pairs before applying filters. Here is a sample Fluentd configuration which extracts <timestamp>, <log level>, and <message> from the raw message.

### fluentd_test01.conf 
<source>
  @type tail
  path ./sample01.log
  read_from_head true
  #db ./test.db
  #path_key tailed_path
  tag sample01
  <parse>
    @type none
  </parse>
</source>
<filter sample01>
  @type parser
  key_name message
  reserve_data true
  reserve_time true
  <parse>
    @type regexp
    expression /^(?<time>.*)\s+\[(?<level>.*)\]:\s+(?<message>.*)$/
    time_format %Y-%m-%d %H:%M:%S %z  ### 2023-07-26 18:26:23 +0000
    keep_time_key true
  </parse>
</filter>

<match sample01>
  @type stdout
</match>

By manually running Fluentd, you can check if the output looks as expected.

$ td-agent -c fluentd_test01.conf

The raw text message is parsed into key-value pairs like the following:

{
        "message":"#0 starting fluentd worker pid=947900 ppid=947895 worker=0",
        "time":"2023-07-26 18:25:29 +0000",
        "level":"info"
}

Now we are prepared for data volume reduction with filter plugins. The example fluentd_test02.conf simply discards messages whose log level is “debug” using the grep filter.

### fluentd_test02.conf
<source>
  @type tail
  path ./sample01.log
  read_from_head true
  #db ./test.db
  #path_key tailed_path
  tag sample01
  <parse>
    @type none
  </parse>
</source>

<filter sample01>
  @type parser
  key_name message
  reserve_data true
  reserve_time true
  <parse>
    @type regexp
    expression /^(?<time>.*)\s+\[(?<level>.*)\]:\s+(?<message>.*)$/
    time_format %Y-%m-%d %H:%M:%S %z  ### 2023-07-26 18:26:23 +0000
    keep_time_key true
  </parse>
</filter>

<filter sample01>
  @type grep
  <exclude>
    key level
    pattern /^(?:debug)$/
  </exclude>
</filter>

<match sample01>
  @type stdout
</match>

With the above configuration, messages whose log level is “debug” are no longer shown.

{"message":"#0 starting fluentd worker pid=947900 ppid=947895 worker=0","time":"2023-07-26 18:25:29 +0000","level":"info"}
{"message":"#0 following tail of /var/log/nginx/access.log","time":"2023-07-26 18:25:29 +0000","level":"info"}
{"message":"#0 fluentd worker is now running worker=0","time":"2023-07-26 18:25:29 +0000","level":"info"}
{"message":"Worker 0 exited unexpectedly with signal SIGKILL","time":"2023-07-26 18:26:16 +0000","level":"error"}
{"message":"#0 init worker0 logger path=nil rotate_age=nil rotate_size=nil","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"adding match in @splunk pattern=\"nginx.access\" type=\"splunk_hec\"","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"adding match in @kafka pattern=\"nginx.access\" type=\"stdout\"","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"adding match pattern=\"nginx.access\" type=\"copy\"","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"adding match pattern=\"nginx.access\" type=\"stdout\"","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"adding source type=\"tail\"","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"#0 'pos_file PATH' parameter is not set to a 'tail' source.","time":"2023-07-26 18:26:17 +0000","level":"warn"}
{"message":"#0 this parameter is highly recommended to save the position to resume tailing.","time":"2023-07-26 18:26:17 +0000","level":"warn"}
{"message":"#0 starting fluentd worker pid=947952 ppid=947895 worker=0","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"#0 following tail of /var/log/nginx/access.log","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"#0 fluentd worker is now running worker=0","time":"2023-07-26 18:26:17 +0000","level":"info"}

If you need additional conditions, the <and> directive is useful. In fluentd_test03.conf, we exclude only the repeating messages at the “debug” log level while keeping other debug events.

### fluentd_test03.conf
<source>
  @type tail
  path ./sample01.log
  read_from_head true
  #db ./test.db
  #path_key tailed_path
  tag sample01
  <parse>
    @type none
  </parse>
</source>

<filter sample01>
  @type parser
  key_name message
  reserve_data true
  reserve_time true
  <parse>
    @type regexp
    expression /^(?<time>.*)\s+\[(?<level>.*)\]:\s+(?<message>.*)$/
    time_format %Y-%m-%d %H:%M:%S %z  ### 2023-07-26 18:26:23 +0000
    keep_time_key true
  </parse>
</filter>

<filter sample01>
  @type grep
  <and>
    <exclude>
      key level
      pattern /^(?:debug)$/
    </exclude>
    <exclude>
      key message
      pattern /flush_thread actually running|enqueue_thread actually running/
    </exclude>
  </and>
</filter>

<match sample01>
  @type stdout
</match>

As you can see from the output, the repeating debug messages are excluded while the other debug events are kept.

{"message":"#0 starting fluentd worker pid=947900 ppid=947895 worker=0","time":"2023-07-26 18:25:29 +0000","level":"info"}
{"message":"#0 buffer started instance=2000 stage_size=0 queue_size=0","time":"2023-07-26 18:25:29 +0000","level":"debug"}
{"message":"#0 flush_thread actually running","time":"2023-07-26 18:25:29 +0000","level":"debug"}
{"message":"#0 enqueue_thread actually running","time":"2023-07-26 18:25:29 +0000","level":"debug"}
{"message":"#0 following tail of /var/log/nginx/access.log","time":"2023-07-26 18:25:29 +0000","level":"info"}
{"message":"#0 fluentd worker is now running worker=0","time":"2023-07-26 18:25:29 +0000","level":"info"}
{"message":"#0 Created new chunk chunk_id=\"60167f91ec7fda76836c05bf4d09ab88\" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables=nil, seq=0>","time":"2023-07-26 18:25:33 +0000","level":"debug"}
{"message":"#0 [Sending] Chunk: 60167f91ec7fda76836c05bf4d09ab88(843B).","time":"2023-07-26 18:25:38 +0000","level":"debug"}
{"message":"#0 [Response] Chunk: 60167f91ec7fda76836c05bf4d09ab88 Size: 843 Response: #<Net::HTTPOK 200 OK readbody=true> Duration: 0.020717207","time":"2023-07-26 18:25:38 +0000","level":"debug"}
{"message":"#0 Created new chunk chunk_id=\"60167fa0dc37ea826882b7da60623711\" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables=nil, seq=0>","time":"2023-07-26 18:25:48 +0000","level":"debug"}
{"message":"#0 [Sending] Chunk: 60167fa0dc37ea826882b7da60623711(1686B).","time":"2023-07-26 18:25:53 +0000","level":"debug"}
{"message":"#0 [Response] Chunk: 60167fa0dc37ea826882b7da60623711 Size: 1686 Response: #<Net::HTTPOK 200 OK readbody=true> Duration: 0.009840038","time":"2023-07-26 18:25:53 +0000","level":"debug"}
{"message":"Worker 0 exited unexpectedly with signal SIGKILL","time":"2023-07-26 18:26:16 +0000","level":"error"}
{"message":"#0 init worker0 logger path=nil rotate_age=nil rotate_size=nil","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"adding match in @splunk pattern=\"nginx.access\" type=\"splunk_hec\"","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"adding match in @kafka pattern=\"nginx.access\" type=\"stdout\"","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"adding match pattern=\"nginx.access\" type=\"copy\"","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"adding match pattern=\"nginx.access\" type=\"stdout\"","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"adding source type=\"tail\"","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"#0 'pos_file PATH' parameter is not set to a 'tail' source.","time":"2023-07-26 18:26:17 +0000","level":"warn"}<br>
{"message":"#0 this parameter is highly recommended to save the position to resume tailing.","time":"2023-07-26 18:26:17 +0000","level":"warn"}
{"message":"#0 starting fluentd worker pid=947952 ppid=947895 worker=0","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"#0 buffer started instance=2000 stage_size=0 queue_size=0","time":"2023-07-26 18:26:17 +0000","level":"debug"}
{"message":"#0 flush_thread actually running","time":"2023-07-26 18:26:17 +0000","level":"debug"}
{"message":"#0 enqueue_thread actually running","time":"2023-07-26 18:26:17 +0000","level":"debug"}
{"message":"#0 following tail of /var/log/nginx/access.log","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"#0 fluentd worker is now running worker=0","time":"2023-07-26 18:26:17 +0000","level":"info"}
{"message":"#0 Created new chunk chunk_id=\"60167fc1a209dc4a373c02572b3de4e0\" metadata=#<struct Fluent::Plugin::Buffer::Metadata timekey=nil, tag=nil, variables=nil, seq=0>","time":"2023-07-26 18:26:23 +0000","level":"debug"}

2. Categorizing log events and routing to the proper backend

Fluentd has a tag feature: tags act as identifiers for log events, enabling Fluentd to route and filter logs efficiently. Tags allow you to specify different destinations for different log types or sources, simplifying log management and reducing processing overhead. The rewrite_tag_filter plugin is one of the most widely used plugins; it provides a rule-based mechanism for rewriting the original tag. In the following sample, we’ll demonstrate how to rewrite tags with custom rules and route events to their destinations.

### sample02.log
10.0.1.23 - - [24/Jul/2023:23:38:48 +0000] "GET /test HTTP/1.1" 404 3971 "-" "curl/7.61.1" "-"
10.0.1.23 - - [24/Jul/2023:23:38:50 +0000] "GET /test HTTP/1.1" 404 3971 "-" "curl/7.61.1" "-"
10.0.1.23 - - [24/Jul/2023:23:38:51 +0000] "GET /test HTTP/1.1" 404 3971 "-" "curl/7.61.1" "-"
10.0.1.23 - - [24/Jul/2023:23:38:55 +0000] "GET / HTTP/1.1" 200 4057 "-" "curl/7.61.1" "-"
10.0.1.23 - - [24/Jul/2023:23:38:56 +0000] "GET / HTTP/1.1" 200 4057 "-" "curl/7.61.1" "-"
10.0.1.23 - - [24/Jul/2023:23:38:57 +0000] "GET / HTTP/1.1" 200 4057 "-" "curl/7.61.1" "-"

Here is the sample configuration that rewrites tags by HTTP return codes and routes log events to different Splunk indexes based on new tags.

### fluentd_test04.conf
<source>
  @type tail
  path sample02.log
  tag sample02
  <parse>
    @type nginx
  </parse>
</source>

<match sample02>
  @type rewrite_tag_filter
  capitalize_regex_backreference yes
  <rule>
    key     code
    pattern /^2\d\d$/
    tag     ${tag}.success
  </rule>
  <rule>
    key     code
    pattern /^[345]\d\d$/
    tag     ${tag}.fail
  </rule>
</match>

<match sample02.success>
  @type splunk_hec
  @log_level debug
  hec_host splunk01.demo.local
  hec_port 8088
  hec_token  <please use your hec token>
  index nginx-access-success
  insecure_ssl true
  <format>
    @type json
  </format>
  <buffer>
    @type file
    path /var/log/td-agent/buffer/success/
    flush_interval 5s
  </buffer>
</match>

<match sample02.fail>
  @type splunk_hec
  @log_level debug
  hec_host splunk01.demo.local
  hec_port 8088
  hec_token  <please use your hec token>
  index nginx-access-fail
  insecure_ssl true
  <format>
    @type json
  </format>
  <buffer>
    @type file
    path /var/log/td-agent/buffer/fail/
    flush_interval 5s
  </buffer>
</match>
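Note that rewrite_tag_filter discards events that match none of its rules. If unexpected status codes should not be silently dropped, a catch-all rule can be appended to the rule list; the sample02.other tag below is a hypothetical name:

### sketch: catch-all rule to append inside the rewrite_tag_filter match
  <rule>
    key     code
    pattern /.+/
    tag     ${tag}.other
  </rule>

Events tagged sample02.other can then be handled by their own <match> section or inspected with a stdout output.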

3. Optimizing message size by cutting down unnecessary content

The record_transformer filter is another popular plugin in the community; it allows users to modify, enrich, or otherwise manipulate individual log records passing through Fluentd in a flexible and customizable way. record_transformer has the remove_keys option, which is simple but powerful for reducing the amount of data by removing unnecessary fields. In fluentd_test05.conf, we use a sample Windows Event Log record as dummy input, and record_transformer removes the duplicated time information stored in the time_written field.

### fluentd_test05.conf
<source>
  @type dummy
  dummy {"channel":"system","record_number":"40432","time_generated":"2020-03-07 09:15:39 +0000","time_written":"2020-03-07 09:15:39 +0000","event_id":"7036","event_type":"information","event_category":"0","source_name":"Service Control Manager","computer_name":"WIN-7IMHK7EQ5T3","user":"","description":"The Windows Installer service entered the stopped state.\r\n"}
  tag sample03
</source>

<filter sample03>
  @type record_transformer
  remove_keys time_written
</filter>

<match sample03>
  @type stdout
</match>
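With this configuration, each record printed by the stdout output should resemble the following: the time_written field is gone, while the other fields are untouched (the event timestamp and tag prefix emitted by stdout are omitted here):

{"channel":"system","record_number":"40432","time_generated":"2020-03-07 09:15:39 +0000","event_id":"7036","event_type":"information","event_category":"0","source_name":"Service Control Manager","computer_name":"WIN-7IMHK7EQ5T3","user":"","description":"The Windows Installer service entered the stopped state.\r\n"}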

Conclusion

Observability is essential in modern systems for gaining valuable insights, but managing the associated running costs is equally critical for long-term operation. Fluentd offers a variety of plugins which allow users not only to enrich, transform, and process data but also to reduce data volume.


Need some help? - We are here for you.

In the Fluentd Subscription Network, we provide consultancy and professional services to help you run Fluentd and Fluent Bit with confidence by solving your pains. A service desk is also available for your operations, and the team is equipped with Diagtool and practical knowledge of running Fluentd and Fluent Bit in production. Contact us anytime if you would like to learn more about our service offerings!
