Kafka中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。Kafka并没有使用JDK自带的Timer或DelayQueue来实现延时的功能,而是基于时间轮的概念自定义实现了一个用于延时功能的定时器(SystemTimer)。JDK中Timer和DelayQueue的插入和删除操作的平均时间复杂度为O(nlogn)并不能满足Kafka的高性能要求,而基于时间轮可以将插入和删除操作的时间复杂度都降为O(1)。时间轮的应用并非Kafka独有,其应用场景还有很多,在Netty、Akka、Quartz、ZooKeeper等组件中都存在时间轮的踪影。
Kafka中的时间轮(TimingWheel)是一个存储定时任务的环形队列,底层采用数组实现,数组中的每个元素可以存放一个定时任务列表(TimerTaskList)。
TimerTaskList是一个环形的双向链表,链表中的每一项表示的都是定时任务项(TimerTaskEntry),其中封装了真正的定时任务(TimerTask)。
时间轮由多个时间格组成,每个时间格代表当前时间轮的基本时间跨度(tickMs)。时间轮的时间格个数是固定的,可用wheelSize来表示,那么整个时间轮的总体时间跨度(interval)可以通过公式tickMs×wheelSize计算得出。时间轮还有一个表盘指针(currentTime),用来表示时间轮当前所处的时间,currentTime是tickMs的整数倍。currentTime可以将整个时间轮划分为到期部分和未到期部分,currentTime当前指向的时间格也属于到期部分,表示刚好到期,需要处理此时间格所对应的TimerTaskList中的所有任务。
image.png2的TimerTaskList中。随着时间的不断推移,指针currentTime不断向前推进,过了2ms之后,当到达时间格2时,就需要将时间格2对应的TimeTaskList中的任务进行相应的到期操作。此时若又有一个定时为 8ms 的任务插进来,则会存放到时间格 10 中(2 + 8),currentTime再过8ms后会指向时间格10。如果同时有一个定时为19ms的任务插进来怎么办?新来的TimerTaskEntry会复用原来的TimerTaskList,所以它会插入原本已经到期的时间格1。总之,整个时间轮的总体跨度是不变的,随着指针currentTime的不断推进,当前时间轮所能处理的时间段也在不断后移,总体时间范围在currentTime和currentTime+interval之间。
多层时间轮
设计源于生活。我们常见的钟表就是一种具有三层结构的时间轮,第一层时间轮tickMs=1ms、wheelSize=60、interval=1min,此为秒钟;第二层 tickMs=1min、wheelSize=60、interval=1hour,此为分钟;第三层tickMs=1hour、wheelSize=12、interval=12hours,此为时钟。在 Kafka 中,第一层时间轮的参数同上面的案例一样:tickMs=1ms、wheelSize=20、interval=20ms,各个层级的wheelSize也固定为20,所以各个层级的tickMs和interval也可以相应地推算出来。
如果此时有一个定时为350ms的任务该如何处理?直接扩充wheelSize的大小?Kafka中不乏几万甚至几十万毫秒的定时任务,这个wheelSize的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如100万毫秒,那么这个wheelSize为100万毫秒的时间轮不仅占用很大的内存空间,而且也会拉低效率。Kafka 为此引入了层级时间轮的概念,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中。
复用之前的案例,第一层的时间轮tickMs=1ms、wheelSize=20、interval=20ms。第二层的时间轮的tickMs为第一层时间轮的interval,即20ms。(这一层的tickMs等于上一层的interval)每一层时间轮的wheelSize是固定的,都是20,那么第二层的时间轮的总体时间跨度interval为400ms。以此类推,这个400ms也是第三层的tickMs的大小,第三层的时间轮的总体时间跨度为8000ms。
对于之前所说的350ms的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二层时间轮中,最终被插入第二层时间轮中时间格17所对应的TimerTaskList。如果此时又有一个定时为450ms的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入第三层时间轮中时间格1的TimerTaskList。注意到在到期时间为[400ms,800ms)区间内的多个任务(比如446ms、455ms和473ms的定时任务)都会被放入第三层时间轮的时间格1,时间格1对应的TimerTaskList的超时时间为400ms。
image.png时间轮的推进
在Kafka中到底是怎么推进时间的呢?类似采用JDK中的scheduleAtFixedRate来每秒推进时间轮?显然这样并不合理,TimingWheel也失去了大部分意义。
Kafka中的定时器借了JDK中的DelayQueue来协助推进时间轮。具体做法是对于每个使用到的TimerTaskList都加入DelayQueue,“每个用到的TimerTaskList”特指非哨兵节点的定时任务项TimerTaskEntry对应的TimerTaskList。DelayQueue会根据TimerTaskList对应的超时时间expiration来排序,最短expiration的TimerTaskList会被排在DelayQueue的队头。Kafka中会有一个线程来获取 DelayQueue 中到期的任务列表,有意思的是这个线程所对应的名称叫作“ExpiredOperationReaper”,可以直译为“过期操作收割机”,当“收割机”线程获取 DelayQueue 中超时的任务列表 TimerTaskList之后,既可以根据 TimerTaskList 的 expiration 来推进时间轮的时间,也可以就获取的TimerTaskList执行相应的操作,对里面的TimerTaskEntry该执行过期操作的
注意对定时任务项TimerTaskEntry的插入和删除操作双向链表TimerTaskList,TimingWheel时间复杂度为O(1),性能高出DelayQueue很多,如果直接将TimerTaskEntry插入DelayQueue,那么性能显然难以支撑。就算我们根据一定的规则将若干TimerTaskEntry划分到TimerTaskList这个组中,然后将TimerTaskList插入DelayQueue(先按组分,然后再插入延时队列,减少插入的次数)
Kafka 中的 TimingWheel 专门用来执行插入和删除 TimerTaskEntry的操作,而 DelayQueue 专门负责时间推进的任务。试想一下,DelayQueue 中的第一个超时任务列表的expiration为200ms,第二个超时任务为840ms,这里获取DelayQueue的队头只需要O(1)的时间复杂度(获取之后DelayQueue内部才会再次切换出新的队头)。如果采用每秒定时推进,那么获取第一个超时的任务列表时执行的200次推进中有199次属于“空推进”,而获取第二个超时任务时又需要执行639次“空推进”,这样会无故空耗机器的性能资源,这里采用DelayQueue来辅助以少量空间换时间,从而做到了“精准推进”。
1.消费者在拉取一批消息之后,如果这批消息中有未到达延时时间的消息,那么就将这条消息重新写入主题等待后续再次消费
问题:
当消费滞后很多(消息大量堆积)的时候,原本这条消息只要再等待5秒就能够被消费,但这里却将其再次存入主题,等到再次读取到这条消息的时候有可能已经过了半小时。由此可见,这种改进方案无法保证延时精度
2.在发送延时消息的时候并不是先投递到要发送的真实主题(real_topic)中,而是先投递到一些 Kafka 内部的主题(delay_topic)中,这些内部主题对用户不可见,然后通过一个自定义的服务拉取这些内部主题中的消息,并将满足条件的消息再投递到要发送的真实的主题中,消费者所订阅的还是真实的主题。
问题:
如果采用这种方案,那么一般是按照不同的延时等级来划分的,比如设定5s、10s、30s、1min、2min、5min、10min、20min、30min、45min、1hour、2hour这些按延时时间递增的延时等级,延时的消息按照延时时间投递到不同等级的主题中,投递到同一主题中的消息的延时时间会被强转为与此主题延时等级一致的延时时间
在DelayService 内部还会有专门的消息发送线程来获取 DelayQueue 的消息并转发到真实的主题中。从消费、暂存再到转发,线程之间都是一一对应的关系。
image.pngdelayQueue 的作用是将消息按照再次投递时间进行有序排序,这样下游的消息发送线程就能够按照先后顺序获取最先满足投递条件的消息。再次投递时间是指消息的时间戳与延时时间的数值之和,因为延时消息从创建开始起需要经过延时时间之后才能被真正投递到真实主题中。同一分区中的消息的延时级别一样,也就意味着延时时间一样,那么对同一个分区中的消息而言,也就自然而然地按照投递时间进行有序排列,那么为何还需要DelayQueue的存在呢?因为一个主题中一般不止一个分区,分区之间的消息并不会按照投递时间进行排序
3.基于文件的时间轮
这里需要的是单层时间轮。而且延时消息也不再是缓存在内存中,而是暂存至文件中。时间轮中每个时间格代表一个延时时间,并且每个时间格也对应一个文件,整体上可以看作单层文件时间轮,每个时间格代表1秒,若要支持2小时(也就是2×60×60=7200)之内的延时时间的消息,那么整个单层时间轮的时间格数就需要7200个,与此对应的也就需要7200个文件,听上去似乎需要庞大的系统开销,就单单文件句柄的使用也会耗费很多的系统资源。其实不然,我们并不需要维持所有文件的文件句柄,只需要加载距离时间轮表盘指针(currentTime)相近位置的部分文件即可,其余都可以用类似“懒加载”的机制来维持:若与时间格对应的文件不存在则可以新建,若与时间格对应的文件未加载则可以重新加载,整体上造成的时延相比于延时等级方案而言微乎其微。随着表盘指针的转动,其相邻的文件也会变得不同,整体上在内存中只需要维持少量的文件句柄就可以让系统运转起来。
这里为什么强调的是单层时间轮。试想一下,如果这里采用的是多层时间轮,那么必然会有时间轮降级的动作,那就需要将高层时间轮中时间格对应文件中的内容写入低层时间轮,高层时间格中伴随的是读取文件内容、写入低层时间轮、删除已写入的内容的操作,与此同时,高层时间格中也会有新的内容写入。如果要用多层时间轮来实现,不得不增加繁重的元数据控制信息和繁杂的锁机制。对单层时间轮中的时间格而言,其对应的要么是追加文件内容,要么是删除整个文件(到达延时时间,就可以读取整个文件中的内容做转发,并删除整个文件)。采用单层时间轮可以简化工程实践,减少出错的可能,性能上也并不会比多层时间轮差。
采用时间轮可以解决延时精度的问题,采用文件可以解决内存暴涨的问题,那么剩下的还有一个可靠性的问题,这里就借鉴了图11-6中的多副本机制,生产者同样将消息写入多个备份(单层文件时间轮),待时间轮转动而触发某些时间格过期时就可以将时间格对应的文件内容(也就是延时消息)转发到真实主题中,并且删除相应的文件。与此同时,还会有一个后台服务专门用来清理其他时间轮中相应的时间格
总体上而言,对于延时队列的封装实现,如果要求延时精度不是那么高,则建议使用延时等级的实现方案,毕竟实现起来简单明了。反之,如果要求高精度或自定义延时时间,那么可以选择单层文件时间轮的方案。
网友评论