美文网首页
RocketMQ 顺序消费

RocketMQ 顺序消费

作者: 放开那个BUG | 来源:发表于2021-08-01 17:39 被阅读0次

1、前言

对于所有的 MQ 来说,必问的一道面试题就是 RocketMQ 顺序消息怎样做?原理是什么?

首先我们要明确什么顺序消费,顺序消费的定义是什么?我所理解的顺序消费,指的针对某一类消息,比如都是订单A 的消息来说,它的消费有先后顺序,类似于 FIFO。假设订单 A 有创建、付款、完成这几类消息,我们对于订单 A 的消息,必须要满足先消费创建,其次是付款,最后是完成。

所以针对整个链路来说,我们不仅需要塞的时候是有序的,消费的时候也应该做到有序。就算是以 FIFO 顺序塞进去,消费如果使用多线程同时消费同一个 ConsumerQueue 且同时能消费多个消息,那必然做不到有序。接下来,会从 provider、consumer 两个方面说明如何做到有序。

2、实现

针对 provider 来说,RocketMQ 提供了发送顺序消息的方式,即 MessageQueueSelector:

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

provider 在发送的时候,只要选择消息发送到那个 ConsumerQueue 即可。比如订单来说,使用订单 id 作为 key 选择队列,那么同一个订单的消息必定能发送到同一个队列。

SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
               @Override
               public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                   Long id = (Long) arg;  //根据订单id选择发送queue
                   long index = id % mqs.size();
                   return mqs.get((int) index);
               }
           }, orderList.get(i).getOrderId());//订单id

所以 provider 的顺序发送异常简单。


针对 consumer 来说,需要使用 MessageListenerOrderly 来消费消息:

consumer.registerMessageListener(new MessageListenerOrderly() {
           @Override
           public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
               context.setAutoCommit(true);
               for (MessageExt msg : msgs) {
                   // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                   System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
               }
               return ConsumeOrderlyStatus.SUCCESS;
           }
       });

consumer 顺序消费的原理也很简单。消费者消费消息的时候,会有一个 PullMessageService 拉取线程(单线程)拉取消息,然后放入到 processQueue(每个消费队列对应一个 processQueue) 中,因为是单线程拉取的,对于同一个队列的消息(虽然消费者可以订阅多个队列,但是对于同一个队列是有序的)是有序的。在放入 processQueue 之后,会调用 ConsumeMessageConcurrentlyService 或 ConsumeMessageOrderlyService 来进行消费,这里是调用 ConsumeMessageOrderlyService 进行消费。ConsumeMessageOrderlyService 在消费的时候,会先获取每一个 ConsumerQueue 的锁,然后从 processQueue 获取消息消费,这也就意味着,对于每一个 ConsumerQueue 的消息来说,消费的逻辑也是顺序的。

3、缺点

  • 发送顺序消息无法利用集群 FailOver 特性(发送时已经选定发到哪个队列)
  • 消费顺序消息的并行度依赖于队列数量
  • 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题
  • 遇到消息失败的消息,无法跳过,当前队列消费暂停

不能更换MessageQueue重试就需要MessageQueue有自己的副本,通过Raft、Paxos之类的算法保证有可用的副本,或者通过其他高可用的存储设备来存储MessageQueue。

热点问题好像没有什么好的解决办法,只能通过拆分MessageQueue和优化路由方法来尽量均衡的将消息分配到不同的MessageQueue。

消费并行度理论上不会有太大问题,因为MessageQueue的数量可以调整。

消费失败的无法跳过是不可避免的,因为跳过可能导致后续的数据处理都是错误的。不过可以提供一些策略,由用户根据错误类型来决定是否跳过,并且提供重试队列之类的功能,在跳过之后用户可以在“其他”地方重新消费到这条消息。

4、感悟

其实对于所谓的顺序消费来说,本质上是类似于一个状态机的行为,比如一个订单先创建,后付款、最后结束的行为,完全可以定义一个状态,而且发生的顺序是有先后的。所以完全不必要使用什么顺序消费,可以先创建,把创建消息塞到 mq,从 mq 获取到创建消息消费,然后创建一个付款消息,再塞到 mq。然后从 mq 消费付款消息,然后标识订单结束。完全可以用一个状态机 + mq + db 来做,更加稳定通用。

相关文章

  • RocketMQ 顺序消费

    1、前言 对于所有的 MQ 来说,必问的一道面试题就是 RocketMQ 顺序消息怎样做?原理是什么? 首先我们要...

  • RocketMQ(2) 顺序消息、事务消息

    RocketMQ 顺序消息:消息有序是指可以按照消息发送顺序来消费。RocketMQ 可以严格的保证消息有序,但是...

  • Windows 安装 RocketMQ

    一、RocketMQ 介绍 1、消息顺序2、消息重复消费3、事务消息 二、RocketMQ 安装 Windows:...

  • RocketMQ实战(三):分布式事务

    接 《RocketMQ实战(一)》,《RocketMQ实战(二)》,本篇博客主要讨论的话题是:顺序消费、RMQ在分...

  • RocketMQ顺序消息消费

    RocketMQ顺序消息消费 1. 应用场景 消息队列中消息之间有先后的依赖关系,后一条消息的处理依赖于前一条消息...

  • RocketMQ-顺序消费

    顺序消费 消息有序是指一类消息消费时,能按照发生的顺序来消费。例如:一个订单产生三个消息:订单创建,订单付款,订单...

  • RocketMQ 顺序消费消息

    一个Topic下对应着多个队列。 以官方文档的例子,一个订单的顺序流程是:创建、付款、推送、完成。 这里有三个订单...

  • 详细讲解RocketMQ!包含多个知识点!通俗易懂!(顺序、延时

    一、顺序消息 顺序消息(FIFO 消息)是消息队列 RocketMQ 提供的一种严格按照顺序来发布和消费的消息。顺...

  • RocketMQ系列(四)顺序消费

    折腾了好长时间才写这篇文章,顺序消费,看上去挺好理解的,就是消费的时候按照队列中的顺序一个一个消费;而并发消费,则...

  • RocketMq如何保证消费顺序

    两个锁: 集群模式下锁队列保证消息被同一个consumer消费,往broker定时发送锁命令本地消费时不论集群模式...

网友评论

      本文标题:RocketMQ 顺序消费

      本文链接:https://www.haomeiwen.com/subject/atdhvltx.html