Kafka-10版本相对于8的要更稳定一些
一.测试中遇到的问题
问题1.在测试kafkaOffset消费时,发现如果停止streaming再启动的时候,就会消费重复
官方给的文档测试就是会重复
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topicSet, kafkaParams)
)
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
10版本的kafka源码的消费逻辑 https://www.cnblogs.com/SpeakSoftlyLove/p/7726220.html
貌似就是每次消费完提交上次的offset 所以中间断了以后最后一个批次消费的offset是没有保存的 下次消费的时候就会有重复的
解决办法:
启动两个消费组,一个消费组用了当前消费,另一个消费组提交commit,下次重启时,采用另一消费组。
二.一些经验
1."enable.auto.commit" 为true时,需要再设置时间就会每隔一段时间自动保存一次。不是很建议使用。
2.kafka-client-2.11废弃了zkUtils,用AdminClient替代了她。
3.在新老版本交替的过程中,发现每次消费的数据量都是一致的,所以新版本只要多消费两个批次就可以和老版本一致,直接进行替换。
网友评论