Logstash消费Kafka的数据,然后输出到Elasticsearch,某一天发现使用Kibana查询不到最近的数据。查看log后发现Kafka持续输出如下log:
Preparing to rebalance group xxx with old generation 1178 (__consumer_offsets-2)(kafka.coordinator.group.GroupCoordinator)
Stabilized group xxx generation 1179 (__consumer_offsets-2) (kafka.coordinator.group.GroupCoordinator)
Assignment received from leader for group xxx for generation 1179 (kafka.coordinator.group.GroupCoordinator)
由此可以判断出Kafka在rebalance offset,Logstash并没有提交已消费的offset。进一步查看Elasticsearch储存的数据,有个index的数据量超出正常的十几倍,并且还在持续增长,从Kibana查询该index的数据,确实有大量重复。所以,Logstash在不断重复消费某个topic的数据。
那么原因是什么呢?
查找一系列资料后发现,Logstash消费一批数据时session timeout,导致offset没有提交给Kafka。解决方案是配置Logstash kafka input 的2个参数:max_poll_records 和 session_timeout_ms,增加 session timeout 或 降低 max poll records 或 二者都调整,让Logstash在允许的时间范围内把一批数据处理完。
网友评论