美文网首页
RocketMQ 5.消费者核心应用

RocketMQ 5.消费者核心应用

作者: 香沙小熊 | 来源:发表于2021-01-07 17:40 被阅读0次

1. PushConsumer核心参数详解

  • consumeFromWhere

CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费

  • allocateMessageQueueStrategy
    平均分配的实现算法

如果消费者的个数可以除尽队列的个数,那么就完全平均分。
如果不能除尽。那么靠前的消费者多消费一个队列,靠后的消费平均数个队列。
如果消费者的个数大于队列的个数,那么靠前的消费者消费一个队列,后面的不消费。

  • subscription
    订阅关系
  • offsetStore
    消费进度相关类
  • consumeThreadMin/consumeThreadMax
    最小使用者线程数/最大使用者线程数
  • consumeConcurrentlyMaxSpan
    同时最大跨度偏移,它对顺序消耗没有影响
  • pullThresholdForQueue
    队列级别的流控制阈值,默认情况下每个消息队列最多缓存1000条消息,参考pullBatchSize,瞬时值可能会超过限制
  • pullInterval/pullBatchSize
    消息拉取间隔/消息拉取大小
  • consumeMessageBatchMaxSize
    批量消费消息大小

2. PushConsumer消费模式-集群模式

  • Clustering模式(默认)
    GroupName用于把多个Consumer组织到一起
    相同GroupName的Consumer只消费所订阅消息的一部分
    目的:达到天然的负载均衡机制
public class Consumer1 {

    public Consumer1() {
        try {
            String group_name = "test_model_consumer_name1";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
            consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
            consumer.subscribe("test_model_topic", "TagA");
            consumer.setMessageModel(MessageModel.CLUSTERING);
//            consumer.setMessageModel(MessageModel.BROADCASTING);
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Consumer1 c1 = new Consumer1();
        System.out.println("c1 start..");

    }

    class Listener implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);
                    String tags = msg.getTags();
                    //if(tags.equals("TagA")) {
                    System.out.println("收到消息:" + "  topic :" + topic + "  ,tags : " + tags + " ,msg : " + msgBody);
                    //}
                }
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }
}
public class Consumer2 {

    public Consumer2() {
        try {
            String group_name = "test_model_consumer_name2";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
            consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
            consumer.subscribe("test_model_topic", "TagB");
            consumer.setMessageModel(MessageModel.CLUSTERING);
//            consumer.setMessageModel(MessageModel.BROADCASTING);
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Consumer2 c2 = new Consumer2();
        System.out.println("c2 start..");

    }

    class Listener implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);
                    String tags = msg.getTags();
                    //if(tags.equals("TagB")) {
                    System.out.println("收到消息:" + "  topic :" + topic + "  ,tags : " + tags + " ,msg : " + msgBody);
                    //}
                }
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }
}

public class Consumer3 {

    public Consumer3() {
        try {
            String group_name = "test_model_consumer_name2";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
            consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
            consumer.subscribe("test_model_topic", "TagB");
            consumer.setMessageModel(MessageModel.CLUSTERING);
//            consumer.setMessageModel(MessageModel.BROADCASTING);
            consumer.registerMessageListener(new Listener());
            consumer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        Consumer3 c2 = new Consumer3();
        System.out.println("c3 start..");

    }

    class Listener implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            try {
                for (MessageExt msg : msgs) {
                    String topic = msg.getTopic();
                    String msgBody = new String(msg.getBody(), StandardCharsets.UTF_8);
                    String tags = msg.getTags();
                    //if(tags.equals("TagB")) {
                    System.out.println("收到消息:" + "  topic :" + topic + "  ,tags : " + tags + " ,msg : " + msgBody);
                    //}
                }
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

    }
}
其中Consumer1订阅Tag 为TagA, 主题 Consumer2、 Consumer3订阅Tag 为TagB
public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        String group_name = "test_model_producer_name";
        DefaultMQProducer producer = new DefaultMQProducer(group_name);
        producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        producer.start();

        for (int i = 0; i < 8; i++) {
            try {
                String tag = (i % 2 == 0) ? "TagA" : "TagB";
                Message msg = new Message("test_model_topic",// topic
                        tag,// tag
                        ("信息内容" + i).getBytes()// body
                );
                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}
consumer1,2,3 先于Producer启动
Producer 运行输出:
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992CE10000, offsetMsgId=276A70F600002A9F0000000000004F6C, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=0], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D040001, offsetMsgId=276A70F600002A9F0000000000005022, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D0F0002, offsetMsgId=276A70F600002A9F00000000000050D8, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=2], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D180003, offsetMsgId=276A70F600002A9F000000000000518E, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=3], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D230004, offsetMsgId=276A70F600002A9F0000000000005244, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=0], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D2D0005, offsetMsgId=276A70F600002A9F00000000000052FA, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=1], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D3D0006, offsetMsgId=276A70F600002A9F00000000000053B0, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=2], queueOffset=3]
SendResult [sendStatus=SEND_OK, msgId=0A01017D353418B4AAC221992D470007, offsetMsgId=276A70F600002A9F0000000000005466, messageQueue=MessageQueue [topic=test_model_topic, brokerName=broker-a, queueId=3], queueOffset=3]
12:34:46.783 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.5.160:9876] result: true
12:34:46.785 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.112.246:10911] result: true
12:34:46.785 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.5.160:10911] result: true

Process finished with exit code 0

Consumer1,2,3示例运行输出
c1 start..
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容0
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容2
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容4
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容6
c2 start..
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容1
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容5
c3 start..
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容3
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容7
最佳实践:
假如生产队列是8个(queueId个数),那么消费者最好是2个、4个、8个(约数)

3. PushConsumer消费模式-广播模式

  • BROADCASTING模式(广播模式)
    同一个ConsumerGroup里的Consumer都消费订阅Topic全部信息
    也就是一条消息会被每一个Consumer消费
    setMessageModel方法

将 PushConsumer消费模式-集群模式的 Consumer1,2,3示例改为BROADCASTING模式(广播模式),同时按上述示例步骤运行

//            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.setMessageModel(MessageModel.BROADCASTING);
Consumer1,2,3示例运行输出
c1 start..
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容0
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容2
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容4
收到消息:  topic :test_model_topic  ,tags : TagA ,msg : 信息内容6
c2 start..
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容1
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容3
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容7
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容5
c3 start..
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容7
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容3
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容1
收到消息:  topic :test_model_topic  ,tags : TagB ,msg : 信息内容5
结论 :
通过c2、c3输出得出同一个ConsumerGroup里的Consumer都消费订阅Topic全部信息

4.消息存储核心-偏移量Offset

Offset是消息消费进度的核心
Offset指某个topic下的一条消息在某个MessageQueue里的位置
通过Offset可以进行定位到这条消息
Offset的存储实现分为远程文件类型和本地文件类型两种

5.集群模式-RemoteBrokerOffsetStore解析

默认集群模式Clustering,采用远程文件存储Offset
本质上因为多消费模式,每个Consumer消费所订阅主题的一部分

6.广播模式-LocalFileOffsetStore解析

广播模式下,由于每个Consumer都会收到消息且消费
各个Consumer之间没有任何干扰,独立线程消费
所以使用LocalFileOffsetStore,也就是把Offset存储到本地

7.消费者长轮询模式分析

DefaultPushConsumer是使用长轮询模式进行实现的
通常主流消息获取模式:Push消息推送模式&Pull消息拉取模式
长轮询机制

consumer 拉取消息,对应的 queue 如果没有数据,broker 不会立即返回,而是以一种长轮询的方式处理,把 PullReuqest 保存起来,等待 queue 有了消息后,或者长轮询阻塞时间到了,再重新处理该 queue 上的所有 PullRequest。

rocketMQ 长轮询

8.RocketMQ消费者-PullConsumer使用

  • 消息拉取方式:DefaultMQPullConsumer
  • Pull方式主要了三件事:
    获取Message Queue并遍历
    维护OffsetStore
    根据不同的消息状态做不同的处理
public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        String group_name = "test_pull_producer_name";
        DefaultMQProducer producer = new DefaultMQProducer(group_name);
        producer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        producer.start();

        for (int i = 0; i < 10; i++) {
            try {
                Message msg = new Message("test_pull_topic",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes()// body
                );

                SendResult sendResult = producer.send(msg);
                System.out.println(sendResult);
                Thread.sleep(1000);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(3000);
            }
        }

        producer.shutdown();
    }
}
public class PullConsumer {
    /**
     * Map<key, value>  key为指定的队列,value为这个队列拉取数据的最后位置
     * <p>
     * 重启,当前offset不会改变,因为offset没有存储到数据里,而只是存储到内存中
     */

    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {

        String group_name = "test_pull_consumer_name";
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group_name);
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);
        consumer.start();
        System.err.println("consumer start");
        //  从TopicTest这个主题去获取所有的队列(默认会有4个队列)
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("test_pull_topic");
        //  遍历每一个队列,进行拉取数据
        for (MessageQueue mq : mqs) {
            System.out.println("Consume from the queue: " + mq);

            SINGLE_MQ:
            while (true) {
                try {
                    //  从queue中获取数据,从什么位置开始拉取数据 单次对多拉取32条记录
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println(pullResult);
                    System.out.println(pullResult.getPullStatus());
                    System.out.println();
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> list = pullResult.getMsgFoundList();
                            for (MessageExt msg : list) {
                                System.out.println(new String(msg.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            System.out.println("没有新的数据啦...");
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }


    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offseTable.put(mq, offset);
    }


    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offseTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }

}

先运行Producer再运行PullConsumer
Producer运行结果

SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0B3F10000, offsetMsgId=276A70F600002A9F0000000000000E60, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=2], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0B7F00001, offsetMsgId=276A70F600002A9F0000000000000F18, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=3], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0BBE50002, offsetMsgId=276A70F600002A9F0000000000000FD0, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=0], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0BFDB0003, offsetMsgId=276A70F600002A9F0000000000001088, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=1], queueOffset=5]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0C3DA0004, offsetMsgId=276A70F600002A9F0000000000001140, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=2], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0C7D50005, offsetMsgId=276A70F600002A9F00000000000011F8, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=3], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0CBD90006, offsetMsgId=276A70F600002A9F00000000000012B0, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=0], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0CFDA0007, offsetMsgId=276A70F600002A9F0000000000001368, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=1], queueOffset=6]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0D3D00008, offsetMsgId=276A70F600002A9F0000000000001420, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=2], queueOffset=7]
SendResult [sendStatus=SEND_OK, msgId=0A01017D29A018B4AAC222A0D7D00009, offsetMsgId=276A70F600002A9F00000000000014D8, messageQueue=MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=3], queueOffset=7]
17:22:47.425 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.5.160:9876] result: true
17:22:47.430 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.112.246:10911] result: true
17:22:47.431 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[39.106.5.160:10911] result: true

PullConsumer 运行结果

consumer start
Consume from the queue: MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=3]
PullResult [pullStatus=FOUND, nextBeginOffset=5, minOffset=0, maxOffset=5, msgFoundList=5]
FOUND

Hello RocketMQ 3
Hello RocketMQ 7
Hello RocketMQ 1
Hello RocketMQ 5
Hello RocketMQ 9
PullResult [pullStatus=NO_NEW_MSG, nextBeginOffset=5, minOffset=0, maxOffset=5, msgFoundList=0]
NO_NEW_MSG

没有新的数据啦...
Consume from the queue: MessageQueue [topic=test_pull_topic, brokerName=broker-a, queueId=0]
PullResult [pullStatus=FOUND, nextBeginOffset=5, minOffset=0, maxOffset=5, msgFoundList=5]
FOUND

Hello RocketMQ 0
Hello RocketMQ 4
Hello RocketMQ 8
Hello RocketMQ 2
Hello RocketMQ 6
PullResult [pullStatus=FOUND, nextBeginOffset=6, minOffset=0, maxOffset=6, msgFoundList=1]
FOUND
...省略
注意:重启,当前offset不会改变,因为offset没有存储到数据里,而只是存储到内存中
上述主动pull 消息问题:1. 没有同步远程更新消费offset 2.没有拉取消息的周期
优化
public class PullScheduleService {

    public static void main(String[] args) throws MQClientException {

        String group_name = "test_pull_consumer_name";

        final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService(group_name);

        scheduleService.getDefaultMQPullConsumer().setNamesrvAddr(Const.NAMESRV_ADDR_MASTER_SLAVE);

        scheduleService.setMessageModel(MessageModel.CLUSTERING);

        scheduleService.registerPullTaskCallback("test_pull_topic", new PullTaskCallback() {

            @Override
            public void doPullTask(MessageQueue mq, PullTaskContext context) {
                MQPullConsumer consumer = context.getPullConsumer();
                System.err.println("-------------- queueId: " + mq.getQueueId() + "-------------");
                try {
                    // 获取从哪里拉取
                    long offset = consumer.fetchConsumeOffset(mq, false);
                    if (offset < 0)
                        offset = 0;

                    PullResult pullResult = consumer.pull(mq, "*", offset, 32);
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> list = pullResult.getMsgFoundList();
                            for (MessageExt msg : list) {
                                //消费数据...
                                System.out.println(new String(msg.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                    //更新ConsumeOffset 消费记录
                    consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
                    // 设置再过3000ms后重新拉取
                    context.setPullNextDelayTimeMillis(3000);

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        scheduleService.start();
    }
}

-------------- queueId: 3-------------
-------------- queueId: 0-------------
-------------- queueId: 2-------------
-------------- queueId: 1-------------
Hello RocketMQ 2
Hello RocketMQ 6
Hello RocketMQ 0
Hello RocketMQ 4
Hello RocketMQ 8
Hello RocketMQ 1
Hello RocketMQ 3
Hello RocketMQ 0
Hello RocketMQ 4
Hello RocketMQ 8
Hello RocketMQ 2
Hello RocketMQ 6
Hello RocketMQ 2
Hello RocketMQ 6
Hello RocketMQ 7
Hello RocketMQ 5
Hello RocketMQ 0
Hello RocketMQ 4
Hello RocketMQ 9
Hello RocketMQ 1
Hello RocketMQ 3
Hello RocketMQ 7
Hello RocketMQ 3
Hello RocketMQ 7
Hello RocketMQ 8
Hello RocketMQ 5
Hello RocketMQ 9
Hello RocketMQ 1
Hello RocketMQ 5
Hello RocketMQ 9
-------------- queueId: 0-------------
-------------- queueId: 3-------------
-------------- queueId: 2-------------
-------------- queueId: 1-------------
-------------- queueId: 0-------------
...省略

特别感谢:
阿神

相关文章

网友评论

      本文标题:RocketMQ 5.消费者核心应用

      本文链接:https://www.haomeiwen.com/subject/xqjooktx.html