Fluent Bit is one of the most widely used data shippers available. A data shipper is a tool that lets you send (ship) data from various sources to a central location. The most well-known use case is log aggregation: collecting log lines from one or more files and sending them to a database, typically Elasticsearch. Let's start with a very brief overview of how Fluent Bit works:
The Fluent Bit data pipeline (source: fluentbit.io)
The pipeline starts with the input, which can be files, syslog messages, MQTT and lots of other sources. Then you have the Parser phase where you convert raw, unstructured text into meaningful data. For example, a log line that looks like this:
192.168.2.20 - - [28/Jul/2006:10:27:10 -0300] "GET /cgi-bin/try/ HTTP/1.0" 200 3395
can be parsed to a JSON object like this:
{
  "host": "192.168.2.20",
  "user": "-",
  "method": "GET",
  "path": "/cgi-bin/try/",
  "code": "200",
  "size": "3395",
  "referer": "",
  "agent": ""
}
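Such a transformation is done by a parser. For reference, a [PARSER] stanza that could produce roughly the JSON above might look like this; it is a sketch modeled on the apache2 parser that ships with Fluent Bit, so treat the exact regex as illustrative:
[PARSER]
    Name        apache_access
    Format      regex
    # Splits an Apache-style access log line into named fields
    Regex       ^(?<host>[^ ]*) [^ ]* (?<user>[^ ]*) \[(?<time>[^\]]*)\] "(?<method>\S+)(?: +(?<path>[^\"]*?)(?: +\S*)?)?" (?<code>[^ ]*) (?<size>[^ ]*)(?: "(?<referer>[^\"]*)" "(?<agent>[^\"]*)")?$
    # The captured time field becomes the record timestamp
    Time_Key    time
    Time_Format %d/%b/%Y:%H:%M:%S %z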
After being parsed, data is buffered before it gets pushed to its final target. Fluent Bit can use memory or the file system for buffering. Finally, the data is routed to wherever you want to ship it. You can forward the same piece of data to multiple destinations at the same time; for example, you can send logs to Elasticsearch for mining and processing while the same data is also sent to an S3 bucket for long-term archival and to a Kafka broker for other parties to consume.
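As an illustration of that fan-out, the OUTPUT side of such a pipeline could look roughly like the following; the tag, hosts, bucket, and topic names are invented for the example:
# All three outputs match the same tag, so every record is delivered to each destination.
[OUTPUT]
    Name    es
    Match   app.*
    Host    elasticsearch.example.internal
    Port    9200
    Index   app-logs

[OUTPUT]
    Name    s3
    Match   app.*
    bucket  app-logs-archive
    region  us-east-1

[OUTPUT]
    Name    kafka
    Match   app.*
    Brokers kafka.example.internal:9092
    Topics  app-logs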
This is a very powerful tool with lots of options. However, due to the large number of plugins it supports, Fluent Bit can sometimes be hard to debug: some plugins do not offer their full functionality when combined with certain other plugins in the same pipeline. Recently, I bumped into exactly this kind of issue. Because my use case is rare, I didn't find any online articles explaining how it could be accomplished, so I decided to write my own. I hope it saves somebody some time one day.
Use case: records from the same log file need to be shipped to different destinations
So, we have a Kubernetes cluster that hosts a containerized application. The problem is that the timestamp format this app uses in its logs is not consistent; it uses two different formats depending on the event being logged. For example:
{"log":"[2020-10-10 11:29:00.868] [app] [info] This is a log line'\n","stream":"stderr","time":"2020-10-10T11:29:00.868096921Z"}
{"log":"[2020-10-10T12:18:50.070Z] [web] [info] \"GET / HTTP/1.1\" 404 NR 0 0 0 - \"10.88.2.128\" \"ELB-HealthChecker/2.0\" \"some data" \"10.88.2.128:8080\" \"-\"\n","stream":"stdout","time":"2020-10-10T12:18:54.700602212Z"}
Clearly, the first log line has a different timestamp format than the second one.
Why would this be a problem? Because when you parse the log file, you need to define which field will be used as the Time_Key. The Time_Key is used afterwards in Elasticsearch to perform time-based searches. If Fluent Bit cannot parse this field, Elasticsearch substitutes its own @timestamp field. This is still usable but not optimal: the @timestamp field records the time when the message hit Elasticsearch, not when the record was created in the logs. The difference could be a few milliseconds or much more, depending on how long it takes for the data to leave the node and arrive at Elasticsearch. In our specific case, the timestamp was of absolute importance: we needed whatever was recorded in the original logs.
The solution? We need two regex parsers for the same file: one expecting the first timestamp format (call it app) and another recognizing the differently formatted timestamp (call it web).
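For illustration, the two parsers could look something like this; the regexes and Time_Format strings are assumptions based on the two sample lines above, and they are meant to be applied to the log field:
[PARSER]
    Name        app
    Format      regex
    # Matches lines like: [2020-10-10 11:29:00.868] [app] [info] ...
    Regex       ^\[(?<time>[^\]]+)\] \[(?<component>[^\]]+)\] \[(?<level>[^\]]+)\] (?<message>.*)$
    Time_Key    time
    Time_Format %Y-%m-%d %H:%M:%S.%L

[PARSER]
    Name        web
    Format      regex
    # Matches lines like: [2020-10-10T12:18:50.070Z] [web] [info] ...
    Regex       ^\[(?<time>[^\]]+)\] \[(?<component>[^\]]+)\] \[(?<level>[^\]]+)\] (?<message>.*)$
    Time_Key    time
    Time_Format %Y-%m-%dT%H:%M:%S.%L%z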
Data pipeline with two parsers and one output
Notice that we have only one output plugin, since we are using one file (one Tag). Unfortunately, that alone won't solve the issue: shipping timestamp fields with different formats to Elasticsearch prevents it from automatically recognizing the field as a time field. There could be some possible tweaking on the Elasticsearch side to instruct it to read both formats and somehow unify them, but I decided to work on the Fluent Bit side. Let's quickly review how Fluent Bit works in our specific example to understand how we can overcome the above limitation.
The Fluent Bit tail plugin workflow
For working with log files, you use the tail INPUT plugin. Let's see how it works from start to end.
The tail plugin can collect logs from one or more files, and wildcards can be used. So, it might look as follows:
[INPUT]
    Name              tail
    Tag               myapp.*
    Path              /var/log/containers/*myapp*.log
    DB                /var/log/myapp_flb_kube.db
    Buffer_Chunk_Size 5MB
    Buffer_Max_Size   5MB
    Mem_Buf_Limit     25MB
    Skip_Long_Lines   On
    Refresh_Interval  10
The instructions in this stanza can be translated to: I want you to watch all the files under /var/log/containers that have the keyword myapp as part of their names, and that end in .log.
The tail plugin starts reading each matching file line by line. Each line is sent to the pipeline as a message. The message must have a Tag so that it can be selected and routed afterwards to a designated target. The .* at the end of the Tag name has a special meaning: it instructs the INPUT plugin to append the name of the file (including the path) to the tag. So, if we have a file that looks like this:
/var/log/containers/myapp-6b97bd7766-p2sxl_mynamespace_myapp-d5b8eba228d442cb16a769a59e85ee81a891fdeb360ef0ca0cad52a5ee629ab1.log
All the messages (lines) that are extracted from that file are tagged as follows:
myapp.var.log.containers.myapp-6b97bd7766-p2sxl_mynamespace_myapp-d5b8eba228d442cb16a769a59e85ee81a891fdeb360ef0ca0cad52a5ee629ab1.log
But is that really necessary? Why not just tag the message as myapp and then select it for parsing and routing? The answer is that, since this is a Kubernetes environment, it'd be nice to enrich the message with some additional Kubernetes-related metadata, like the pod name these log lines came from, the namespace, the container name, etc. For that purpose, we use the kubernetes FILTER plugin, which needs that extra data in the tag, as we'll see.
The Fluent Bit kubernetes plugin
Consider the following configuration for the kubernetes plugin used in our example:
[FILTER]
    Name                kubernetes
    Match               myapp.*
    Kube_URL            https://kubernetes.default.svc:443
    Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
    Kube_Tag_Prefix     myapp.var.log.containers.
    Merge_Log           On
    Merge_Log_Key       log_processed
    K8S-Logging.Parser  On
    K8S-Logging.Exclude Off
    Labels              On
    Annotations         On
Notice that the Match attribute is configured to watch for messages whose tag starts with the myapp. prefix. This allows it to match a tag like myapp.var.log.containers...., which we saw in the previous step. Now, the kubernetes plugin needs to extract useful information from this message. If you observe the tag carefully, you'll notice that it contains the pod name, the container name, and the namespace, since all of those are part of the file name. For the plugin to be able to parse this information, it applies its own regex. However, that regex expects to find the file name ONLY, so the prefix (myapp.var.log.containers.) needs to be removed. The Kube_Tag_Prefix does that trick: it strips this part, leaving only the file name to be parsed. Note that this does not change the message's Tag; it only affects how the kubernetes plugin reads it.
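Concretely, for the example file above, the stripping works roughly like this (the container hash is shortened here for readability):
# Tag on the incoming record:
#   myapp.var.log.containers.myapp-6b97bd7766-p2sxl_mynamespace_myapp-d5b8...629ab1.log
# Kube_Tag_Prefix:
#   myapp.var.log.containers.
# What is left for the plugin's regex to parse:
#   myapp-6b97bd7766-p2sxl_mynamespace_myapp-d5b8...629ab1.log
#   -> pod_name, namespace_name, container_name, docker_id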
The message then moves to the parser(s), where the data gets structured and organized into meaningful fields. A parser selects the messages it is interested in by their Tag. Finally, and also through the Tag, the OUTPUT plugin ships the message to the target.
So, I think by now you agree that the message Tag controls which path a message follows. In our case, we wanted messages from the same file to carry more than one Tag so that they could be shipped to two different destinations (two Elasticsearch indexes).
The Fluent Bit rewrite_tag plugin
This FILTER plugin did the trick. It works by searching the message for a specific match through a regex. When a match is found, it changes the message tag. So, for our case, we needed all the messages containing the [app] string to be re-tagged with a myapp.app prefix, while those containing [web] should get a myapp.web prefix. For this we needed two rewrite_tag filters with the following configuration:
[FILTER]
    Name  rewrite_tag
    Match myapp.*
    Rule  $log ^.*\[app\].*$ myapp.app.var.log.containers.$TAG[4].log false

[FILTER]
    Name  rewrite_tag
    Match myapp.*
    Rule  $log ^.*\[web\].*$ myapp.web.var.log.containers.$TAG[4].log false
So, the plugin handles any message whose tag starts with myapp and applies the Rule to it. But the Rule does more than just renaming the Tag. Remember, we still need the extra metadata that the tag provides so that the kubernetes plugin can do its magic. The rewrite_tag plugin allows you to preserve data found in the original Tag by using the $TAG directive. To better understand it, let's break up the rule directives one after another:
$log: the JSON key that contains the data we need to search in. In our case, all lines have a format like {"log":"foo bar"}. When you select log as your key, the subject data is now foo bar, without the quotes and the curly braces.
^.*\[app\].*$: match any string that has the literal [app] inside it.
myapp.app.var.log.containers.$TAG[4].log: the new tag name. $TAG[4] brings in the file-name part of the original tag. The $TAG directive splits the original Tag on the dot (.) into an array, so $TAG[0] is myapp, $TAG[1] is var, and so on until we reach the file name, which is $TAG[4].
false: whether or not to keep the message with the original tag; false means the original is discarded.
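To make the $TAG indexing concrete, here is how the first rule re-tags a record coming from the example file (container hash shortened for readability):
# Original tag assigned by the tail input:
#   myapp.var.log.containers.myapp-6b97bd7766-p2sxl_mynamespace_myapp-d5b8...629ab1.log
# Split on ".":
#   $TAG[0]=myapp  $TAG[1]=var  $TAG[2]=log  $TAG[3]=containers
#   $TAG[4]=myapp-6b97bd7766-p2sxl_mynamespace_myapp-d5b8...629ab1  $TAG[5]=log
# A record whose $log field matches ^.*\[app\].*$ is re-emitted with the new tag:
#   myapp.app.var.log.containers.myapp-6b97bd7766-p2sxl_mynamespace_myapp-d5b8...629ab1.log
# and, since the keep flag is false, the copy with the original tag is dropped.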
Caveat: Kube_Tag_Prefix does not work with the rewrite_tag plugin
This is the issue I didn't find documented anywhere. While the docs state that the kubernetes plugin "depends on either Tail or Systemd plugins to work", they do not mention how it should be configured together with the rewrite_tag plugin.
So, we have now re-tagged our messages with the desired tags to route them along different paths. The kubernetes plugin expects a message whose tag contains the required metadata for it to scrape. The problem is that Kube_Tag_Prefix will not work with re-tagged messages for some reason. So, the following will NOT work:
Kube_Tag_Prefix myapp.app.var.log.containers.
The only workaround is to define a new regex parser and use it with the kubernetes plugin. So, let's create one. In your parsers file, add the following:
[PARSER]
    Name   myapp-k8s-custom-tag
    Format regex
    Regex  (?<tag>[^.]+)?\.?(?<pod_name>[a-z0-9](?:[-a-z0-9]*[a-z0-9])?(?:\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_(?<namespace_name>[^_]+)_(?<container_name>.+)-(?<docker_id>[a-z0-9]{64})\.log$
The regex extracts the tag prefix and parses the metadata that the plugin needs out of the file name. Now, you need to instruct the kubernetes plugin to use this parser (we need two filters now, one for each message tag):
[FILTER]
    Name                kubernetes
    Match               myapp.web.*
    Kube_URL            https://kubernetes.default.svc:443
    Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
    Kube_Tag_Prefix     myapp.var.log.containers.
    Merge_Log           On
    Merge_Log_Key       log_processed
    K8S-Logging.Exclude On
    Labels              On
    Annotations         On
    K8S-Logging.Parser  Off
    Regex_Parser        myapp-k8s-custom-tag

[FILTER]
    Name                kubernetes
    Match               myapp.app.*
    Kube_URL            https://kubernetes.default.svc:443
    Kube_CA_File        /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    Kube_Token_File     /var/run/secrets/kubernetes.io/serviceaccount/token
    Kube_Tag_Prefix     myapp.var.log.containers.
    Merge_Log           On
    Merge_Log_Key       log_processed
    K8S-Logging.Exclude On
    Labels              On
    Annotations         On
    K8S-Logging.Parser  Off
    Regex_Parser        myapp-k8s-custom-tag
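To complete the picture, the two tag prefixes can then be routed to their own destinations. Here is a minimal sketch of what the outputs could look like; the host, port, and index names are assumptions, not the actual setup:
[OUTPUT]
    Name  es
    Match myapp.app.*
    Host  elasticsearch.example.internal
    Port  9200
    # hypothetical index for records using the [app] timestamp format
    Index myapp-app

[OUTPUT]
    Name  es
    Match myapp.web.*
    Host  elasticsearch.example.internal
    Port  9200
    # hypothetical index for records using the [web] timestamp format
    Index myapp-web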
The complete configmap for the above setup is listed here for completeness: