说明:rocketmq实现的延时队列只支持特定的延时时间段,1s,5s,10s,...2h,不能支持任意时间段的延时
具体实现:rocketmq发送延时消息时先把消息按照延迟时间段发送到指定的队列中(rocketmq把每种延迟时间段的消息都存放到同一个队列中)然后通过一个定时器进行轮训这些队列,查看消息是否到期,如果到期就把这个消息发送到指定topic的队列中,这样的好处是同一队列中的消息延时时间是一致的,还有一个好处是这个队列中的消息时按照消息到期时间进行递增排序的,说的简单直白就是队列中消息越靠前的到期时间越早
流程图
源码分析:
如果想要深入了解的可以看一下ScheduleMessageService这个类
delayLevelTable定义了延迟级别和延迟时间的对应关系,offsetTable存放延延迟级别对应的队列消费的offset
使用timer定时器启动了一个定时任务,把每个扫描队列封装成一个任务,然后加入到timer中
i每个扫描任务主要是把队列中所有到期的消息都拿出来,并发送到指定的topic下,并把延迟队列中的消息删除
总结
优点:设计简单,把所有相同延迟时间的消息都先放到一个队列中,定时扫描,可以保证消息消费的有序性
缺点:定时器采用了timer,timer是单线程运行,如果延迟消息数量很大的情况下,可能单线程处理不过来,造成消息到期后也没有发送出去的情况
改进点:可以在每个延迟队列上各采用一个timer,或者使用timer进行扫描,加一个线程池对消息进行处理,这样可以提供效率
网友评论