Kafkaspout的源码解读
版本:1.2.2
链接:https://juejin.im/post/5be83aa6e51d454e5b5f1187
摘录:
storm-kafka-client主要针对kafka0.10及以上版本,它引入了ProcessingGuarantee枚举,该枚举有三个值,分别是
ProcessingGuarantee.AT_LEAST_ONCE就是开启ack的版本,它类似kafka client的auto commit,在指定interval定期commit;它会维护已经emitted(已经emitted但尚未ack),offsetManagers(已经ack但尚未commit)以及fail需要重试的retrySchedules
ProcessingGuarantee.AT_MOST_ONCE,它就不管ack了,在polled out消息的时候同步commit(忽略interval配置),因而该消息最多被处理一次
ProcessingGuarantee.NO_GUARANTEE,这个也是不管ack的,不过它跟
ProcessingGuarantee.AT_LEAST_ONCE类似,是在指定interval定期commit(都依赖commitTimer),不同的是它是异步ProcessingGuarantee.AT_LEAST_ONCE它结合了storm的ack机制,在spout的ack方法维护emitted(已经emitted但尚未ack);在fail方法将msgId放入到retryService进行重试(这个是ProcessingGuarantee.NO_GUARANTEE所没有的);它跟ProcessingGuarantee.NO_GUARANTEE一样是依赖commitTimer,在initerval期间提交offset信息,不同的是它是commitSync,即同步提交,而且提交的是已经acked的消息;而ProcessingGuarantee.NO_GUARANTEE是异步提交,而且提交的是offset是不管是否在storm spout已经ack,而是以consumer的poll为准的
ProcessingGuarantee.AT_MOST_ONCE是在pollKafkaBroker方法里头,在调用完kafkaConsumer.poll之后,调用kafkaConsumer.commitSync进行同步提交commit;它是同步提交,而且不依赖commitTimer,即不是interval提交offset
ProcessingGuarantee.NO_GUARANTEE在nextTuple中判断需要commit的时候,调用kafkaConsumer.commitAsync进行异步提交,它跟ProcessingGuarantee.AT_LEAST_ONCE一样,都依赖commitTimer,在initerval期间提交offset,但是它是异步提交,而ProcessingGuarantee.AT_LEAST_ONCE是同步提交
nextTuple()方法会pollKafkaBroker会调用kafkaConsumer.poll方法拉取消息,然后将拉取到的消息放入waitingToEmit,之后调用emitIfWaitingNotEmitted方法进行emit或者waiting,如果emit则是调用emitOrRetryTuple方法;由于pollKafkaBroker会执行seek操作将offset移动到每个parition中失败的offset中最小的位置,从那个位置开始重新拉取消息,拉取消息调用了kafkaConsumer.poll方法,KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE是在这里进行kafkaConsumer.commitSync同步提交offset的;由于包含了要重试的消息,emitOrRetryTuple这里要根据offsetManagers(已经ack等待commit)以及emitted(已经emit等待ack)进行去重判断是否需要调用collector.emit;对于ProcessingGuarantee.AT_LEAST_ONCE类型,这里不仅调用emit方法,还需要维护offsetManagers、emitted及重试信息相关状态,然后回调tupleListener.onEmit方法;对于非ProcessingGuarantee.AT_LEAST_ONCE类型这里仅仅是emit。
网友评论