美文网首页分布式&高可用分布式
logstash 重复消费kafka问题

logstash 重复消费kafka问题

作者: YG_9013 | 来源:发表于2018-07-29 17:47 被阅读185次

前两天业务方突然找到我说当天索引ES查询很慢,原来毫秒级的查询现在竟然要20s,让我处理下。我看了下索引大小,原来是1分片6g左右,今天突然就变成了1分片32g。然后我就一脸硬气的告诉他,你们业务膨胀了5倍,为什么不和平台这边沟通,一分片30多g肯定慢。然后业务一脸懵逼的查了一通,告诉我业务大小没变化。业务方说数据大小没变,我这边logtash也没动过,难道是推送kafka的时候,多推送了几次?(我自己没做改动,不可能有问题的好吗?肯定是别人有问题。。。。。)我让负责kakfa的同学帮忙查了一下,他告诉我kafka接收到的数据和往常一样,没变化。业务数据量没变,kafka接收到的数据量也没变,那只能是logtash的问题。但logstash我也没改,为什么今天就突然变大了呢?
然后我试着查看其他业务当天的索引,发现也特别慢。查看segments发现,一个一分片0副本的索引segments竟然有1400多。这肯定慢,从一个文件中查询与从1400个文件这个性能差的不是一点半点。

直觉告诉我,segments没合并和logtash重复消费两者肯定有关系。

logtash重复消费

关于logstash重复消费问题,这篇文章https://www.jianshu.com/p/6492b762c171介绍了原因。kafka的consumer消费数据时首先会从broker里读取一批消息数据进行处理,处理完成后再提交offset。如果这一批消息处理时间过长,在session.timeout.ms时间内没有处理完成,自动提交offset失败,然后kafka会重新分配partition给消费者,消费者又重新消费之前的一批数据,又出现了消费超时,所以会造成死循环,一直消费相同的数据。同时也给出了解决方案:将max_poll_records 调小,将session_timeout_ms调大即可。
问题解决流程:
1)首次尝试,将session_timeout_ms调整为和auto_commit_interval_ms默认值5s一样。观察了一段时间发现没什么效果。当时在想为啥一批数据需要处理这么久?
2)调整session_timeout_ms没什么效果,那就只能减少每一批数据的大小了。logstash的配置中,我没有配置每批的数据条数max_poll_records ,而配置了每批数据大小max_partition_fetch_bytes。将其减少为原来的一半,logstash不在重复消费kafka,终于恢复正常了。

当天索引的segments没合并

查了一圈资料也没找到segmetns没合并的原因。ES可以通过_forcemerge暴力合并,但对正在读写的索引却不建议这么做。因为merge进程会阻塞所有的写请求,知道merge结束。官网的原文是这么说的:

The force merge API allows to force merging of one or more indices through an API. The merge relates to the number of segments a Lucene index holds within each shard. The force merge operation allows to reduce the number of segments by merging them.
This call will block until the merge is complete. If the http connection is lost, the request will continue in the background, and any new requests will block until the previous force merge is complete.

forcemerge肯定是不行的,我们知道ES后台有segments合并的线程,难道是系统的合并线程挂了,没重启起来?
问题解决流程:
1)重启大发好,先重启一把试试。重启恢复正常,过了半个小时后我查看业务当天的segments发现果然少了,由原来的1400多变为了100多个。果然还是重启大发好。我告诉业务已经弄好了,让他多观察下。
2)第二天业务方又找到我说很卡,我查看segments发现,segmetns又有400多了(上午),按照这个进度推下去,晚上又会达到1000多。WTF?
又翻了一遍官网关于merge的信息,突然发现这句话:

A shard in elasticsearch is a Lucene index, and a Lucene index is broken down into segments. Segments are internal storage elements in the index where the index data is stored, and are immutable. Smaller segments are periodically merged into larger segments to keep the index size at bay and to expunge deletes.
The merge process uses auto-throttling to balance the use of hardware resources between merging and other activities like search.

意思就是说合并进程会根据系统的负载自动在读写和merge之间做均衡,如果读写压力非常大,那么merge就有可能会很慢。那没毛病了,肯定是读写压力大,引起系统降低了merge的频率。我突然想起来集群中有个业务变了,由原来的每天200G,变为每天2T。
没错就是因为其他业务突然增长了10倍,使集群写压力增大,然后logstash向ES写数据的时候耗费的时间更长,session才会timeout,才会一直重复消费,引起当天索引变大。还是因为集群写压力大,然后系统merge操作减少频率,导致当天索引segments没合并,查询特别慢。
加机器就好了。

相关文章

  • logstash 重复消费kafka问题

    前两天业务方突然找到我说当天索引ES查询很慢,原来毫秒级的查询现在竟然要20s,让我处理下。我看了下索引大小,原来...

  • Logstash重复消费Kafka的数据

    Logstash消费Kafka的数据,然后输出到Elasticsearch,某一天发现使用Kibana查询不到最近...

  • 2、Kafka重复消费问题

    Kafka实际上有个offset的概念,就是每个消息写进去,都有一个offset,代表他的序号,然后consu...

  • kafka->logstash

    一、安装kafka 请参考:kafka安装 二、安装logstash 请参考:logstash安装 三、kafka...

  • kafka重复消费

    问题背景 笔者基于java做了一个动态添加topic,并落地数据到Hbase的功能,其他同事在复用消费topic代...

  • Logstash错误排查之-`Don't know ho

    现象 今天写好logstash后(kafka-》 logstash -》kafka ),本地测试OK后,上线后,疯...

  • 如何保证消息不被重复消费

    首先, RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是...

  • logstash安装并使用

    logstash有什么用 logstash这个工具在我们这里的使用方式是从kafka消费信息并且将信息整理发送给e...

  • MQ随记(2)

    如何保证消息不会被重复消费(保证消息消费时的幂等性) kafka 按照数据进入kafka的顺序,kafka会给每条...

  • 3. MQ消息-重复消费&消费的幂等性

    一 背景 首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为...

网友评论

    本文标题:logstash 重复消费kafka问题

    本文链接:https://www.haomeiwen.com/subject/ilttvftx.html