美文网首页
RocketMQ 顺序消费

RocketMQ 顺序消费

作者: 放开那个BUG | 来源:发表于2021-08-01 17:39 被阅读0次

    1、前言

    对于所有的 MQ 来说,必问的一道面试题就是 RocketMQ 顺序消息怎样做?原理是什么?

    首先我们要明确什么顺序消费,顺序消费的定义是什么?我所理解的顺序消费,指的针对某一类消息,比如都是订单A 的消息来说,它的消费有先后顺序,类似于 FIFO。假设订单 A 有创建、付款、完成这几类消息,我们对于订单 A 的消息,必须要满足先消费创建,其次是付款,最后是完成。

    所以针对整个链路来说,我们不仅需要塞的时候是有序的,消费的时候也应该做到有序。就算是以 FIFO 顺序塞进去,消费如果使用多线程同时消费同一个 ConsumerQueue 且同时能消费多个消息,那必然做不到有序。接下来,会从 provider、consumer 两个方面说明如何做到有序。

    2、实现

    针对 provider 来说,RocketMQ 提供了发送顺序消息的方式,即 MessageQueueSelector:

    public interface MessageQueueSelector {
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
    }
    

    provider 在发送的时候,只要选择消息发送到那个 ConsumerQueue 即可。比如订单来说,使用订单 id 作为 key 选择队列,那么同一个订单的消息必定能发送到同一个队列。

    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                   @Override
                   public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                       Long id = (Long) arg;  //根据订单id选择发送queue
                       long index = id % mqs.size();
                       return mqs.get((int) index);
                   }
               }, orderList.get(i).getOrderId());//订单id
    

    所以 provider 的顺序发送异常简单。


    针对 consumer 来说,需要使用 MessageListenerOrderly 来消费消息:

    consumer.registerMessageListener(new MessageListenerOrderly() {
               @Override
               public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                   context.setAutoCommit(true);
                   for (MessageExt msg : msgs) {
                       // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                       System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                   }
                   return ConsumeOrderlyStatus.SUCCESS;
               }
           });
    

    consumer 顺序消费的原理也很简单。消费者消费消息的时候,会有一个 PullMessageService 拉取线程(单线程)拉取消息,然后放入到 processQueue(每个消费队列对应一个 processQueue) 中,因为是单线程拉取的,对于同一个队列的消息(虽然消费者可以订阅多个队列,但是对于同一个队列是有序的)是有序的。在放入 processQueue 之后,会调用 ConsumeMessageConcurrentlyService 或 ConsumeMessageOrderlyService 来进行消费,这里是调用 ConsumeMessageOrderlyService 进行消费。ConsumeMessageOrderlyService 在消费的时候,会先获取每一个 ConsumerQueue 的锁,然后从 processQueue 获取消息消费,这也就意味着,对于每一个 ConsumerQueue 的消息来说,消费的逻辑也是顺序的。

    3、缺点

    • 发送顺序消息无法利用集群 FailOver 特性(发送时已经选定发到哪个队列)
    • 消费顺序消息的并行度依赖于队列数量
    • 队列热点问题,个别队列由于哈希不均导致消息过多,消费速度跟不上,产生消息堆积问题
    • 遇到消息失败的消息,无法跳过,当前队列消费暂停

    不能更换MessageQueue重试就需要MessageQueue有自己的副本,通过Raft、Paxos之类的算法保证有可用的副本,或者通过其他高可用的存储设备来存储MessageQueue。

    热点问题好像没有什么好的解决办法,只能通过拆分MessageQueue和优化路由方法来尽量均衡的将消息分配到不同的MessageQueue。

    消费并行度理论上不会有太大问题,因为MessageQueue的数量可以调整。

    消费失败的无法跳过是不可避免的,因为跳过可能导致后续的数据处理都是错误的。不过可以提供一些策略,由用户根据错误类型来决定是否跳过,并且提供重试队列之类的功能,在跳过之后用户可以在“其他”地方重新消费到这条消息。

    4、感悟

    其实对于所谓的顺序消费来说,本质上是类似于一个状态机的行为,比如一个订单先创建,后付款、最后结束的行为,完全可以定义一个状态,而且发生的顺序是有先后的。所以完全不必要使用什么顺序消费,可以先创建,把创建消息塞到 mq,从 mq 获取到创建消息消费,然后创建一个付款消息,再塞到 mq。然后从 mq 消费付款消息,然后标识订单结束。完全可以用一个状态机 + mq + db 来做,更加稳定通用。

    相关文章

      网友评论

          本文标题:RocketMQ 顺序消费

          本文链接:https://www.haomeiwen.com/subject/atdhvltx.html