Fluentd write to Kafka hangs

Log truncation

Kafka messages have a default size limit of 1 MB (this can be adjusted). When collecting logs, lines that are too long need to be truncated, with a configuration like the following:

<filter test-log.**>
  @type record_transformer
  enable_ruby true
  <record>
    # keep only the first 999,000 characters of the log field
    log ${record.dig("log")[0, 999000]}
  </record>
</filter>

Here record.dig truncates the log field to the first 999,000 characters (about 999 KB for single-byte text). See record_transformer for the detailed configuration.

Writing logs to Kafka hangs

When a collected log entry is larger than 1 MB, writing to Kafka fails, and after a while fluentd hangs. The logs look like this:

2020-03-26 08:46:12 +0000 [warn]: [kafka-logstash] failed to flush the buffer. retry_time=11 next_retry_seconds=2020-03-26 09:02:59 +0000 chunk="5a1bdc1a55a3512eaef615952ce35fe0" error_class=Kafka::MessageSizeTooLarge error="Kafka::MessageSizeTooLarge"
  2020-03-26 08:46:12 +0000 [warn]: suppressed same stacktrace
2020-03-26 09:03:03 +0000 [warn]: [kafka-logstash] Send exception occurred: Kafka::MessageSizeTooLarge
2020-03-26 09:03:03 +0000 [warn]: [kafka-logstash] Exception Backtrace : /var/lib/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/protocol.rb:160:in `handle_error'
/var/lib/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/produce_operation.rb:153:in `block in handle_response'
/var/lib/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/protocol/produce_response.rb:36:in `block (2 levels) in each_partition'
/var/lib/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/protocol/produce_response.rb:35:in `each'
/var/lib/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/protocol/produce_response.rb:35:in `block in each_partition'
/var/lib/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/protocol/produce_response.rb:34:in `each'
/var/lib/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/protocol/produce_response.rb:34:in `each_partition'
/var/lib/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/produce_operation.rb:144:in `handle_response'
/var/lib/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/produce_operation.rb:133:in `block in send_buffered_messages'
/var/lib/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/produce_operation.rb:105:in `each'
/var/lib/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/produce_operation.rb:105:in `send_buffered_messages'
/var/lib/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/produce_operation.rb:62:in `block in execute'
/var/lib/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/instrumenter.rb:23:in `instrument'
/var/lib/gems/2.5.0/gems/ruby-kafka-0.7.10/lib/kafka/produce_operation.rb:53:in `execute'
/var/lib/gems/2.5.0/gems/fluent-plugin-kafka-0.11.0/lib/fluent/plugin/kafka_producer_ext.rb:210:in `block in deliver_messages_with_retries'
/var/lib/gems/2.5.0/gems/fluent-plugin-kafka-0.11.0/lib/fluent/plugin/kafka_producer_ext.rb:200:in `loop'
/var/lib/gems/2.5.0/gems/fluent-plugin-kafka-0.11.0/lib/fluent/plugin/kafka_producer_ext.rb:200:in `deliver_messages_with_retries'
/var/lib/gems/2.5.0/gems/fluent-plugin-kafka-0.11.0/lib/fluent/plugin/kafka_producer_ext.rb:126:in `deliver_messages'
/var/lib/gems/2.5.0/gems/fluent-plugin-kafka-0.11.0/lib/fluent/plugin/out_kafka2.rb:259:in `write'
/var/lib/gems/2.5.0/gems/fluentd-1.8.0/lib/fluent/plugin/output.rb:1133:in `try_flush'
/var/lib/gems/2.5.0/gems/fluentd-1.8.0/lib/fluent/plugin/output.rb:1439:in `flush_thread_run'
/var/lib/gems/2.5.0/gems/fluentd-1.8.0/lib/fluent/plugin/output.rb:461:in `block (2 levels) in start'
/var/lib/gems/2.5.0/gems/fluentd-1.8.0/lib/fluent/plugin_helper/thread.rb:78:in `block in thread_create'

After adding the Kafka output plugin parameter max_send_retries 2 and restarting, fluentd still kept retrying endlessly and then hung.
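
For reference, a minimal sketch of where this parameter lives in an out_kafka2 match section; the @id (taken from the [kafka-logstash] prefix in the log above), broker list, and topic are assumptions, only max_send_retries 2 comes from the setup described here:

<match test-log.**>
  @type kafka2
  @id kafka-logstash                 # assumed to be the plugin id shown in the log prefix
  brokers kafka-1:9092,kafka-2:9092  # assumed broker list
  default_topic logstash             # assumed topic
  # retry a failed send inside the plugin itself at most 2 times
  max_send_retries 2
  <format>
    @type json
  </format>
</match>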

Actually, max_send_retries did take effect here. The endless retrying seen in the logs comes from the retry policy configured in the output plugin's buffer. Let's walk through the whole chain:

A collected log entry larger than 1 MB causes the Kafka output plugin to raise MessageSizeTooLarge -> the plugin retries according to its own configured parameters -> after max_send_retries attempts it still fails -> the buffer flush for the Kafka output therefore fails -> this triggers the buffer's flush-failure retry mechanism with exponential backoff, whose default timeout is 72h.
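
That second layer of retries is fluentd's standard buffer retry. As a sketch, here are the relevant <buffer> parameters with fluentd v1's documented defaults written out explicitly (these values are the defaults, not the original configuration):

<match test-log.**>
  @type kafka2
  # ... kafka parameters as above ...
  <buffer topic>
    retry_type exponential_backoff  # default: back off 1s, 2s, 4s, ...
    retry_wait 1s                   # default initial wait between retries
    retry_timeout 72h               # default: keep retrying a failed chunk for up to 72 hours
    # retry_max_times is unset by default, so only retry_timeout stops the retries
  </buffer>
</match>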

Solutions:

  1. Adjust the retry parameters for failed buffer flushes, e.g. retry_max_times 3, so that after a fixed number of retries fluentd logs the failure and moves on (see the combined sketch after this list). See retries-parameters for the full parameter list.
  2. Use the truncation approach described above to cut logs down to a size that fits.
  3. Configure the Kafka output plugin's ignore_exceptions parameter with ["Kafka::MessageSizeTooLarge"], so that oversized-message errors are ignored and not raised to the upper layer (this parameter is not in the documentation; I found it by reading the plugin code).
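
Putting options 1 and 3 together, a sketch of what the output section could look like; broker and topic names are placeholders:

<match test-log.**>
  @type kafka2
  brokers kafka-1:9092,kafka-2:9092  # assumed broker list
  default_topic logstash             # assumed topic
  max_send_retries 2
  # option 3: swallow Kafka::MessageSizeTooLarge instead of raising it to the buffer layer
  ignore_exceptions ["Kafka::MessageSizeTooLarge"]
  <format>
    @type json
  </format>
  <buffer topic>
    # option 1: give up on a chunk after 3 failed flushes instead of retrying for 72h
    retry_max_times 3
  </buffer>
</match>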