1、介绍
DelayedOperationPurgatory是kafka中一个非常重要的组件之一。它解决的问题是一个操作要么在满足一定业务条件下执行,要么在超时条件下执行,它封装了这两种情况。
例如:
生产者发送消息到某topic的一个分区主节点,由于生产者请求的ack为-1,表示待更新分区的所有副本(假设有3个副本,并分散在3个broker上,broker数量也为3)都同步消息后,分区主节点才返回响应给生产者。另外一点需要考虑的是所有分区副本同步消息的超时阀值,当超时后,若仍无或只有部分分区副本同步了消息,这时候分区主节点也需向生产者返回响应。DelayedOperationPurgatory很好第解决了这两个问题。
2、组成
DelayedOperationPurgatory的组成如下图:
![](https://img.haomeiwen.com/i8723449/61708597101dedab.png)
各个组件的作用如下:
DelayedOperation:延迟操作。此延迟操作具有检查条件满足的方法(tryComplete,对应介绍中讲的一定业务条件),也具有在满足条件下的完成方法(onComplete),同时它也扩展了TimeTask,可作为SystemTimer的超时任务执行(对应介绍中的超时条件)。
Watchers:为一个延迟操作(DelayedOperation)集合,对应DelayedOperationPurgatory字段watchersForKey的键值。对于上面的生产者发送消息这一例来说, DelayedOperation为DelayedProduce,DelayedProduce所对应的key为某topic中的某个分区。
SystemTimer:DelayedOperation同样需要给加入SystemTimer中,若DelayedOperation的时间到了,则由SystemTimer把DelayedOperation提交到SystemTimer中线程池字段taskExecutor执行。SystemTimer中有一个非常重要的时间轮字段为timingWheel。关于timingWheel将在下一步讲解。
ExpiredOperationReaper:此组件具有两个功能,一个是推进SystemTimer的时间,二则清除Watchers中已经完成的延迟操作,若所有延迟操作都已完成,则会清除watchersForKey对应项。
3、TimingWheel(时间轮)
Kafka使用TimingWheel来使添加延迟操作任务的时间复杂度为O(1),以此来达到高并发、高吞吐量目的。
TimingWheel使用了层级时间轮, 比如第一层的时间格子为1ms,共有20个时间格子,那么跨度为20ms;这时候第二层时间格子为20ms,也是共有20个时间格子,那么跨度为400ms;以此类推,第三层,四层...的跨度为8000ms,160000ms...每个时间格子对应一个延迟任务列表。
若这时有一个任务是445ms后执行,由于第一层的整个时间跨度为20ms,第二层的整个时间跨度为400ms,故445ms只能放在第三层某个时间格子起始位置,若第三层的时间指针指向0格的末端,那么445ms后执行的任务将被放在第一个时间格子。随着时间的流逝,当第一个时间格子的任务列表到期后,发现距445ms任务的执行还剩45ms,故45ms后执行的任务将被降级放到第二层的对应时间格子中。
更详细的TimingWheel文章,可以参考斯大的好文:https://mp.weixin.qq.com/s/7S_TC9uTE-P1uYV5p0HzCA
网友评论