Configuration of fluent-plugin-concat makes the logs disappear

  amazon-ecs, docker, fluent-docker, fluentd, logging

My configuration of fluent-plugin-concat is causing my long logs to disappear instead of being concatenated and sent to the Kinesis stream.
I use fluentd to send logs from containers deployed on AWS ECS to a Kinesis stream (and from there to an ES cluster).
On rare occasions some of the logs are very big. Most of the time they are under the Docker limit of 16K, but those rare long logs are very important and we don't want to miss them.
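
For context, my understanding of what the Docker fluentd log driver does with an over-long line: it emits it as several records, flagging the non-final chunks with partial_message (the field my concat filter keys on), roughly like this (the container_id and log values are made-up placeholders):

{"source": "stdout", "container_id": "abc123", "log": "first ~16K of the line ...", "partial_message": "true"}
{"source": "stdout", "container_id": "abc123", "log": "... the rest of the line"}

The final chunk arrives without the partial_message flag, which is what should signal the end of the sequence.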

My full configuration file is included below.

Just before the final match blocks, I added:

<filter>
@type concat
key log
stream_identity_key container_id
partial_key partial_message
partial_value true
separator ""
</filter>

The log I'm testing with is also included below as a JSON document.
If I remove this concat configuration, the log is sent in 2 chunks.
What am I doing wrong?

The full config file:

<system>
  log_level info
</system>

# just listen on the unix socket in a dir mounted from host
# input is a json object, with the actual log line in the `log` field
<source>
  @type unix
  path /var/fluentd/fluentd.sock
</source>

# tag log line as json or text
<match service.*.*>
  @type rewrite_tag_filter
  <rule>
    key log
    pattern /.*"logType":s*"application"/
    tag application.${tag}.json
  </rule>
  <rule>
    key log
    pattern /.*"logType":s*"exception"/
    tag exception.${tag}.json
  </rule>
  <rule>
    key log
    pattern /.*"logType":s*"audit"/
    tag audit.${tag}.json
  </rule>
  <rule>
    key log
    pattern /^{".*}$/
    tag default.${tag}.json
  </rule>
  <rule>
    key log
    pattern /.+/
    tag default.${tag}.txt
  </rule>
</match>
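
# (For reference: with this block, a record that arrives tagged e.g.
# service.myapp.web and whose log field contains "logType":"application"
# is re-emitted as application.service.myapp.web.json, while a plain text
# line falls through to default.service.myapp.web.txt. "myapp" and "web"
# are made-up tag parts for illustration.)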

<filter *.service.*.*.*>
  @type record_transformer
  <record>
    service ${tag_parts[2]}
    childService ${tag_parts[3]}
  </record>
</filter>
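
# (E.g. for the tag application.service.myapp.web.json: tag_parts is
# zero-indexed, so service becomes "myapp" and childService becomes "web".)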

<filter *.service.*.*.json>
  @type parser
  key_name log
  reserve_data true
  remove_key_name_field true
  <parse>
    @type json
  </parse>
</filter>
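
# (Note: with remove_key_name_field true, the original `log` string is
# dropped from *.json events after a successful parse and its JSON keys
# are merged into the record.)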

<filter *.service.*.*.*>
  @type record_transformer
  enable_ruby
  <record>
    @timestamp ${ require 'time'; Time.now.utc.iso8601(3) }
  </record>
</filter>


<filter>
  @type concat
  key log
  stream_identity_key container_id
  partial_key partial_message
  partial_value true
  separator ""
</filter>
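
# (As I understand it, a bare <filter> with no tag pattern matches every
# event, the same as <filter **>, so this concat step sees all records
# that reach this point in the pipeline.)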

<match exception.service.*.*.*>
  @type copy
  <store>
    @type kinesis_streams
    region "#{ENV['AWS_DEFAULT_REGION']}"
    stream_name the-name-ex
    debug false

    <instance_profile_credentials>
    </instance_profile_credentials>

    <buffer>
      flush_at_shutdown true
      flush_interval 10
      chunk_limit_size 16m
      flush_thread_interval 1.0
      flush_thread_burst_interval 1.0
      flush_thread_count 1
    </buffer>
  </store>

  <store>
    @type stdout
  </store>
</match>

<match audit.service.*.*.json>
  @type copy

  <store>
    @type kinesis_streams
    region "#{ENV['AWS_DEFAULT_REGION']}"
    stream_name the-name-sa

    debug false

    <instance_profile_credentials>
    </instance_profile_credentials>

    <buffer>
      flush_at_shutdown true
      flush_interval 1
      chunk_limit_size 16m
      flush_thread_interval 0.1
      flush_thread_burst_interval 0.01
      flush_thread_count 15
    </buffer>
  </store>


  <store>
    @type stdout
  </store>

</match>

<match *.service.*.*.*>
  @type copy
  <store>
    @type kinesis_streams
    region "#{ENV['AWS_DEFAULT_REGION']}"
    stream_name the-name-apl
    debug false

    <instance_profile_credentials>
    </instance_profile_credentials>

    <buffer>
      flush_at_shutdown true
      flush_interval 10
      chunk_limit_size 16m
      flush_thread_interval 1.0
      flush_thread_burst_interval 1.0
      flush_thread_count 1
    </buffer>
  </store>

  <store>
    @type stdout
  </store>
</match>



<match **>
  @type stdout
</match>

Example log message (a single long line):

{"message": "some message", "longlogtest": "averylongjsonline", "service": "longlog-service", "logType": "application", "log": "aa .... ( ~18000 chars )..longlogThisIsTheEndOfTheLongLog"}

Source: Docker Questions
