美文网首页RocketMQ系列RocketMQ
RocketMQ系列(八):延迟消息

RocketMQ系列(八):延迟消息

作者: 范柏柏 | 来源:发表于2020-06-07 16:51 被阅读0次

    开源的rocketMq支持延迟消息,但不支持秒级精度。默认支持18个level的延迟消息,通过broker端的messageDelayLevel配置项确定

    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    Broker在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的队列,也就是说18个level对应了18个队列。注意,这并不是说这个内部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列。

    延迟级别的值可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持2天的延迟,修改最后一个level的值为2d,这个时候依然是18个level;也可以增加一个2d,这个时候总共就有19个level。

    可以看到这里并不支持秒级精度,按照《rocketmq developer guide》中的说法,是为了避免在broker对消息进行排序,造成性能影响。

    Broker端存储延迟消息:

    延迟消息在RocketMQ Broker端的流转如下图所示:


    延迟消息存储.jpeg
    1. 修改消息Topic名称和队列信息
    2. 转发消息到延迟主题的CosumeQueue中
    3. 延迟服务消费SCHEDULE_TOPIC_XXXX消息
    4. 将信息重新存储到CommitLog中
    5. 将消息投递到目标Topic中
    6. 消费者消费目标topic中的数据

    第一步:修改消息Topic名称和队列信息

    RocketMQ Broker端在存储生产者写入的消息时,首先都会将其写入到CommitLog中。之后根据消息中的Topic信息和队列信息,将其转发到目标Topic的指定队列(ConsumeQueue)中。

    由于消息一旦存储到ConsumeQueue中,消费者就能消费到,而延迟消息不能被立即消费,所以这里将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并根据延迟级别确定要投递到哪个队列下。

    同时,还会将消息原来要发送到的目标Topic和队列信息存储到消息的属性中。

    第二步:转发消息到延迟主题的CosumeQueue中

    CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中,会对延迟消息进行特殊处理,主要是计算这条延迟消息需要在什么时候进行投递。

    投递时间=消息存储时间(storeTimestamp) + 延迟级别对应的时间
    

    需要注意的是,会将计算出的投递时间当做消息Tag的哈希值存储到CosumeQueue中,CosumeQueue单个存储单元组成结构如下图所示:


    7c2hsw38d0.jpeg
    • Commit Log Offset:记录在CommitLog中的位置。
    • Size:记录消息的大小
    • Message Tag HashCode:记录消息Tag的哈希值,用于消息过滤。特别的,对于延迟消息,这个字段记录的是消息的投递时间戳。这也是为什么java中hashCode方法返回一个int型,只占用4个字节,而这里Message Tag HashCode字段却设计成8个字节的原因。

    第三步:延迟服务消费SCHEDULE_TOPIC_XXXX消息

    Broker内部有一个ScheduleMessageService类,其充当延迟服务,消费SCHEDULE_TOPIC_XXXX中的消息,并投递到目标Topic中。

    ScheduleMessageService在启动时,其会创建一个定时器Timer,并根据延迟级别的个数,启动对应数量的TimerTask,每个TimerTask负责一个延迟级别的消费与投递。

    源码.jpeg

    需要注意的是,每个TimeTask在检查消息是否到期时,首先检查对应队列中尚未投递第一条消息,如果这条消息没到期,那么之后的消息都不会检查。如果到期了,则进行投递,并检查之后的消息是否到期。

    第四步:将信息重新存储到CommitLog中

    在将消息到期后,需要投递到目标Topic。由于在第一步已经记录了原来的Topic和队列信息,因此这里重新设置,再存储到CommitLog即可。此外,由于之前Message Tag HashCode字段存储的是消息的投递时间,这里需要重新计算tag的哈希值后再存储。

    源码参见:DeliverDelayedMessageTimerTask的messageTimeup方法。

    第五步:将消息投递到目标Topic中

    这一步与第二步类似,不过由于消息的Topic名称已经改为了目标Topic。因此消息会直接投递到目标Topic的ConsumeQueue中,之后消费者即消费到这条消息。

    延迟消息与消费重试的关系

    RocketMQ提供了消息重试的能力,在并发模式消费消费失败的情况下,可以返回一个枚举值RECONSUME_LATER,那么消息之后将会进行重试。如:

    consumer.registerMessageListener(new MessageListenerConcurrently() {
           @Override
           public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                           ConsumeConcurrentlyContext context) {
               //处理消息,失败,返回RECONSUME_LATER,进行重试
               return ConsumeConcurrentlyStatus.RECONSUME_LATER;
           }
       });
    

    重试默认会进行重试16次。使用过RocketMQ消息重试功能的用户,可能看到过以下这张图:


    消息重试.png

    消息重试的16个级别,实际上是把延迟消息18个级别的前两个level去掉了。事实上,RocketMQ的消息重试也是基于延迟消息来完成的。在消息消费失败的情况下,将其重新当做延迟消息投递回Broker。

    在投递回去时,会跳过前两个level,因此只重试16次。

    参考

    https://cloud.tencent.com/developer/article/1581368

    相关文章

      网友评论

        本文标题:RocketMQ系列(八):延迟消息

        本文链接:https://www.haomeiwen.com/subject/mzpctktx.html