美文网首页面试精选
RocketMQ 顺序消费消息

RocketMQ 顺序消费消息

作者: markeNick | 来源:发表于2021-06-25 16:45 被阅读0次

一个Topic下对应着多个队列。

以官方文档的例子,一个订单的顺序流程是:创建、付款、推送、完成。

这里有三个订单各自需要顺序完成:创建、付款、推送、完成。

可以将订单号相同的消息先后发送到同一个队列中,消费时,一个队列就只有一个线程去消费,达到顺序消费目的

示意图:

mq顺序消费.png

生产者

使用MessageQueueSelector来决定将订单的一组操作分发到相同的队列,

这里将订单号取模

/**
 * Producer,发送顺序消息
 */
public class Producer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.setNamesrvAddr("10.111.105.41:9876");

        producer.start();

        String[] tags = new String[]{"TagA", "TagC", "TagD"};

        // 订单列表
        List<OrderStep> orderList = new Producer().buildOrders();

        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String dateStr = sdf.format(date);
        for (int i = 0; i < orderList.size(); i++) {
            // 加个时间前缀
            String body = dateStr + " Hello RocketMQ " + orderList.get(i);
            String tag = tags[i % tags.length];
            Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());

            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    // 根据订单id选择发送queue
                    Long id = (Long) arg;  
                    long index = id % mqs.size();
                    return mqs.get((int) index);
                }
            }, orderList.get(i).getOrderId());// 订单id

            System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s, tag:%s",
                    sendResult.getSendStatus(),
                    sendResult.getMessageQueue().getQueueId(),
                    body,
                    tag));
        }

        producer.shutdown();
    }

    /**
     * 订单的步骤
     */
    private static class OrderStep {
        private long orderId;
        private String desc;

        public long getOrderId() {
            return orderId;
        }

        public void setOrderId(long orderId) {
            this.orderId = orderId;
        }

        public String getDesc() {
            return desc;
        }

        public void setDesc(String desc) {
            this.desc = desc;
        }

        @Override
        public String toString() {
            return "OrderStep{" +
                    "orderId=" + orderId +
                    ", desc='" + desc + '\'' +
                    '}';
        }
    }

    /**
     * 生成模拟订单数据
     */
    private List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111065L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103117235L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(15103111039L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}

消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息

/**
 * 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
 */
public class ConsumerInOrder {

    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr("10.111.105.41:9876");
        /**
         * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
         * 如果非第一次启动,那么按照上次消费的位置继续消费
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTest", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {

            Random random = new Random();

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                for (MessageExt msg : msgs) {
                    // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
                    System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
                }

                try {
                    //模拟业务逻辑处理中...
                    TimeUnit.SECONDS.sleep(random.nextInt(10));
                } catch (Exception e) {
                    e.printStackTrace();
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();

        System.out.println("Consumer Started.");
    }
}

生产者发送消息,可以看到 orderId=15103111039 的操作都被放到同一个队列里面


生产者发送消息.png

消费者消费消息,可以看到 每一个队列都是只有一个线程去消费,利用队列的FIFO即可保证顺序消费


消费者顺序消费消息.png

相关文章

  • RocketMQ(2) 顺序消息、事务消息

    RocketMQ 顺序消息:消息有序是指可以按照消息发送顺序来消费。RocketMQ 可以严格的保证消息有序,但是...

  • Windows 安装 RocketMQ

    一、RocketMQ 介绍 1、消息顺序2、消息重复消费3、事务消息 二、RocketMQ 安装 Windows:...

  • RocketMQ顺序消息消费

    RocketMQ顺序消息消费 1. 应用场景 消息队列中消息之间有先后的依赖关系,后一条消息的处理依赖于前一条消息...

  • RocketMQ 顺序消费消息

    一个Topic下对应着多个队列。 以官方文档的例子,一个订单的顺序流程是:创建、付款、推送、完成。 这里有三个订单...

  • 详细讲解RocketMQ!包含多个知识点!通俗易懂!(顺序、延时

    一、顺序消息 顺序消息(FIFO 消息)是消息队列 RocketMQ 提供的一种严格按照顺序来发布和消费的消息。顺...

  • RocketMQ顺序消息

    我们知道消息队列的特性导致其消息不是顺序进行消费的,RocketMQ没有提供所谓的顺序消息来供我们使用,但是有时候...

  • RocketMQ(六)消息类型--顺序消息

    rocketmq支持顺序消息,而在rocketmq-spring-boot-starter中,分别提供了顺序同步,...

  • RocketMQ 顺序消费

    1、前言 对于所有的 MQ 来说,必问的一道面试题就是 RocketMQ 顺序消息怎样做?原理是什么? 首先我们要...

  • RocketMQ顺序消息

    顺序消息 之前我本地使用的client版本是3.6.2的,但是公司服务器上安得是3.2.6的版本。导致我测试顺序消...

  • RocketMQ顺序消息

    阿里help:https://help.aliyun.com/document_detail/49319.html...

网友评论

    本文标题:RocketMQ 顺序消费消息

    本文链接:https://www.haomeiwen.com/subject/iheoyltx.html