美文网首页
kafka一直重复消费消息

kafka一直重复消费消息

作者: 牛红磊 | 来源:发表于2017-07-06 15:58 被阅读0次

我查了日志:

大概意思是

1、kafka conusmer会拉取一批消息,然后进行处理,但在代码中sleep了5000MS,consumer在session.timeout.ms(15000MS)时间之内没有消费完成consumer coordinator会由于没有接受到心跳而挂掉

2coordinator挂掉了,然后自动提交offset失败,然后重新分配partition给客户端

3、由于自动提交offset失败,导致重新分配了partition的客户端又重新消费之前的一批数据

4、接着consumer重新消费,又出现了消费超时,无限循环下去

解决办法:

方法1、增加了一个固定长度的阻塞队列和工作线程池来提高并行消费的能力(已上线)

方法2、由于使用了spring-kafka,则把kafka-client的enable.auto.commit设置成了false,表示禁止kafka-client自动提交offset,因为就是之前的自动提交失败,导致offset永远没更新,从而转向使用spring-kafka的offset提交机制。

1)、如果auto.commit关掉的话,spring-kafka会启动一个invoker,这个invoker的目的就是启动一个线程去消费数据,他消费的数据不是直接从kafka里面直接取的,那么他消费的数据从哪里来呢?他是从一个spring-kafka自己创建的阻塞队列里面取的。

2)、然后会进入一个循环,从源代码中可以看到如果auto.commit被关掉的话, 他会先把之前处理过的数据先进行提交offset,然后再去从kafka里面取数据。

3)、然后把取到的数据丢给上面提到的阻塞列队,由上面创建的线程去消费,并且如果阻塞队列满了导致取到的数据塞不进去的话,spring-kafka会调用kafka的pause方法,则consumer会停止从kafka里面继续再拿数据。

4)、接着spring-kafka还会处理一些异常的情况,比如失败之后是不是需要commit offset这样的逻辑

相关文章

  • kafka一直重复消费消息

    我查了日志: 大概意思是 1、kafka conusmer会拉取一批消息,然后进行处理,但在代码中sleep了50...

  • MQ随记(2)

    如何保证消息不会被重复消费(保证消息消费时的幂等性) kafka 按照数据进入kafka的顺序,kafka会给每条...

  • kafka防止消息重复消费

    kafka重复消费的根本原因就是“数据消费了,但是offset没更新”!而我们要探究一般什么情况下会导致offse...

  • Kafka中消息丢失和重复消费,以及Leader选举机制

    一、Kafka中的消息是否会丢失和重复消费 要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消...

  • Kafka 入门代码示例

    kafka 生产者 配置类 发送端 消费者 Kafka中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调...

  • python3读写kafka

    消费kafka数据,方式一 消费kafka数据,方式二 将消息写入kafka

  • 消息队列

    为什么使用消息队列?消息队列有什么优点和缺点? 如何保证消息队列高可用?如何保证消息不被重复消费? kafka,a...

  • Kafka

    1.Kafka高可用怎么做的? 2.Kafka消息不重复怎么做的?如何保证消息消费时的幂等性? 3.Kakfa如何...

  • 3. MQ消息-重复消费&消费的幂等性

    一 背景 首先,比如 RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为...

  • 如何保证消息不被重复消费

    首先, RabbitMQ、RocketMQ、Kafka,都有可能会出现消息重复消费的问题,正常。因为这问题通常不是...

网友评论

      本文标题:kafka一直重复消费消息

      本文链接:https://www.haomeiwen.com/subject/lowhhxtx.html