
Photo by Joao Tzanno on Unsplash

Fluent Bit is one of the most widely used data shippers available. A data shipper is a tool that allows you to send (ship) data from various sources to a central location. The best-known use case for data shipping is log aggregation: collecting log lines from one or more files and sending them to a database, typically Elasticsearch. Let’s have a very brief overview of how Fluent Bit works:


Source: fluentbit.io

The pipeline starts with the input, which can be files, syslog messages, MQTT and lots of other sources. Then you have the Parser phase where you convert raw, unstructured text into meaningful data. For example, a log line that looks like this:

192.168.2.20 - - [28/Jul/2006:10:27:10 -0300] "GET /cgi-bin/try/ HTTP/1.0" 200 3395

can be parsed to a JSON object like this:

{
  "host":    "192.168.2.20",
  "user":    "-",
  "method":  "GET",
  "path":    "/cgi-bin/try/",
  "code":    "200",
  "size":    "3395",
  "referer": "",
  "agent":   ""
}
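
This kind of structure is produced by a regex parser. Fluent Bit ships with an apache2 parser for this log format; the stanza below is a simplified sketch of such a parser (the name is made up and it omits the referer and agent captures), not the exact bundled definition:

[PARSER]
    Name        apache_access_sketch
    Format      regex
    Regex       ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+) (?<path>[^ ]*) [^"]*" (?<code>[^ ]*) (?<size>[^ ]*)$
    Time_Key    time
    Time_Format %d/%b/%Y:%H:%M:%S %z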

After being parsed, data is buffered before it gets pushed to its final target. Fluent Bit can use memory or the file system for buffering. Finally, the data is routed to wherever you want to ship it. You can forward the same piece of data to multiple destinations at the same time. For example, you can send logs to Elasticsearch for mining and processing while the same data is also sent to an S3 bucket for long-term archival and to a Kafka broker for other parties to consume.
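
As a rough sketch of that fan-out, the three OUTPUT stanzas below all Match the same tag, so every record is delivered to all three destinations. The host names, bucket and broker address are placeholders, not values from a real setup:

[OUTPUT]
    Name    es
    Match   app.*
    Host    elasticsearch.example.internal
    Port    9200
    Index   app-logs
[OUTPUT]
    Name    s3
    Match   app.*
    bucket  example-log-archive
    region  us-east-1
[OUTPUT]
    Name    kafka
    Match   app.*
    Brokers kafka.example.internal:9092
    Topics  app-logs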

This is a very powerful tool with lots of options. However, due to the large number of plugins that it supports, Fluent Bit can sometimes be hard to debug. Some plugins do not offer the same level of functionality when used in pipelines alongside certain other plugins. Recently, I bumped into exactly this kind of issue. Because my use case is fairly rare, I didn’t find any online articles that explained how it could be accomplished, so I decided to write my own. I hope it saves somebody some time one day.

Use case: we need records from the same log file to be shipped to different destinations

So, we have a Kubernetes cluster that hosts a containerized application. The problem is that the timestamp format that this app uses in its logs is not consistent. In fact, it uses two formats, depending on the type of event that occurs. For example:

{"log":"[2020-10-10 11:29:00.868] [app] [info] This is a log line'\n","stream":"stderr","time":"2020-10-10T11:29:00.868096921Z"}
{"log":"[2020-10-10T12:18:50.070Z] [web] [info] \"GET / HTTP/1.1\" 404 NR 0 0 0 - \"10.88.2.128\" \"ELB-HealthChecker/2.0\" \"some data" \"10.88.2.128:8080\" \"-\"\n","stream":"stdout","time":"2020-10-10T12:18:54.700602212Z"}

Clearly, the first log line has a different timestamp format than the second one.

Why would this be a problem? Because when you parse the log file, you need to define which field will be used as the Time_Key. The Time_Key is used afterwards in Elasticsearch to perform time-based searches. If Fluent Bit cannot parse this field, Elasticsearch will substitute it with its own @timestamp field. This is still usable but not optimal; the @timestamp field records the time when the message hit Elasticsearch, not when the record was created in the logs. The difference could be a few milliseconds or more, depending on how long it takes for the data to leave the node and arrive at Elasticsearch. In our specific case, the timestamp was of absolute importance: we needed whatever was recorded in the original logs.
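
To illustrate the role of the Time_Key: a parser declares which field holds the event time and how to decode it. The stanza below is a sketch modelled on the stock docker JSON parser, which reads the outer time field of each container log line; treat the exact values as illustrative:

[PARSER]
    Name        docker
    Format      json
    Time_Key    time
    Time_Format %Y-%m-%dT%H:%M:%S.%L
    Time_Keep   On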

The solution? We need two regex parsers for the same file: one that expects the first timestamp format (call it app) and another that recognizes the second, differently formatted timestamp (call it web).

Data pipeline with two parsers and one output
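
A sketch of what those two parsers could look like is shown below. The names, regexes and Time_Format strings are illustrative guesses derived from the two sample lines above, not the exact production parsers (in particular, check how your Fluent Bit version handles the trailing Z before settling on the web format string):

[PARSER]
    Name        app
    Format      regex
    Regex       ^\[(?<time>[^\]]+)\] \[app\] \[(?<level>[^\]]+)\] (?<message>.*)$
    Time_Key    time
    Time_Format %Y-%m-%d %H:%M:%S.%L
[PARSER]
    Name        web
    Format      regex
    Regex       ^\[(?<time>[^\]]+)\] \[web\] \[(?<level>[^\]]+)\] (?<message>.*)$
    Time_Key    time
    Time_Format %Y-%m-%dT%H:%M:%S.%L%z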

If you notice, we have only one output plugin, since we are using one file (one Tag). Unfortunately, that won’t solve the issue. Shipping a timestamp field with two different formats to the same Elasticsearch index prevents it from automatically recognizing that field as a time field. There may be some possible tweaking on the Elasticsearch side to instruct it to read both formats and somehow unify them, but I decided to work on the Fluent Bit side. Let’s quickly review how Fluent Bit works in our specific example to understand how we can overcome the above limitation.

The Fluent Bit tail plugin workflow

For working with log files, you use the tail INPUT plugin. Let’s see how it works from start to end.

The tail plugin can collect logs from one or more files, and wildcards can be used. So, the configuration might look as follows:

[INPUT]
    Name              tail
    Tag               myapp.*
    Path              /var/log/containers/*myapp*.log
    DB                /var/log/myapp_flb_kube.db
    Buffer_Chunk_Size 5MB
    Buffer_Max_Size   5MB
    Mem_Buf_Limit     25MB
    Skip_Long_Lines   On
    Refresh_Interval  10

The instructions in this stanza can be translated to:

I want you to watch all the files under /var/log/containers that have the keyword myapp as part of their names, and that end in .log

The tail plugin starts reading each matching file line by line. Each line is sent to the pipeline as a message. The message must have a Tag so that it can be selected and routed afterwards to a designated target. The .* at the end of the Tag name has a special meaning: it instructs the input plugin to append the file’s full path and name to the tag, with slashes replaced by dots. So, if we have a file that looks like this:

/var/log/containers/myapp-6b97bd7766-p2sxl_mynamespace_myapp-d5b8eba228d442cb16a769a59e85ee81a891fdeb360ef0ca0cad52a5ee629ab1.log

All the messages (lines) that are extracted from that file are tagged as follows:

myapp.var.log.containers.myapp-6b97bd7766-p2sxl_mynamespace_myapp-d5b8eba228d442cb16a769a59e85ee81a891fdeb360ef0ca0cad52a5ee629ab1.log

But is that really necessary? Why not just tag the message as myapp and then select it for parsing and routing? The answer is that, since this is a Kubernetes environment, it’d be nice to enrich the message with some additional Kubernetes-related metadata, like the name of the pod these log lines came from, the namespace, the container name, and so on. For that purpose, we use the kubernetes FILTER plugin, which needs that extra data in the tag, as we’ll see.

The Fluent Bit kubernetes plugin

Consider the following configuration for the kubernetes plugin used in our example:

[FILTER]
    Name                kubernetes
    Match               myapp.*
    Kube_URL            https://kubernetes.default.svc:443
    Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
    Kube_Tag_Prefix     myapp.var.log.containers.
    Merge_Log           On
    Merge_Log_Key       log_processed
    K8S-Logging.Parser  On
    K8S-Logging.Exclude Off
    Labels              On
    Annotations         On

Notice that the Match attribute is configured to watch for messages whose tags start with myapp. This allows it to match a tag like myapp.var.log.containers.... which we saw in the previous step. Now, the kubernetes plugin needs to extract useful information from this tag. If you observe it carefully, you’ll notice that it contains the pod name, the container name and the namespace, since all of those are part of the file name. To parse this information, the plugin applies its own regex. However, that regex expects to find the file name ONLY. The prefix (myapp.var.log.containers.) needs to be removed. The Kube_Tag_Prefix option does that trick: it strips this prefix off before the regex is applied, leaving only the file name to be parsed. Note that this does not change the message’s Tag; it only affects what the kubernetes plugin parses.
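
To make this concrete, the tag from our earlier example arrives at the filter as:

myapp.var.log.containers.myapp-6b97bd7766-p2sxl_mynamespace_myapp-d5b8eba228d442cb16a769a59e85ee81a891fdeb360ef0ca0cad52a5ee629ab1.log

and after the Kube_Tag_Prefix (myapp.var.log.containers.) is stripped, the built-in regex only has to deal with:

myapp-6b97bd7766-p2sxl_mynamespace_myapp-d5b8eba228d442cb16a769a59e85ee81a891fdeb360ef0ca0cad52a5ee629ab1.log

from which it extracts the pod_name, namespace_name, container_name and docker_id fields.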

The message then moves to the parser(s), where the data gets structured and organized into meaningful fields. A parser selects the messages it is interested in by their Tag. Finally, and also based on the Tag, the OUTPUT plugin ships the message to its target.

So, I think by now you agree that the message Tag controls which path a message follows. In our case, we wanted messages from the same file to have more than one Tag so that they could be shipped to two different destinations (two Elasticsearch indices).
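
To sketch where we are heading, the output side could then look something like this, with one Elasticsearch OUTPUT per tag. The host and index names below are placeholders, not the actual values used in our cluster:

[OUTPUT]
    Name    es
    Match   myapp.app.*
    Host    elasticsearch.example.internal
    Port    9200
    Index   myapp-app
[OUTPUT]
    Name    es
    Match   myapp.web.*
    Host    elasticsearch.example.internal
    Port    9200
    Index   myapp-web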

The Fluent Bit rewrite_tag plugin

This FILTER plugin did the trick. It works by searching the message for a specific match through a regex. When a match is found, it changes the message’s tag. So, for our case, we needed all messages containing the [app] string to get a tag starting with myapp.app, while those containing [web] should be tagged with myapp.web. For this we needed two rewrite_tag filters with the following configuration:

[FILTER]
    Name                rewrite_tag
    Match               myapp.*
    Rule                $log ^.*\[app\].*$ myapp.app.var.log.containers.$TAG[4].log false
[FILTER]
    Name                rewrite_tag
    Match               myapp.*
    Rule                $log ^.*\[web\].*$ myapp.web.var.log.containers.$TAG[4].log false

So, the plugin handles any message whose tag starts with myapp and applies the Rule to it. But the Rule does more than just rename the Tag. Remember, we need the extra metadata embedded in the original tag so that the kubernetes plugin can do its magic. The rewrite_tag plugin allows you to preserve data found in the original Tag by using the $TAG directive. To better understand it, let’s break up the rule directives one after another (a worked example follows the list):

$log : The JSON key that contains the data we need to search in. In our case, all lines have a format like {"log":"foo bar"}. When you select log as your key, the subject data becomes foo bar, without the quotes and the curly braces.

^.*\[app\].*$ : Match any string that contains the literal [app].

myapp.app.var.log.containers.$TAG[4].log : The new tag. $TAG[4] brings in the file name part from the original tag. The $TAG directive splits the original Tag on the dot (.) into an array, so $TAG[0] is myapp, $TAG[1] is var, and so on until we reach the file name, which is $TAG[4].

false : Whether or not to keep the message with the original tag. With false, only the re-tagged copy continues through the pipeline.
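
Putting the pieces together with the sample file from earlier (hash shortened for readability), the original tag

myapp.var.log.containers.myapp-6b97bd7766-p2sxl_mynamespace_myapp-d5b8eba2...629ab1.log

splits into $TAG[0]=myapp, $TAG[1]=var, $TAG[2]=log, $TAG[3]=containers and $TAG[4]=myapp-6b97bd7766-p2sxl_mynamespace_myapp-d5b8eba2...629ab1. A record whose log field contains [app] is therefore re-emitted with the tag

myapp.app.var.log.containers.myapp-6b97bd7766-p2sxl_mynamespace_myapp-d5b8eba2...629ab1.log

while the copy carrying the original tag is dropped, because the keep flag is false.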

Caveat: kube_tag_prefix does not work with the rewrite_tag plugin

This is the issue that I didn’t find documented anywhere. While the docs state that the kubernetes plugin “depends on either Tail or Systemd plugins to work”, they do not mention how it should be configured alongside the rewrite_tag plugin.

So, we have now re-tagged our messages with the desired tags to route them along different paths. The kubernetes plugin expects a tag that carries the metadata it needs to scrape. The problem is that kube_tag_prefix will not work with re-tagged messages for some reason. So, the following will NOT work:

kube_tag_prefix   myapp.app.var.log.containers.

The only workaround is to define a new regex parser and use it with the kubernetes plugin. So, let’s create one:

In your parsers file, add the following:

[PARSER]
    Name    myapp-k8s-custom-tag
    Format  regex
    Regex   (?<tag>[^.]+)?\.?(?<pod_name>[a-z0-9](?:[-a-z0-9]*[a-z0-9])?(?:\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace_name>[^_]+)_(?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\.log$

The regex extracts the tag prefix and parses the metadata that the plugin needs (pod name, namespace, container name and Docker ID). Now, you need to instruct the kubernetes plugin to use this parser (we need two filter stanzas now, one for each message tag):

[FILTER]
    Name                kubernetes
    Match               myapp.web.*
    Kube_URL            https://kubernetes.default.svc:443
    Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
    Kube_Tag_Prefix     myapp.var.log.containers.
    Merge_Log           On
    Merge_Log_Key       log_processed
    K8S-Logging.Exclude On
    Labels              On
    Annotations         On
    K8S-Logging.Parser  Off
    Regex_Parser        myapp-k8s-custom-tag
[FILTER]
    Name                kubernetes
    Match               myapp.app.*
    Kube_URL            https://kubernetes.default.svc:443
    Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
    Kube_Tag_Prefix     myapp.var.log.containers.
    Merge_Log           On
    Merge_Log_Key       log_processed
    K8S-Logging.Exclude On
    Labels              On
    Annotations         On
    K8S-Logging.Parser  Off
    Regex_Parser        myapp-k8s-custom-tag

The complete configmap for the above setup is listed here for completeness: