1、Kafka重复消费原因
底层根本原因:已经消费了数据,但是offset没提交。
原因1:强行kill线程,导致消费后的数据,offset没有提交。
原因2:设置offset为自动提交,关闭kafka时,如果在close之前,调用 consumer.unsubscribe() 则有可能部分offset没提交,下次重启会重复消费。
原因3(重复消费最常见的原因):消费后的数据,当offset还没有提交时,partition就断开连接。比如,通常会遇到消费的数据,处理很耗时,导致超过了Kafka的session timeout时间(0.10.x版本默认是30秒),那么就会re-blance重平衡,此时有一定几率offset没提交,会导致重平衡后重复消费。
原因4:当消费者重新分配partition的时候,可能出现从头开始消费的情况,导致重发问题。
原因5:当消费者消费的速度很慢的时候,可能在一个session周期内还未完成,导致心跳机制检测报告出问题。
2、Kafka Consumer丢失数据原因
猜测:设置offset为自动定时提交,当offset被自动定时提交时,数据还在内存中未处理,此时刚好把线程kill掉,那么offset已经提交,但是数据未处理,导致这部分内存中的数据丢失。
消息推动服务,每天早上,手机上各终端都会给用户推送消息,这时候流量剧增,可能会出现kafka发送数据过快,导致服务器网卡爆满,或者磁盘处于繁忙状态,可能会出现丢包现象。
总结起来就是来得快,去得慢,然后offset又提交了。
解决方案:首先对kafka进行限速,其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all。
检测方法:使用重试机制,查看问题所在。
kafka配置如下:
props.put("compression.type", "gzip");
props.put("linger.ms", "50");
props.put("acks", "all");
props.put("retries ", 30);
props.put("reconnect.backoff.ms ", 20000);
props.put("retry.backoff.ms", 20000);
3、解决方案
维护offset
记录offset和恢复offset的方案。理论上记录offset,下一个group consumer可以接着记录的offset位置继续消费。
简单offset记录方案:
每次消费时更新每个topic+partition位置的offset在内存中,
Map<key, value>,key=topic+'-'+partition,value=offset
当调用关闭consumer线程时,把上面Map的offset数据记录到 文件中*(分布式集群可能要记录到redis中)。
下一次启动consumer,需要读取上一次的offset信息,方法是 以当前的topic+partition为key,从上次的Map中去寻找offset。
然后使用consumer.seek()方法指定到上次的offset位置。
说明:
1、该方案针对单台服务器比较简单,直接把offset记录到本地文件中即可,但是对于多台服务器集群,offset也要记录到同一个地方,并且需要做去重处理。
如果线上程序是由多台服务器组成的集群,是否可以用一台服务器来支撑?应该可以,只是消费慢一点,没多大影响。
2、如何保证接着offset消费的数据正确性
为了确保consumer消费的数据一定是接着上一次consumer消费的数据,consumer消费时,记录第一次取出的数据,将其offset和上次consumer最后消费的offset进行对比,如果相同则继续消费。如果不同,则停止消费,检查原因。
网友评论