美文网首页Java架构技术栈
技术转载——Kafka导致重复消费的原因和解决方案

技术转载——Kafka导致重复消费的原因和解决方案

作者: 若丨寒 | 来源:发表于2020-07-21 17:22 被阅读0次

写在前面:2020年面试必备的Java后端进阶面试题总结了一份复习指南在Github上,内容详细,图文并茂,有需要学习的朋友可以Star一下!
GitHub地址:https://github.com/abel-max/Java-Study-Note/tree/master

image

问题分析

导致kafka的重复消费问题原因在于,已经消费了数据,但是offset没来得及提交(比如Kafka没有或者不知道该数据已经被消费)。

总结以下场景导致Kakfa重复消费:

原因1:强行kill线程,导致消费后的数据,offset没有提交(消费系统宕机、重启等)。

原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。

例如:

try {
    consumer.unsubscribe();
} catch (Exception e) {
}

try {
    consumer.close();
} catch (Exception e) {
}

上面代码会导致部分offset没提交,下次启动时会重复消费。

解决方法:设置offset自动提交为false

整合了Spring配置的修改如下配置

spring配置:

spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=latest

整合了API方式的修改enable.auto.commit为false

API配置:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");

一旦设置了 enable.auto.commit 为 true,Kafka 会保证在开始调用 poll 方法时,提交上次 poll 返回的所有消息。从顺序上来说,poll 方法的逻辑是先提交上一批消息的位移,再处理下一批消息,因此它能保证不出现消费丢失的情况。

原因3:(重复消费最常见的原因):消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。

原因4:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。

原因5:当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。

原因6:并发很大,可能在规定的时间(session.time.out默认30s)内没有消费完,就会可能导致reblance重平衡,导致一部分offset自动提交失败,然后重平衡后重复消费

问题描述:

我们系统压测过程中出现下面问题:异常rebalance,而且平均间隔3到5分钟就会触发rebalance,分析日志发现比较严重。错误日志如下:

08-09 11:01:11 131 pool-7-thread-3 ERROR [] - 
commit failed 
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:713) ~[MsgAgent-jar-with-dependencies.jar:na]
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:596) ~[MsgAgent-jar-with-dependencies.jar:na]
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1218) ~[MsgAgent-jar-with-dependencies.jar:na]
        at com.today.eventbus.common.MsgConsumer.run(MsgConsumer.java:121) ~[MsgAgent-jar-with-dependencies.jar:na]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_161]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_161]
        at java.lang.Thread.run(Thread.java:748) [na:1.8.0_161]

这个错误的意思是,消费者在处理完一批poll的消息后,在同步提交偏移量给broker时报的错。初步分析日志是由于当前消费者线程消费的分区已经被broker给回收了,因为kafka认为这个消费者死了,那么为什么呢?

问题分析:

这里就涉及到问题是消费者在创建时会有一个属性max.poll.interval.ms(默认间隔时间为300s),

该属性意思为kafka消费者在每一轮poll()调用之间的最大延迟,消费者在获取更多记录之前可以空闲的时间量的上限。如果此超时时间期满之前poll()没有被再次调用,则消费者被视为失败,并且分组将重新平衡,以便将分区重新分配给别的成员。

处理重复数据

因为offset此时已经不准确,生产环境不能直接去修改offset偏移量。

所以重新指定了一个消费组(group.id=order_consumer_group),然后指定auto-offset-reset=latest这样我就只需要重启我的服务了,而不需要动kafka和zookeeper了!

#consumer
spring.kafka.consumer.group-id=order_consumer_group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=latest

注:如果你想要消费者从头开始消费某个topic的全量数据,可以重新指定一个全新的group.id=new_group,然后指定auto-offset-reset=earliest即可

来源:https://www.tuicool.com/articles/6VRZB3b
欢迎关注微信公众号【慕容千语】

相关文章

  • 技术转载——Kafka导致重复消费的原因和解决方案

    写在前面:2020年面试必备的Java后端进阶面试题总结了一份复习指南在Github上,内容详细,图文并茂,有需要...

  • kafka数据丢失与重复

    1、Kafka重复消费原因 底层根本原因:已经消费了数据,但是offset没提交。 原因1:强行kill线程,导致...

  • kafka重复消费的原因

    使用了自动提交 如果自动提交的偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处...

  • kafka防止消息重复消费

    kafka重复消费的根本原因就是“数据消费了,但是offset没更新”!而我们要探究一般什么情况下会导致offse...

  • kafka消费延迟或者重复消费原因

    简介 由于项目中需要使用kafka作为消息队列,并且项目是基于spring-boot来进行构建的,所以项目采用了s...

  • kafka重复消费

    问题背景 笔者基于java做了一个动态添加topic,并落地数据到Hbase的功能,其他同事在复用消费topic代...

  • MQ随记(2)

    如何保证消息不会被重复消费(保证消息消费时的幂等性) kafka 按照数据进入kafka的顺序,kafka会给每条...

  • Kafka中消息丢失和重复消费,以及Leader选举机制

    一、Kafka中的消息是否会丢失和重复消费 要确定Kafka的消息是否丢失或重复,从两个方面分析入手:消息发送和消...

  • Kafka实际案例问题

    kafka consumer防止数据丢失 Kafka学习之怎么保证不丢,不重复消费数据 1 消费者pull数据时,...

  • Flink Kafka重复消费

    项目中使用了Flink平台实现了规则引擎功能,主要包括:数据转发和场景联动。其间多job问题一直困扰着我们,目前也...

网友评论

    本文标题:技术转载——Kafka导致重复消费的原因和解决方案

    本文链接:https://www.haomeiwen.com/subject/byhwkktx.html