美文网首页
消费消息(一)

消费消息(一)

作者: isuntong | 来源:发表于2020-02-24 13:49 被阅读0次
  1. 创建消费者Consumer,制定消费者组名
  2. 自定NameServer地址
  3. 订阅主题Topic和Tag
  4. 设置回调参数,处理消息
  5. 启动消费者consumer

消费

package com.suntong.myshop.service.rocketmq.consumer.moren;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 消息的接收者
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
//        1. 创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//        2. 自定NameServer地址
        consumer.setNamesrvAddr("192.168.79.130:9876");
//        3. 订阅主题Topic和Tag
        consumer.subscribe("base","Tag1");
//        4. 设置回调参数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * 接收消息内容
             * @param list
             * @param consumeConcurrentlyContext
             * @return
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println(list);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
//        5. 启动消费者consumer
        consumer.start();
    }
}

打印

[MessageExt [queueId=2, storeSize=169, queueOffset=2, sysFlag=0, bornTimestamp=1582520637385, bornHost=/192.168.79.1:64315, storeTimestamp=1582520636811, storeHost=/192.168.79.130:10911, msgId=C0A84F8200002A9F00000000000005F1, commitLogOffset=1521, bodyCRC=149520835, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='base', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1582522478203, UNIQ_KEY=C0A82B9C26A418B4AAC2793FFFC90009, WAIT=true, TAGS=Tag1}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 57], transactionId='null'}]]
[MessageExt [queueId=2, storeSize=169, queueOffset=0, sysFlag=0, bornTimestamp=1582520629351, bornHost=/192.168.79.1:64315, storeTimestamp=1582520628777, storeHost=/192.168.79.130:10911, msgId=C0A84F8200002A9F00000000000000A9, commitLogOffset=169, bodyCRC=103942641, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='base', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1582522478203, UNIQ_KEY=C0A82B9C26A418B4AAC2793FE0670001, WAIT=true, TAGS=Tag1}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 49], transactionId='null'}]]
[MessageExt [queueId=0, storeSize=169, queueOffset=1, sysFlag=0, bornTimestamp=1582520635375, bornHost=/192.168.79.1:64315, storeTimestamp=1582520634801, storeHost=/192.168.79.130:10911, msgId=C0A84F8200002A9F000000000000049F, commitLogOffset=1183, bodyCRC=1867623620, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='base', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1582522478203, UNIQ_KEY=C0A82B9C26A418B4AAC2793FF7EF0007, WAIT=true, TAGS=Tag1}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 55], transactionId='null'}]]
[MessageExt [queueId=0, storeSize=169, queueOffset=0, sysFlag=0, bornTimestamp=1582520631358, bornHost=/192.168.79.1:64315, storeTimestamp=1582520630784, storeHost=/192.168.79.130:10911, msgId=C0A84F8200002A9F00000000000001FB, commitLogOffset=507, bodyCRC=1748789469, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='base', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1582522478203, UNIQ_KEY=C0A82B9C26A418B4AAC2793FE83E0003, WAIT=true, TAGS=Tag1}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 51], transactionId='null'}]]
[MessageExt [queueId=1, storeSize=169, queueOffset=1, sysFlag=0, bornTimestamp=1582520632362, bornHost=/192.168.79.1:64315, storeTimestamp=1582520631788, storeHost=/192.168.79.130:10911, msgId=C0A84F8200002A9F00000000000002A4, commitLogOffset=676, bodyCRC=1985543550, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='base', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1582522478203, UNIQ_KEY=C0A82B9C26A418B4AAC2793FEC2A0004, WAIT=true, TAGS=Tag1}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 52], transactionId='null'}]]
[MessageExt [queueId=1, storeSize=169, queueOffset=2, sysFlag=0, bornTimestamp=1582520636381, bornHost=/192.168.79.1:64315, storeTimestamp=1582520635807, storeHost=/192.168.79.130:10911, msgId=C0A84F8200002A9F0000000000000548, commitLogOffset=1352, bodyCRC=2146349397, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='base', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1582522478203, UNIQ_KEY=C0A82B9C26A418B4AAC2793FFBDD0008, WAIT=true, TAGS=Tag1}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 56], transactionId='null'}]]
[MessageExt [queueId=3, storeSize=169, queueOffset=1, sysFlag=0, bornTimestamp=1582520634370, bornHost=/192.168.79.1:64315, storeTimestamp=1582520633796, storeHost=/192.168.79.130:10911, msgId=C0A84F8200002A9F00000000000003F6, commitLogOffset=1014, bodyCRC=408329298, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='base', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1582522478205, UNIQ_KEY=C0A82B9C26A418B4AAC2793FF4020006, WAIT=true, TAGS=Tag1}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 54], transactionId='null'}]]
[MessageExt [queueId=1, storeSize=169, queueOffset=0, sysFlag=0, bornTimestamp=1582520628325, bornHost=/192.168.79.1:64315, storeTimestamp=1582520627763, storeHost=/192.168.79.130:10911, msgId=C0A84F8200002A9F0000000000000000, commitLogOffset=0, bodyCRC=1899313511, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='base', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1582522478205, UNIQ_KEY=C0A82B9C26A418B4AAC2793FDC640000, WAIT=true, TAGS=Tag1}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 48], transactionId='null'}]]
[MessageExt [queueId=3, storeSize=169, queueOffset=0, sysFlag=0, bornTimestamp=1582520630354, bornHost=/192.168.79.1:64315, storeTimestamp=1582520629779, storeHost=/192.168.79.130:10911, msgId=C0A84F8200002A9F0000000000000152, commitLogOffset=338, bodyCRC=523982923, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='base', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=6, CONSUME_START_TIME=1582522478206, UNIQ_KEY=C0A82B9C26A418B4AAC2793FE4520002, WAIT=true, TAGS=Tag1}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 50], transactionId='null'}]]
[MessageExt [queueId=2, storeSize=169, queueOffset=1, sysFlag=0, bornTimestamp=1582520633367, bornHost=/192.168.79.1:64315, storeTimestamp=1582520632792, storeHost=/192.168.79.130:10911, msgId=C0A84F8200002A9F000000000000034D, commitLogOffset=845, bodyCRC=23055848, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='base', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=9, CONSUME_START_TIME=1582522478206, UNIQ_KEY=C0A82B9C26A418B4AAC2793FF0170005, WAIT=true, TAGS=Tag1}, body=[72, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 53], transactionId='null'}]]

消息还在

消费一下异步消息

package com.suntong.myshop.service.rocketmq.consumer.moren;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 消息的接收者
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
//        1. 创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//        2. 自定NameServer地址
        consumer.setNamesrvAddr("192.168.79.130:9876");
//        3. 订阅主题Topic和Tag
        consumer.subscribe("base","Tag2");
//        4. 设置回调参数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * 接收消息内容
             * @param list
             * @param consumeConcurrentlyContext
             * @return
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg: list) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
//        5. 启动消费者consumer
        consumer.start();
    }
}

启动了没反应,再发送一下异步消息

收到了

E:\java\Java8\bin\java.exe "-javaagent:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\lib\idea_rt.jar=64936:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\bin" -Dfile.encoding=UTF-8 -classpath E:\java\Java8\jre\lib\charsets.jar;E:\java\Java8\jre\lib\deploy.jar;E:\java\Java8\jre\lib\ext\access-bridge-64.jar;E:\java\Java8\jre\lib\ext\cldrdata.jar;E:\java\Java8\jre\lib\ext\dnsns.jar;E:\java\Java8\jre\lib\ext\jaccess.jar;E:\java\Java8\jre\lib\ext\jfxrt.jar;E:\java\Java8\jre\lib\ext\localedata.jar;E:\java\Java8\jre\lib\ext\nashorn.jar;E:\java\Java8\jre\lib\ext\sunec.jar;E:\java\Java8\jre\lib\ext\sunjce_provider.jar;E:\java\Java8\jre\lib\ext\sunmscapi.jar;E:\java\Java8\jre\lib\ext\sunpkcs11.jar;E:\java\Java8\jre\lib\ext\zipfs.jar;E:\java\Java8\jre\lib\javaws.jar;E:\java\Java8\jre\lib\jce.jar;E:\java\Java8\jre\lib\jfr.jar;E:\java\Java8\jre\lib\jfxswt.jar;E:\java\Java8\jre\lib\jsse.jar;E:\java\Java8\jre\lib\management-agent.jar;E:\java\Java8\jre\lib\plugin.jar;E:\java\Java8\jre\lib\resources.jar;E:\java\Java8\jre\lib\rt.jar;E:\java2\IdeaProjects\microservice-my-shop\myshop-service-rocketmq-consumer\target\classes;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-web\2.0.6.RELEASE\spring-boot-starter-web-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter\2.0.6.RELEASE\spring-boot-starter-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot\2.0.6.RELEASE\spring-boot-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-autoconfigure\2.0.6.RELEASE\spring-boot-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-logging\2.0.6.RELEASE\spring-boot-starter-logging-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\java2\maven\bos_repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\java2\maven\bos_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\java2\maven\bos_repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-json\2.0.6.RELEASE\spring-boot-starter-json-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-databind\2.9.7\jackson-databind-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-core\2.9.7\jackson-core-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.7\jackson-datatype-jdk8-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.7\jackson-datatype-jsr310-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.7\jackson-module-parameter-names-2.9.7.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.6.RELEASE\spring-boot-starter-tomcat-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.34\tomcat-embed-core-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.34\tomcat-embed-el-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.34\tomcat-embed-websocket-8.5.34.jar;E:\java2\maven\bos_repository\org\hibernate\validator\hibernate-validator\6.0.13.Final\hibernate-validator-6.0.13.Final.jar;E:\java2\maven\bos_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\java2\maven\bos_repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\java2\maven\bos_repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\java2\maven\bos_repository\org\springframework\spring-web\5.0.10.RELEASE\spring-web-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-beans\5.0.10.RELEASE\spring-beans-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-webmvc\5.0.10.RELEASE\spring-webmvc-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-aop\5.0.10.RELEASE\spring-aop-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-context\5.0.10.RELEASE\spring-context-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-expression\5.0.10.RELEASE\spring-expression-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-actuator\2.0.6.RELEASE\spring-boot-starter-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.0.6.RELEASE\spring-boot-actuator-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator\2.0.6.RELEASE\spring-boot-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\io\micrometer\micrometer-core\1.0.7\micrometer-core-1.0.7.jar;E:\java2\maven\bos_repository\org\hdrhistogram\HdrHistogram\2.1.10\HdrHistogram-2.1.10.jar;E:\java2\maven\bos_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;E:\java2\maven\bos_repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;E:\java2\maven\bos_repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;E:\java2\maven\bos_repository\org\springframework\spring-core\5.0.10.RELEASE\spring-core-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-jcl\5.0.10.RELEASE\spring-jcl-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-starter-stream-rocketmq\0.2.1.RELEASE\spring-cloud-starter-stream-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream-binder-rocketmq\0.2.1.RELEASE\spring-cloud-stream-binder-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream\2.0.1.RELEASE\spring-cloud-stream-2.0.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-validation\2.0.6.RELEASE\spring-boot-starter-validation-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-messaging\5.0.10.RELEASE\spring-messaging-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-core\5.0.9.RELEASE\spring-integration-core-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tx\5.0.10.RELEASE\spring-tx-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\io\projectreactor\reactor-core\3.1.10.RELEASE\reactor-core-3.1.10.RELEASE.jar;E:\java2\maven\bos_repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-jmx\5.0.9.RELEASE\spring-integration-jmx-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tuple\1.0.0.RELEASE\spring-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;E:\java2\maven\bos_repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-tuple\1.0.0.RELEASE\spring-integration-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\retry\spring-retry\1.2.2.RELEASE\spring-retry-1.2.2.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-client\4.3.1\rocketmq-client-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-common\4.3.1\rocketmq-common-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-remoting\4.3.1\rocketmq-remoting-4.3.1.jar;E:\java2\maven\bos_repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;E:\java2\maven\bos_repository\io\netty\netty-all\4.1.29.Final\netty-all-4.1.29.Final.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-logging\4.3.1\rocketmq-logging-4.3.1.jar;E:\java2\maven\bos_repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;E:\java2\maven\bos_repository\org\apache\commons\commons-lang3\3.7\commons-lang3-3.7.jar com.suntong.myshop.service.rocketmq.consumer.moren.Consumer
Hello world2
Hello world1
Hello world0
Hello world3
Hello world4
Hello world5
Hello world6
Hello world7
Hello world8
Hello world9

为什么这样?。。。。。。。。。。

消费者广播模式和负载均衡模式

负载均衡模式

Tag1 || Tag2:消费两种标签
*:消费所有标签

启动两个consum看默认是广播还是负载均衡

启动同步模式提供者

所以模式:负载均衡

//设定消费模式:负载均衡 | 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);

都消费了

顺序消费

消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。

顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

下面用订单进行分区有序的示例。一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

一个broker中有多个队列,消息轮询进入每个队列,然后consumer采用多线程消费,无法保证也是同样的顺序消费掉消息

消息顺序:
全局消息顺序
局部消息顺序

所以张三和李四谁先发没问题,只要保证每个人的消息顺序一样

每个人的的消息防弹一个队列,然后只让一个线程去消费一个队列

生产消息

package com.suntong.myshop.service.rocketmq.provider.order;

import java.util.ArrayList;
import java.util.List;

public class OrderStep {
    private long orderId;
    private String desc;

    public long getOrderId() {
        return orderId;
    }

    public void setOrderId(long orderId) {
        this.orderId = orderId;
    }

    public String getDesc() {
        return desc;
    }

    public void setDesc(String desc) {
        this.desc = desc;
    }

    @Override
    public String toString() {
        return "OrderStep{" +
                "orderId=" + orderId +
                ", desc='" + desc + '\'' +
                '}';
    }


    /**
     * 生成模拟订单数据
     */
    public static List<OrderStep> buildOrders() {
        List<OrderStep> orderList = new ArrayList<OrderStep>();

        OrderStep orderDemo = new OrderStep();
        orderDemo.setOrderId(1111L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(2222L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1111L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(3333L);
        orderDemo.setDesc("创建");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(2222L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(3333L);
        orderDemo.setDesc("付款");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(2222L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1111L);
        orderDemo.setDesc("推送");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(3333L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        orderDemo = new OrderStep();
        orderDemo.setOrderId(1111L);
        orderDemo.setDesc("完成");
        orderList.add(orderDemo);

        return orderList;
    }
}

生产者

package com.suntong.myshop.service.rocketmq.provider.order;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.List;

public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        //        1. 创建消息生产者Producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
//        2. 指定NameServer
        producer.setNamesrvAddr("192.168.79.130:9876");
//        3. 启动producer
        producer.start();
        //构建消息集合
        List<OrderStep> orderSteps = OrderStep.buildOrders();
        //发送消息
        for(int i=0;i<orderSteps.size();i++){
            String body = orderSteps.get(i)+"";
            //keys 为消息里的关键字,写啥都行
            Message message = new Message("OrderTopic","Order","i"+i,body.getBytes());
            /**
             * 参数一:消息对象
             * 参数二:消息队列选择器
             * 参数三:选择队列的业务标识,会传到消息队列选择器的 Object 参数中
             */
            SendResult sendResult = producer.send(message, new MessageQueueSelector() {
                /**
                 *
                 * @param list 队列集合
                 * @param message 消息对象
                 * @param o 业务标识参数
                 * @return
                 */
                @Override
                public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
                    //拿到消息的 OrderId
                    long orderId = (long)o;
                    long index = orderId % list.size();
                    return list.get((int)index);
                }
            }, orderSteps.get(i).getOrderId());

            System.out.println("发送结果: "+sendResult);
        }

        producer.shutdown();
    }
}

消费者

package com.suntong.myshop.service.rocketmq.provider.order;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
//        1. 创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//        2. 自定NameServer地址
        consumer.setNamesrvAddr("192.168.79.130:9876");
//        3. 订阅主题Topic和Tag
        consumer.subscribe("OrderTopic","*");

        //注册消息监听器
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                for (MessageExt msg : list) {
                    System.out.println("线程名称: "+ Thread.currentThread().getName() +", 消费消息:" + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        //启动消费者
        consumer.start();

        System.out.println("消费者启动");
    }
}

先启动消费者

再启动生产者

生产者打印

E:\java\Java8\bin\java.exe "-javaagent:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\lib\idea_rt.jar=53999:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\bin" -Dfile.encoding=UTF-8 -classpath E:\java\Java8\jre\lib\charsets.jar;E:\java\Java8\jre\lib\deploy.jar;E:\java\Java8\jre\lib\ext\access-bridge-64.jar;E:\java\Java8\jre\lib\ext\cldrdata.jar;E:\java\Java8\jre\lib\ext\dnsns.jar;E:\java\Java8\jre\lib\ext\jaccess.jar;E:\java\Java8\jre\lib\ext\jfxrt.jar;E:\java\Java8\jre\lib\ext\localedata.jar;E:\java\Java8\jre\lib\ext\nashorn.jar;E:\java\Java8\jre\lib\ext\sunec.jar;E:\java\Java8\jre\lib\ext\sunjce_provider.jar;E:\java\Java8\jre\lib\ext\sunmscapi.jar;E:\java\Java8\jre\lib\ext\sunpkcs11.jar;E:\java\Java8\jre\lib\ext\zipfs.jar;E:\java\Java8\jre\lib\javaws.jar;E:\java\Java8\jre\lib\jce.jar;E:\java\Java8\jre\lib\jfr.jar;E:\java\Java8\jre\lib\jfxswt.jar;E:\java\Java8\jre\lib\jsse.jar;E:\java\Java8\jre\lib\management-agent.jar;E:\java\Java8\jre\lib\plugin.jar;E:\java\Java8\jre\lib\resources.jar;E:\java\Java8\jre\lib\rt.jar;E:\java2\IdeaProjects\microservice-my-shop\myshop-service-rocketmq-provider\target\classes;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-web\2.0.6.RELEASE\spring-boot-starter-web-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter\2.0.6.RELEASE\spring-boot-starter-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot\2.0.6.RELEASE\spring-boot-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-autoconfigure\2.0.6.RELEASE\spring-boot-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-logging\2.0.6.RELEASE\spring-boot-starter-logging-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\java2\maven\bos_repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\java2\maven\bos_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\java2\maven\bos_repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-json\2.0.6.RELEASE\spring-boot-starter-json-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-databind\2.9.7\jackson-databind-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-core\2.9.7\jackson-core-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.7\jackson-datatype-jdk8-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.7\jackson-datatype-jsr310-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.7\jackson-module-parameter-names-2.9.7.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.6.RELEASE\spring-boot-starter-tomcat-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.34\tomcat-embed-core-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.34\tomcat-embed-el-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.34\tomcat-embed-websocket-8.5.34.jar;E:\java2\maven\bos_repository\org\hibernate\validator\hibernate-validator\6.0.13.Final\hibernate-validator-6.0.13.Final.jar;E:\java2\maven\bos_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\java2\maven\bos_repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\java2\maven\bos_repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\java2\maven\bos_repository\org\springframework\spring-web\5.0.10.RELEASE\spring-web-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-beans\5.0.10.RELEASE\spring-beans-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-webmvc\5.0.10.RELEASE\spring-webmvc-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-aop\5.0.10.RELEASE\spring-aop-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-context\5.0.10.RELEASE\spring-context-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-expression\5.0.10.RELEASE\spring-expression-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-actuator\2.0.6.RELEASE\spring-boot-starter-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.0.6.RELEASE\spring-boot-actuator-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator\2.0.6.RELEASE\spring-boot-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\io\micrometer\micrometer-core\1.0.7\micrometer-core-1.0.7.jar;E:\java2\maven\bos_repository\org\hdrhistogram\HdrHistogram\2.1.10\HdrHistogram-2.1.10.jar;E:\java2\maven\bos_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;E:\java2\maven\bos_repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;E:\java2\maven\bos_repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;E:\java2\maven\bos_repository\org\springframework\spring-core\5.0.10.RELEASE\spring-core-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-jcl\5.0.10.RELEASE\spring-jcl-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-starter-stream-rocketmq\0.2.1.RELEASE\spring-cloud-starter-stream-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream-binder-rocketmq\0.2.1.RELEASE\spring-cloud-stream-binder-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream\2.0.1.RELEASE\spring-cloud-stream-2.0.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-validation\2.0.6.RELEASE\spring-boot-starter-validation-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-messaging\5.0.10.RELEASE\spring-messaging-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-core\5.0.9.RELEASE\spring-integration-core-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tx\5.0.10.RELEASE\spring-tx-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\io\projectreactor\reactor-core\3.1.10.RELEASE\reactor-core-3.1.10.RELEASE.jar;E:\java2\maven\bos_repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-jmx\5.0.9.RELEASE\spring-integration-jmx-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tuple\1.0.0.RELEASE\spring-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;E:\java2\maven\bos_repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-tuple\1.0.0.RELEASE\spring-integration-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\retry\spring-retry\1.2.2.RELEASE\spring-retry-1.2.2.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-client\4.3.1\rocketmq-client-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-common\4.3.1\rocketmq-common-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-remoting\4.3.1\rocketmq-remoting-4.3.1.jar;E:\java2\maven\bos_repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;E:\java2\maven\bos_repository\io\netty\netty-all\4.1.29.Final\netty-all-4.1.29.Final.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-logging\4.3.1\rocketmq-logging-4.3.1.jar;E:\java2\maven\bos_repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;E:\java2\maven\bos_repository\org\apache\commons\commons-lang3\3.7\commons-lang3-3.7.jar com.suntong.myshop.service.rocketmq.provider.order.Producer
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C112C18B4AAC27AC140440000, offsetMsgId=C0A84F8200002A9F0000000000001E14, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=3], queueOffset=14]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C112C18B4AAC27AC140510001, offsetMsgId=C0A84F8200002A9F0000000000001EE6, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=0]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C112C18B4AAC27AC140540002, offsetMsgId=C0A84F8200002A9F0000000000001FB8, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=3], queueOffset=15]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C112C18B4AAC27AC140580003, offsetMsgId=C0A84F8200002A9F000000000000208A, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=6]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C112C18B4AAC27AC1405A0004, offsetMsgId=C0A84F8200002A9F000000000000215C, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=1]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C112C18B4AAC27AC1405D0005, offsetMsgId=C0A84F8200002A9F000000000000222E, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=7]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C112C18B4AAC27AC140600006, offsetMsgId=C0A84F8200002A9F0000000000002300, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=2], queueOffset=2]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C112C18B4AAC27AC140630007, offsetMsgId=C0A84F8200002A9F00000000000023D2, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=3], queueOffset=16]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C112C18B4AAC27AC140660008, offsetMsgId=C0A84F8200002A9F00000000000024A4, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=1], queueOffset=8]
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C112C18B4AAC27AC140690009, offsetMsgId=C0A84F8200002A9F0000000000002576, messageQueue=MessageQueue [topic=OrderTopic, brokerName=broker-a, queueId=3], queueOffset=17]
20:04:45.308 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10909] result: true
20:04:45.312 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:9876] result: true
20:04:45.312 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10911] result: true

消费者打印

E:\java\Java8\bin\java.exe "-javaagent:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\lib\idea_rt.jar=53982:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\bin" -Dfile.encoding=UTF-8 -classpath E:\java\Java8\jre\lib\charsets.jar;E:\java\Java8\jre\lib\deploy.jar;E:\java\Java8\jre\lib\ext\access-bridge-64.jar;E:\java\Java8\jre\lib\ext\cldrdata.jar;E:\java\Java8\jre\lib\ext\dnsns.jar;E:\java\Java8\jre\lib\ext\jaccess.jar;E:\java\Java8\jre\lib\ext\jfxrt.jar;E:\java\Java8\jre\lib\ext\localedata.jar;E:\java\Java8\jre\lib\ext\nashorn.jar;E:\java\Java8\jre\lib\ext\sunec.jar;E:\java\Java8\jre\lib\ext\sunjce_provider.jar;E:\java\Java8\jre\lib\ext\sunmscapi.jar;E:\java\Java8\jre\lib\ext\sunpkcs11.jar;E:\java\Java8\jre\lib\ext\zipfs.jar;E:\java\Java8\jre\lib\javaws.jar;E:\java\Java8\jre\lib\jce.jar;E:\java\Java8\jre\lib\jfr.jar;E:\java\Java8\jre\lib\jfxswt.jar;E:\java\Java8\jre\lib\jsse.jar;E:\java\Java8\jre\lib\management-agent.jar;E:\java\Java8\jre\lib\plugin.jar;E:\java\Java8\jre\lib\resources.jar;E:\java\Java8\jre\lib\rt.jar;E:\java2\IdeaProjects\microservice-my-shop\myshop-service-rocketmq-provider\target\classes;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-web\2.0.6.RELEASE\spring-boot-starter-web-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter\2.0.6.RELEASE\spring-boot-starter-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot\2.0.6.RELEASE\spring-boot-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-autoconfigure\2.0.6.RELEASE\spring-boot-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-logging\2.0.6.RELEASE\spring-boot-starter-logging-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\java2\maven\bos_repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\java2\maven\bos_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\java2\maven\bos_repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-json\2.0.6.RELEASE\spring-boot-starter-json-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-databind\2.9.7\jackson-databind-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-core\2.9.7\jackson-core-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.7\jackson-datatype-jdk8-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.7\jackson-datatype-jsr310-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.7\jackson-module-parameter-names-2.9.7.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.6.RELEASE\spring-boot-starter-tomcat-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.34\tomcat-embed-core-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.34\tomcat-embed-el-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.34\tomcat-embed-websocket-8.5.34.jar;E:\java2\maven\bos_repository\org\hibernate\validator\hibernate-validator\6.0.13.Final\hibernate-validator-6.0.13.Final.jar;E:\java2\maven\bos_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\java2\maven\bos_repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\java2\maven\bos_repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\java2\maven\bos_repository\org\springframework\spring-web\5.0.10.RELEASE\spring-web-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-beans\5.0.10.RELEASE\spring-beans-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-webmvc\5.0.10.RELEASE\spring-webmvc-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-aop\5.0.10.RELEASE\spring-aop-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-context\5.0.10.RELEASE\spring-context-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-expression\5.0.10.RELEASE\spring-expression-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-actuator\2.0.6.RELEASE\spring-boot-starter-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.0.6.RELEASE\spring-boot-actuator-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator\2.0.6.RELEASE\spring-boot-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\io\micrometer\micrometer-core\1.0.7\micrometer-core-1.0.7.jar;E:\java2\maven\bos_repository\org\hdrhistogram\HdrHistogram\2.1.10\HdrHistogram-2.1.10.jar;E:\java2\maven\bos_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;E:\java2\maven\bos_repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;E:\java2\maven\bos_repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;E:\java2\maven\bos_repository\org\springframework\spring-core\5.0.10.RELEASE\spring-core-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-jcl\5.0.10.RELEASE\spring-jcl-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-starter-stream-rocketmq\0.2.1.RELEASE\spring-cloud-starter-stream-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream-binder-rocketmq\0.2.1.RELEASE\spring-cloud-stream-binder-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream\2.0.1.RELEASE\spring-cloud-stream-2.0.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-validation\2.0.6.RELEASE\spring-boot-starter-validation-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-messaging\5.0.10.RELEASE\spring-messaging-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-core\5.0.9.RELEASE\spring-integration-core-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tx\5.0.10.RELEASE\spring-tx-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\io\projectreactor\reactor-core\3.1.10.RELEASE\reactor-core-3.1.10.RELEASE.jar;E:\java2\maven\bos_repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-jmx\5.0.9.RELEASE\spring-integration-jmx-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tuple\1.0.0.RELEASE\spring-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;E:\java2\maven\bos_repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-tuple\1.0.0.RELEASE\spring-integration-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\retry\spring-retry\1.2.2.RELEASE\spring-retry-1.2.2.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-client\4.3.1\rocketmq-client-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-common\4.3.1\rocketmq-common-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-remoting\4.3.1\rocketmq-remoting-4.3.1.jar;E:\java2\maven\bos_repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;E:\java2\maven\bos_repository\io\netty\netty-all\4.1.29.Final\netty-all-4.1.29.Final.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-logging\4.3.1\rocketmq-logging-4.3.1.jar;E:\java2\maven\bos_repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;E:\java2\maven\bos_repository\org\apache\commons\commons-lang3\3.7\commons-lang3-3.7.jar com.suntong.myshop.service.rocketmq.provider.order.Consumer
消费者启动
线程名称: ConsumeMessageThread_3, 消费消息:OrderStep{orderId=2222, desc='创建'}
线程名称: ConsumeMessageThread_2, 消费消息:OrderStep{orderId=1111, desc='创建'}
线程名称: ConsumeMessageThread_1, 消费消息:OrderStep{orderId=3333, desc='创建'}
线程名称: ConsumeMessageThread_2, 消费消息:OrderStep{orderId=1111, desc='付款'}
线程名称: ConsumeMessageThread_2, 消费消息:OrderStep{orderId=1111, desc='推送'}
线程名称: ConsumeMessageThread_1, 消费消息:OrderStep{orderId=3333, desc='付款'}
线程名称: ConsumeMessageThread_3, 消费消息:OrderStep{orderId=2222, desc='付款'}
线程名称: ConsumeMessageThread_1, 消费消息:OrderStep{orderId=3333, desc='完成'}
线程名称: ConsumeMessageThread_3, 消费消息:OrderStep{orderId=2222, desc='完成'}
线程名称: ConsumeMessageThread_2, 消费消息:OrderStep{orderId=1111, desc='完成'}

看似很完美,但是

此时如果不关闭消费者,再次启动生产者

消费者打印(包含之前的)

E:\java\Java8\bin\java.exe "-javaagent:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\lib\idea_rt.jar=53982:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\bin" -Dfile.encoding=UTF-8 -classpath E:\java\Java8\jre\lib\charsets.jar;E:\java\Java8\jre\lib\deploy.jar;E:\java\Java8\jre\lib\ext\access-bridge-64.jar;E:\java\Java8\jre\lib\ext\cldrdata.jar;E:\java\Java8\jre\lib\ext\dnsns.jar;E:\java\Java8\jre\lib\ext\jaccess.jar;E:\java\Java8\jre\lib\ext\jfxrt.jar;E:\java\Java8\jre\lib\ext\localedata.jar;E:\java\Java8\jre\lib\ext\nashorn.jar;E:\java\Java8\jre\lib\ext\sunec.jar;E:\java\Java8\jre\lib\ext\sunjce_provider.jar;E:\java\Java8\jre\lib\ext\sunmscapi.jar;E:\java\Java8\jre\lib\ext\sunpkcs11.jar;E:\java\Java8\jre\lib\ext\zipfs.jar;E:\java\Java8\jre\lib\javaws.jar;E:\java\Java8\jre\lib\jce.jar;E:\java\Java8\jre\lib\jfr.jar;E:\java\Java8\jre\lib\jfxswt.jar;E:\java\Java8\jre\lib\jsse.jar;E:\java\Java8\jre\lib\management-agent.jar;E:\java\Java8\jre\lib\plugin.jar;E:\java\Java8\jre\lib\resources.jar;E:\java\Java8\jre\lib\rt.jar;E:\java2\IdeaProjects\microservice-my-shop\myshop-service-rocketmq-provider\target\classes;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-web\2.0.6.RELEASE\spring-boot-starter-web-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter\2.0.6.RELEASE\spring-boot-starter-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot\2.0.6.RELEASE\spring-boot-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-autoconfigure\2.0.6.RELEASE\spring-boot-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-logging\2.0.6.RELEASE\spring-boot-starter-logging-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\java2\maven\bos_repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\java2\maven\bos_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\java2\maven\bos_repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-json\2.0.6.RELEASE\spring-boot-starter-json-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-databind\2.9.7\jackson-databind-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-core\2.9.7\jackson-core-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.7\jackson-datatype-jdk8-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.7\jackson-datatype-jsr310-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.7\jackson-module-parameter-names-2.9.7.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.6.RELEASE\spring-boot-starter-tomcat-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.34\tomcat-embed-core-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.34\tomcat-embed-el-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.34\tomcat-embed-websocket-8.5.34.jar;E:\java2\maven\bos_repository\org\hibernate\validator\hibernate-validator\6.0.13.Final\hibernate-validator-6.0.13.Final.jar;E:\java2\maven\bos_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\java2\maven\bos_repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\java2\maven\bos_repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\java2\maven\bos_repository\org\springframework\spring-web\5.0.10.RELEASE\spring-web-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-beans\5.0.10.RELEASE\spring-beans-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-webmvc\5.0.10.RELEASE\spring-webmvc-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-aop\5.0.10.RELEASE\spring-aop-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-context\5.0.10.RELEASE\spring-context-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-expression\5.0.10.RELEASE\spring-expression-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-actuator\2.0.6.RELEASE\spring-boot-starter-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.0.6.RELEASE\spring-boot-actuator-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator\2.0.6.RELEASE\spring-boot-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\io\micrometer\micrometer-core\1.0.7\micrometer-core-1.0.7.jar;E:\java2\maven\bos_repository\org\hdrhistogram\HdrHistogram\2.1.10\HdrHistogram-2.1.10.jar;E:\java2\maven\bos_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;E:\java2\maven\bos_repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;E:\java2\maven\bos_repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;E:\java2\maven\bos_repository\org\springframework\spring-core\5.0.10.RELEASE\spring-core-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-jcl\5.0.10.RELEASE\spring-jcl-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-starter-stream-rocketmq\0.2.1.RELEASE\spring-cloud-starter-stream-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream-binder-rocketmq\0.2.1.RELEASE\spring-cloud-stream-binder-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream\2.0.1.RELEASE\spring-cloud-stream-2.0.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-validation\2.0.6.RELEASE\spring-boot-starter-validation-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-messaging\5.0.10.RELEASE\spring-messaging-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-core\5.0.9.RELEASE\spring-integration-core-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tx\5.0.10.RELEASE\spring-tx-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\io\projectreactor\reactor-core\3.1.10.RELEASE\reactor-core-3.1.10.RELEASE.jar;E:\java2\maven\bos_repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-jmx\5.0.9.RELEASE\spring-integration-jmx-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tuple\1.0.0.RELEASE\spring-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;E:\java2\maven\bos_repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-tuple\1.0.0.RELEASE\spring-integration-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\retry\spring-retry\1.2.2.RELEASE\spring-retry-1.2.2.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-client\4.3.1\rocketmq-client-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-common\4.3.1\rocketmq-common-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-remoting\4.3.1\rocketmq-remoting-4.3.1.jar;E:\java2\maven\bos_repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;E:\java2\maven\bos_repository\io\netty\netty-all\4.1.29.Final\netty-all-4.1.29.Final.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-logging\4.3.1\rocketmq-logging-4.3.1.jar;E:\java2\maven\bos_repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;E:\java2\maven\bos_repository\org\apache\commons\commons-lang3\3.7\commons-lang3-3.7.jar com.suntong.myshop.service.rocketmq.provider.order.Consumer
消费者启动
线程名称: ConsumeMessageThread_3, 消费消息:OrderStep{orderId=2222, desc='创建'}
线程名称: ConsumeMessageThread_2, 消费消息:OrderStep{orderId=1111, desc='创建'}
线程名称: ConsumeMessageThread_1, 消费消息:OrderStep{orderId=3333, desc='创建'}
线程名称: ConsumeMessageThread_2, 消费消息:OrderStep{orderId=1111, desc='付款'}
线程名称: ConsumeMessageThread_2, 消费消息:OrderStep{orderId=1111, desc='推送'}
线程名称: ConsumeMessageThread_1, 消费消息:OrderStep{orderId=3333, desc='付款'}
线程名称: ConsumeMessageThread_3, 消费消息:OrderStep{orderId=2222, desc='付款'}
线程名称: ConsumeMessageThread_1, 消费消息:OrderStep{orderId=3333, desc='完成'}
线程名称: ConsumeMessageThread_3, 消费消息:OrderStep{orderId=2222, desc='完成'}
线程名称: ConsumeMessageThread_2, 消费消息:OrderStep{orderId=1111, desc='完成'}
线程名称: ConsumeMessageThread_4, 消费消息:OrderStep{orderId=1111, desc='创建'}
线程名称: ConsumeMessageThread_5, 消费消息:OrderStep{orderId=2222, desc='创建'}
线程名称: ConsumeMessageThread_6, 消费消息:OrderStep{orderId=1111, desc='付款'}
线程名称: ConsumeMessageThread_7, 消费消息:OrderStep{orderId=3333, desc='创建'}
线程名称: ConsumeMessageThread_8, 消费消息:OrderStep{orderId=2222, desc='付款'}
线程名称: ConsumeMessageThread_9, 消费消息:OrderStep{orderId=3333, desc='付款'}
线程名称: ConsumeMessageThread_10, 消费消息:OrderStep{orderId=2222, desc='完成'}
线程名称: ConsumeMessageThread_11, 消费消息:OrderStep{orderId=1111, desc='推送'}
线程名称: ConsumeMessageThread_12, 消费消息:OrderStep{orderId=3333, desc='完成'}
线程名称: ConsumeMessageThread_13, 消费消息:OrderStep{orderId=1111, desc='完成'}

每一个线程去处理一个消息了

顺序还能保持,不知道实际情况怎么搞。。。

延时消息

比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

提供者

package com.suntong.myshop.service.rocketmq.provider.delay;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.concurrent.TimeUnit;

public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//        1. 创建消息生产者Producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
//        2. 指定NameServer
        producer.setNamesrvAddr("192.168.79.130:9876");
//        3. 启动producer
        producer.start();
//        4. 创建消息对象,指定主题Topic,Tag和消息体
        for (int i=0;i<10;i++){
            //参数一:消息主题topic
            //参数二:消息tag
            //参数三:消息内容
            Message message = new Message("DelayTopic","Tag1",("Hello world"+i).getBytes());

            //设置延时时间
            message.setDelayTimeLevel(2);

            //        5. 发送消息
            SendResult result = producer.send(message);

            System.out.println("发送结果: "+ result);

            //睡一秒再发送下一个
            TimeUnit.SECONDS.sleep(1);
        }
//        6. 关闭生产者producer
        producer.shutdown();
    }
}

消费者

package com.suntong.myshop.service.rocketmq.provider.delay;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
//        1. 创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//        2. 自定NameServer地址
        consumer.setNamesrvAddr("192.168.79.130:9876");
//        3. 订阅主题Topic和Tag
        consumer.subscribe("DelayTopic","*");

//        4. 设置回调参数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * 接收消息内容
             * @param list
             * @param consumeConcurrentlyContext
             * @return
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg: list) {
                    System.out.println("消息Id:[" + msg.getMsgId() +"], 延时时间:" + (System.currentTimeMillis() - msg.getStoreTimestamp()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
//        5. 启动消费者consumer
        consumer.start();
    }
}

consumer打印

E:\java\Java8\bin\java.exe "-javaagent:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\lib\idea_rt.jar=55031:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\bin" -Dfile.encoding=UTF-8 -classpath E:\java\Java8\jre\lib\charsets.jar;E:\java\Java8\jre\lib\deploy.jar;E:\java\Java8\jre\lib\ext\access-bridge-64.jar;E:\java\Java8\jre\lib\ext\cldrdata.jar;E:\java\Java8\jre\lib\ext\dnsns.jar;E:\java\Java8\jre\lib\ext\jaccess.jar;E:\java\Java8\jre\lib\ext\jfxrt.jar;E:\java\Java8\jre\lib\ext\localedata.jar;E:\java\Java8\jre\lib\ext\nashorn.jar;E:\java\Java8\jre\lib\ext\sunec.jar;E:\java\Java8\jre\lib\ext\sunjce_provider.jar;E:\java\Java8\jre\lib\ext\sunmscapi.jar;E:\java\Java8\jre\lib\ext\sunpkcs11.jar;E:\java\Java8\jre\lib\ext\zipfs.jar;E:\java\Java8\jre\lib\javaws.jar;E:\java\Java8\jre\lib\jce.jar;E:\java\Java8\jre\lib\jfr.jar;E:\java\Java8\jre\lib\jfxswt.jar;E:\java\Java8\jre\lib\jsse.jar;E:\java\Java8\jre\lib\management-agent.jar;E:\java\Java8\jre\lib\plugin.jar;E:\java\Java8\jre\lib\resources.jar;E:\java\Java8\jre\lib\rt.jar;E:\java2\IdeaProjects\microservice-my-shop\myshop-service-rocketmq-provider\target\classes;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-web\2.0.6.RELEASE\spring-boot-starter-web-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter\2.0.6.RELEASE\spring-boot-starter-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot\2.0.6.RELEASE\spring-boot-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-autoconfigure\2.0.6.RELEASE\spring-boot-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-logging\2.0.6.RELEASE\spring-boot-starter-logging-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\java2\maven\bos_repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\java2\maven\bos_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\java2\maven\bos_repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-json\2.0.6.RELEASE\spring-boot-starter-json-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-databind\2.9.7\jackson-databind-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-core\2.9.7\jackson-core-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.7\jackson-datatype-jdk8-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.7\jackson-datatype-jsr310-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.7\jackson-module-parameter-names-2.9.7.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.6.RELEASE\spring-boot-starter-tomcat-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.34\tomcat-embed-core-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.34\tomcat-embed-el-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.34\tomcat-embed-websocket-8.5.34.jar;E:\java2\maven\bos_repository\org\hibernate\validator\hibernate-validator\6.0.13.Final\hibernate-validator-6.0.13.Final.jar;E:\java2\maven\bos_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\java2\maven\bos_repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\java2\maven\bos_repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\java2\maven\bos_repository\org\springframework\spring-web\5.0.10.RELEASE\spring-web-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-beans\5.0.10.RELEASE\spring-beans-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-webmvc\5.0.10.RELEASE\spring-webmvc-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-aop\5.0.10.RELEASE\spring-aop-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-context\5.0.10.RELEASE\spring-context-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-expression\5.0.10.RELEASE\spring-expression-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-actuator\2.0.6.RELEASE\spring-boot-starter-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.0.6.RELEASE\spring-boot-actuator-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator\2.0.6.RELEASE\spring-boot-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\io\micrometer\micrometer-core\1.0.7\micrometer-core-1.0.7.jar;E:\java2\maven\bos_repository\org\hdrhistogram\HdrHistogram\2.1.10\HdrHistogram-2.1.10.jar;E:\java2\maven\bos_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;E:\java2\maven\bos_repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;E:\java2\maven\bos_repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;E:\java2\maven\bos_repository\org\springframework\spring-core\5.0.10.RELEASE\spring-core-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-jcl\5.0.10.RELEASE\spring-jcl-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-starter-stream-rocketmq\0.2.1.RELEASE\spring-cloud-starter-stream-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream-binder-rocketmq\0.2.1.RELEASE\spring-cloud-stream-binder-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream\2.0.1.RELEASE\spring-cloud-stream-2.0.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-validation\2.0.6.RELEASE\spring-boot-starter-validation-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-messaging\5.0.10.RELEASE\spring-messaging-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-core\5.0.9.RELEASE\spring-integration-core-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tx\5.0.10.RELEASE\spring-tx-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\io\projectreactor\reactor-core\3.1.10.RELEASE\reactor-core-3.1.10.RELEASE.jar;E:\java2\maven\bos_repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-jmx\5.0.9.RELEASE\spring-integration-jmx-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tuple\1.0.0.RELEASE\spring-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;E:\java2\maven\bos_repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-tuple\1.0.0.RELEASE\spring-integration-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\retry\spring-retry\1.2.2.RELEASE\spring-retry-1.2.2.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-client\4.3.1\rocketmq-client-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-common\4.3.1\rocketmq-common-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-remoting\4.3.1\rocketmq-remoting-4.3.1.jar;E:\java2\maven\bos_repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;E:\java2\maven\bos_repository\io\netty\netty-all\4.1.29.Final\netty-all-4.1.29.Final.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-logging\4.3.1\rocketmq-logging-4.3.1.jar;E:\java2\maven\bos_repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;E:\java2\maven\bos_repository\org\apache\commons\commons-lang3\3.7\commons-lang3-3.7.jar com.suntong.myshop.service.rocketmq.provider.delay.Consumer
消息Id:[C0A82B9C385818B4AAC27B0752C30004], 延时时间:4625
消息Id:[C0A82B9C385818B4AAC27B074AEC0002], 延时时间:6630
消息Id:[C0A82B9C385818B4AAC27B0743040000], 延时时间:8640
消息Id:[C0A82B9C385818B4AAC27B075A9A0006], 延时时间:2618
消息Id:[C0A82B9C385818B4AAC27B0762720008], 延时时间:992
消息Id:[C0A82B9C385818B4AAC27B074ED70003], 延时时间:8014
消息Id:[C0A82B9C385818B4AAC27B0756AE0005], 延时时间:6008
消息Id:[C0A82B9C385818B4AAC27B0747000001], 延时时间:10022
消息Id:[C0A82B9C385818B4AAC27B075E850007], 延时时间:4000
消息Id:[C0A82B9C385818B4AAC27B07665C0009], 延时时间:1994

messageDelayLevel = 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

0标识无延时,1标识延时1s

由于网络不稳定,时间打印出来也是变化的,我们注释掉延时再试试

打印

E:\java\Java8\bin\java.exe "-javaagent:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\lib\idea_rt.jar=55201:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\bin" -Dfile.encoding=UTF-8 -classpath E:\java\Java8\jre\lib\charsets.jar;E:\java\Java8\jre\lib\deploy.jar;E:\java\Java8\jre\lib\ext\access-bridge-64.jar;E:\java\Java8\jre\lib\ext\cldrdata.jar;E:\java\Java8\jre\lib\ext\dnsns.jar;E:\java\Java8\jre\lib\ext\jaccess.jar;E:\java\Java8\jre\lib\ext\jfxrt.jar;E:\java\Java8\jre\lib\ext\localedata.jar;E:\java\Java8\jre\lib\ext\nashorn.jar;E:\java\Java8\jre\lib\ext\sunec.jar;E:\java\Java8\jre\lib\ext\sunjce_provider.jar;E:\java\Java8\jre\lib\ext\sunmscapi.jar;E:\java\Java8\jre\lib\ext\sunpkcs11.jar;E:\java\Java8\jre\lib\ext\zipfs.jar;E:\java\Java8\jre\lib\javaws.jar;E:\java\Java8\jre\lib\jce.jar;E:\java\Java8\jre\lib\jfr.jar;E:\java\Java8\jre\lib\jfxswt.jar;E:\java\Java8\jre\lib\jsse.jar;E:\java\Java8\jre\lib\management-agent.jar;E:\java\Java8\jre\lib\plugin.jar;E:\java\Java8\jre\lib\resources.jar;E:\java\Java8\jre\lib\rt.jar;E:\java2\IdeaProjects\microservice-my-shop\myshop-service-rocketmq-provider\target\classes;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-web\2.0.6.RELEASE\spring-boot-starter-web-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter\2.0.6.RELEASE\spring-boot-starter-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot\2.0.6.RELEASE\spring-boot-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-autoconfigure\2.0.6.RELEASE\spring-boot-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-logging\2.0.6.RELEASE\spring-boot-starter-logging-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\java2\maven\bos_repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\java2\maven\bos_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\java2\maven\bos_repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-json\2.0.6.RELEASE\spring-boot-starter-json-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-databind\2.9.7\jackson-databind-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-core\2.9.7\jackson-core-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.7\jackson-datatype-jdk8-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.7\jackson-datatype-jsr310-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.7\jackson-module-parameter-names-2.9.7.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.6.RELEASE\spring-boot-starter-tomcat-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.34\tomcat-embed-core-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.34\tomcat-embed-el-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.34\tomcat-embed-websocket-8.5.34.jar;E:\java2\maven\bos_repository\org\hibernate\validator\hibernate-validator\6.0.13.Final\hibernate-validator-6.0.13.Final.jar;E:\java2\maven\bos_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\java2\maven\bos_repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\java2\maven\bos_repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\java2\maven\bos_repository\org\springframework\spring-web\5.0.10.RELEASE\spring-web-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-beans\5.0.10.RELEASE\spring-beans-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-webmvc\5.0.10.RELEASE\spring-webmvc-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-aop\5.0.10.RELEASE\spring-aop-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-context\5.0.10.RELEASE\spring-context-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-expression\5.0.10.RELEASE\spring-expression-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-actuator\2.0.6.RELEASE\spring-boot-starter-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.0.6.RELEASE\spring-boot-actuator-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator\2.0.6.RELEASE\spring-boot-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\io\micrometer\micrometer-core\1.0.7\micrometer-core-1.0.7.jar;E:\java2\maven\bos_repository\org\hdrhistogram\HdrHistogram\2.1.10\HdrHistogram-2.1.10.jar;E:\java2\maven\bos_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;E:\java2\maven\bos_repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;E:\java2\maven\bos_repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;E:\java2\maven\bos_repository\org\springframework\spring-core\5.0.10.RELEASE\spring-core-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-jcl\5.0.10.RELEASE\spring-jcl-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-starter-stream-rocketmq\0.2.1.RELEASE\spring-cloud-starter-stream-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream-binder-rocketmq\0.2.1.RELEASE\spring-cloud-stream-binder-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream\2.0.1.RELEASE\spring-cloud-stream-2.0.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-validation\2.0.6.RELEASE\spring-boot-starter-validation-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-messaging\5.0.10.RELEASE\spring-messaging-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-core\5.0.9.RELEASE\spring-integration-core-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tx\5.0.10.RELEASE\spring-tx-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\io\projectreactor\reactor-core\3.1.10.RELEASE\reactor-core-3.1.10.RELEASE.jar;E:\java2\maven\bos_repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-jmx\5.0.9.RELEASE\spring-integration-jmx-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tuple\1.0.0.RELEASE\spring-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;E:\java2\maven\bos_repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-tuple\1.0.0.RELEASE\spring-integration-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\retry\spring-retry\1.2.2.RELEASE\spring-retry-1.2.2.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-client\4.3.1\rocketmq-client-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-common\4.3.1\rocketmq-common-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-remoting\4.3.1\rocketmq-remoting-4.3.1.jar;E:\java2\maven\bos_repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;E:\java2\maven\bos_repository\io\netty\netty-all\4.1.29.Final\netty-all-4.1.29.Final.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-logging\4.3.1\rocketmq-logging-4.3.1.jar;E:\java2\maven\bos_repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;E:\java2\maven\bos_repository\org\apache\commons\commons-lang3\3.7\commons-lang3-3.7.jar com.suntong.myshop.service.rocketmq.provider.delay.Consumer
消息Id:[C0A82B9C14C018B4AAC27B0CB9C70000], 延时时间:1624
消息Id:[C0A82B9C14C018B4AAC27B0CBDBD0001], 延时时间:997
消息Id:[C0A82B9C14C018B4AAC27B0CC1A90002], 延时时间:997
消息Id:[C0A82B9C14C018B4AAC27B0CC5950003], 延时时间:998
消息Id:[C0A82B9C14C018B4AAC27B0CC9800004], 延时时间:997
消息Id:[C0A82B9C14C018B4AAC27B0CCD6C0005], 延时时间:996
消息Id:[C0A82B9C14C018B4AAC27B0CD1570006], 延时时间:996
消息Id:[C0A82B9C14C018B4AAC27B0CD5420007], 延时时间:998
消息Id:[C0A82B9C14C018B4AAC27B0CD92E0008], 延时时间:998
消息Id:[C0A82B9C14C018B4AAC27B0CDD190009], 延时时间:999

网络大概有一秒延时

相关文章

  • 消费消息(一)

    创建消费者Consumer,制定消费者组名 自定NameServer地址 订阅主题Topic和Tag 设置回调参数...

  • 消息消费

    总览 消息消费分为两种形式并发消费、顺序消费;这次主要讲并发消费。 消息从Broker拉取到客户端之后,等待客户端...

  • 消息队列的消费语义和投递语义

    消费语义 如何保证消息最多消费一次 如何保证消息至少消费一次 如何保证消息恰好消费一次 投递语义 如何保证消息最多...

  • RocketMQ基础篇 Consumer消费消息

    消费消息逻辑 消费消息逻辑主要分为三个模块 Rebalance 拉取消息 消费消息 Rebalance 集群模式下...

  • RocketMQ消息重试

    RocketMQ为了保证消息被消费采用ACK确认机制,消费者消费消息时需要给Broker反馈消息消费的情况,成功或...

  • Kafka 消费者心跳线程源码解析

    kafka消费者在消费消息时,分为心跳线程和用户线程(处理消息的线程) 消费消息poll方法 我们在第一次启动消费...

  • RocketMQ-概念模型

    Producer消息生产者,负责产生消息,一般由业务系统负责产生消息。 Consumer消息消费者,负责消费消息,...

  • pulsar十分钟入门

    什么是消息队列 点对点:每个消息只有一个消费者,只有一个消费者可以消费此消息,消费完就可以删除。 发布\订阅:发送...

  • 消费消息(二)

    批量消息发送 批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitSt...

  • RocketMQ之消息重试与消息重投

    2020-02-25 消息重试 Consumer消费消息失败后,令消息再消费一次。当消费者主动返回重试码RECON...

网友评论

      本文标题:消费消息(一)

      本文链接:https://www.haomeiwen.com/subject/jbxvqhtx.html