顺序消费

作者: 念䋛 | 来源:发表于2021-06-17 19:36 被阅读0次

    生产者
    根据自定义的规则,将某一类的消息发送到同一个queue中,可以hash与队列的个数取余

    public static void main(String[] args) throws UnsupportedEncodingException {
        try {
            //默认情况下,broker为每一个topic创建4个queue,生产者把要顺序生产的消息一次的发送到同一个队列
            DefaultMQProducer producer = new DefaultMQProducer ("please_rename_unique_group_name");
            producer.setNamesrvAddr ("192.168.44.145:9876");
            producer.start ();
            for (int i = 0; i < 10; i++) {
                int orderId = i;
                for (int j = 0; j <= 5; j++) {
                    Message msg =
                            new Message ("OrderTopicTest", "order_" + orderId, "KEY" + orderId,
                                    ("order_" + orderId + " step " + j).getBytes (RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send (msg, new MessageQueueSelector () {
                        @Override
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                            Integer id = ( Integer ) arg;
                            int index = id % mqs.size ();
                            //返回就是消息发往到queue的id
                            return mqs.get (index);
                        }
                      //orderId就是select的arg变量
                    }, orderId);
                    System.out.printf ("%s%n", sendResult);
                }
            }
    
            producer.shutdown ();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace ();
        }
    }
    

    消费者
    消费者要注意使用的监听器MessageListenerOrderly,处理同一个queue的消息使用的是一个线程,单线程获取一个queue的消息保证了消息的顺序消费

    public static void main(String[] args) throws MQClientException {
        //顺序消费不是全局顺序,只是分区顺序。要全局顺序只能分配一个queue。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer ("please_rename_unique_group_name_3");
        consumer.setNamesrvAddr ("192.168.44.145:9876");
        consumer.setConsumeFromWhere (ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe ("OrderTopicTest", "*");
        //顺序消费,就需要保证消费端用同一个线程处理一个queue的消息
        consumer.registerMessageListener (new MessageListenerOrderly () {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit (true);
                for (MessageExt msg : msgs) {
                    System.out.println ("收到消息内容 " + new String (msg.getBody ())+"消息队列id-"+msg.getQueueId ());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start ();
        System.out.printf ("Consumer Started.%n");
    }
    

    消费者收到的消息, order_X代表同一类的消息,step X 是该类消息的步骤,
    可以看出来,同一类的消息的步骤是按照顺序的,但是不同类之间可能会相互的穿插,证明了顺序消费是局部消费,不是全局消费.如果要实现全局消费的话,topic创建的时候只创建一个queue(默认为4个)
    收到消息内容 order_3 step 0消息队列id-3
    收到消息内容 order_2 step 0消息队列id-2
    收到消息内容 order_0 step 0消息队列id-0
    收到消息内容 order_1 step 0消息队列id-1
    收到消息内容 order_1 step 1消息队列id-1
    收到消息内容 order_3 step 1消息队列id-3
    收到消息内容 order_2 step 1消息队列id-2
    收到消息内容 order_0 step 1消息队列id-0
    收到消息内容 order_3 step 2消息队列id-3
    收到消息内容 order_1 step 2消息队列id-1
    收到消息内容 order_0 step 2消息队列id-0
    收到消息内容 order_2 step 2消息队列id-2
    收到消息内容 order_1 step 3消息队列id-1
    收到消息内容 order_3 step 3消息队列id-3
    收到消息内容 order_2 step 3消息队列id-2
    收到消息内容 order_0 step 3消息队列id-0
    收到消息内容 order_3 step 4消息队列id-3
    收到消息内容 order_1 step 4消息队列id-1

    相关文章

      网友评论

        本文标题:顺序消费

        本文链接:https://www.haomeiwen.com/subject/qtthyltx.html