我查了日志:
大概意思是
1、kafka conusmer会拉取一批消息,然后进行处理,但在代码中sleep了5000MS,consumer在session.timeout.ms(15000MS)时间之内没有消费完成consumer coordinator会由于没有接受到心跳而挂掉
2coordinator挂掉了,然后自动提交offset失败,然后重新分配partition给客户端
3、由于自动提交offset失败,导致重新分配了partition的客户端又重新消费之前的一批数据
4、接着consumer重新消费,又出现了消费超时,无限循环下去
解决办法:
方法1、增加了一个固定长度的阻塞队列和工作线程池来提高并行消费的能力(已上线)
方法2、由于使用了spring-kafka,则把kafka-client的enable.auto.commit设置成了false,表示禁止kafka-client自动提交offset,因为就是之前的自动提交失败,导致offset永远没更新,从而转向使用spring-kafka的offset提交机制。
1)、如果auto.commit关掉的话,spring-kafka会启动一个invoker,这个invoker的目的就是启动一个线程去消费数据,他消费的数据不是直接从kafka里面直接取的,那么他消费的数据从哪里来呢?他是从一个spring-kafka自己创建的阻塞队列里面取的。
2)、然后会进入一个循环,从源代码中可以看到如果auto.commit被关掉的话, 他会先把之前处理过的数据先进行提交offset,然后再去从kafka里面取数据。
3)、然后把取到的数据丢给上面提到的阻塞列队,由上面创建的线程去消费,并且如果阻塞队列满了导致取到的数据塞不进去的话,spring-kafka会调用kafka的pause方法,则consumer会停止从kafka里面继续再拿数据。
4)、接着spring-kafka还会处理一些异常的情况,比如失败之后是不是需要commit offset这样的逻辑
网友评论