问题描述
从kafka的架构中,严格的一次消费据我所知应该是比较困难的,特别是在各种异常情况下。我们在工作中,由于性能原因,要求消费者是多进程+多线程的方式。我们发现在单个进程的情况下,程序一般不会重复消费,但是在多进程,哪怕是2个进程,然后每个进程有几个线程情况下,几乎每次都有重复消费的问题。
这个问题我们查了比较久,问题主要出现在消费者平衡的时候,我们设置的起始offset为开始位置,如果不设置初始位置显然有问题,这个没办法改。
我们采用的是高级API,采用的版本是0.8.1版本,我们在消费kafka的数据的时候,有个批量值,如果读不到这么多,就等待一定时间自动超时。在这个超时的时间内,如果另外的线程再读这个topic(当然本文说的多线程+多进程消费的是同一个topic且是同一个组)。那么显然原来线程假设为A线程,读到数据由于还在等待超时中,那么显然还未处理,未处理数据我们是不提交是必须处理完成再提交,以保障至少处理一次,所以新来读这个topic的线程从这个topic的此partition中读取数据,会从开始位置读取。
解决办法
问题根源在于Kafka的平衡机制,Kafka什么时候平衡我们无从知晓,而消费又是没平衡好就开始消费了,所以解决也从这个角度来解决。
和网友交流了下,了解到,新版本的API在平衡的时候可以注册一个对象,在平衡前和后可以调用这个对象的方法,我们在这个方法里面将此topic的stream提交(这可能会造成数据丢失,因为这些数据很可能还没处理),这个新API测试了下,基本没什么问题。
高级API如何解决?用类分布式锁最终解决了这个问题,实现思路比较简单,就是通过ZK来实现,程序启动前先定义好需要启动的消费者数量,如果还没达到这个量,线程都不能启动,达到这个线程数后,休眠几秒后启动,在启动的时候,消费者线程已经得到了平衡,除非线程死掉否则不会发生平衡了,所以暂时解决了这个问题。
思路共享出来,希望对大家有所帮助。
网友评论