概要:1、位移设置:定期自动提交、周期时间 2、消息丢失
3、手动提交:1)同步 2)异步
-----------------------------------------------------------------------------------------------------------------------
位移提交时机,造成重复消费和消息丢失
1、消息丢失(下图):poll() 拉取 [x+2, x+7]:拉到就位移提交 x+8,消费 x+5 异常,恢复后从 x+8 开始。5 至7 之未消费
2、重复消费:拉取后提交位移,消费5异常,恢复后,从 2 开始
实际更复杂:第一次位移提交 x+8,下次x+4
一、位移设置
自动定期提交(默认),enable.auto.commit 配置true。
周期时间(默认5秒)auto.commit.interval.ms 配置
在 poll() 里完成,拉取请求前,检查是否可提交位移(上一次轮询位移)
二、消息丢失
自动提交是延时提交,重复消费可理解,为什么丢失呢?消费一半异常,但位移提交了。
线程A拉取存缓存BlockingQueue 中,B从缓存中读并处理。目前y+1 次拉,m次位移提交,6 前提交了,B还消费3。如B异常恢复m处,就是6拉消息,3-6 间丢失
三、手动提交位移
很多时候不是拉到就完成,写入db、缓存等才可。
commitSync() 和 commitAsync() 同/异步方法
1、commitSync()
1)offsets 提交指定分区位移。commitSync() 无参只能提交当前批次对应position 值。如提交中间值,每消费就提交,用这种方式。
2)实际很少这样,commitSync()本身同步耗费性能,这样更拉低。更多按分区粒度划分提交位移界限,这里用第10节中提的 ConsumerRecords 类的partitions() 方法和 records(TopicPartition) 方法
2、commitAsync()
不会被阻塞,性能增强,三个不同的重载:
1、回调函数:1和3 offsets 对照commitSync() 好理解。关键2、3中callback 提供异步提交的回调方法,提交后回调 OffsetCommitCallback 的 onComplete() 。第二个演示:
2、失败:
1)重试:异步提交x失败,下次提交x+y,成功。重复消费
2)提交后加递增序号解决,重试时比较,相同才提交。避免重复消费
如异常退出,重复消费难避免
如正常退出或再均衡前同步提交做最后把关
https://mp.weixin.qq.com/s/XsPQFFLG2VBsJbawxERbWA
网友评论