批量消息发送
批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。
producer
package com.suntong.myshop.service.rocketmq.provider.batch;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1. 创建消息生产者Producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2. 指定NameServer
producer.setNamesrvAddr("192.168.79.130:9876");
// 3. 启动producer
producer.start();
// 4. 创建消息对象,指定主题Topic,Tag和消息体
List<Message> messages = new ArrayList<>();
//参数一:消息主题topic
//参数二:消息tag
//参数三:消息内容
Message message1 = new Message("BatchTopic","Tag1",("Hello world"+1).getBytes());
Message message2 = new Message("BatchTopic","Tag1",("Hello world"+2).getBytes());
Message message3 = new Message("BatchTopic","Tag1",("Hello world"+3).getBytes());
messages.add(message1);
messages.add(message2);
messages.add(message3);
// 5. 发送消息
SendResult result = producer.send(messages);
System.out.println("发送结果: "+ result);
//睡一秒再发送下一个
TimeUnit.SECONDS.sleep(1);
// 6. 关闭生产者producer
producer.shutdown();
}
}
consumer没变化
package com.suntong.myshop.service.rocketmq.provider.batch;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 1. 创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 2. 自定NameServer地址
consumer.setNamesrvAddr("192.168.79.130:9876");
// 3. 订阅主题Topic和Tag
consumer.subscribe("BatchTopic","*");
// 4. 设置回调参数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 接收消息内容
* @param list
* @param consumeConcurrentlyContext
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg: list) {
System.out.println("消息Id:[" + msg.getMsgId() +"], 延时时间:" + (System.currentTimeMillis() - msg.getStoreTimestamp()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5. 启动消费者consumer
consumer.start();
System.out.println("消费者启动");
}
}
打印
producer
E:\java\Java8\bin\java.exe "-javaagent:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\lib\idea_rt.jar=55356:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\bin" -Dfile.encoding=UTF-8 -classpath E:\java\Java8\jre\lib\charsets.jar;E:\java\Java8\jre\lib\deploy.jar;E:\java\Java8\jre\lib\ext\access-bridge-64.jar;E:\java\Java8\jre\lib\ext\cldrdata.jar;E:\java\Java8\jre\lib\ext\dnsns.jar;E:\java\Java8\jre\lib\ext\jaccess.jar;E:\java\Java8\jre\lib\ext\jfxrt.jar;E:\java\Java8\jre\lib\ext\localedata.jar;E:\java\Java8\jre\lib\ext\nashorn.jar;E:\java\Java8\jre\lib\ext\sunec.jar;E:\java\Java8\jre\lib\ext\sunjce_provider.jar;E:\java\Java8\jre\lib\ext\sunmscapi.jar;E:\java\Java8\jre\lib\ext\sunpkcs11.jar;E:\java\Java8\jre\lib\ext\zipfs.jar;E:\java\Java8\jre\lib\javaws.jar;E:\java\Java8\jre\lib\jce.jar;E:\java\Java8\jre\lib\jfr.jar;E:\java\Java8\jre\lib\jfxswt.jar;E:\java\Java8\jre\lib\jsse.jar;E:\java\Java8\jre\lib\management-agent.jar;E:\java\Java8\jre\lib\plugin.jar;E:\java\Java8\jre\lib\resources.jar;E:\java\Java8\jre\lib\rt.jar;E:\java2\IdeaProjects\microservice-my-shop\myshop-service-rocketmq-provider\target\classes;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-web\2.0.6.RELEASE\spring-boot-starter-web-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter\2.0.6.RELEASE\spring-boot-starter-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot\2.0.6.RELEASE\spring-boot-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-autoconfigure\2.0.6.RELEASE\spring-boot-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-logging\2.0.6.RELEASE\spring-boot-starter-logging-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\java2\maven\bos_repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\java2\maven\bos_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\java2\maven\bos_repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-json\2.0.6.RELEASE\spring-boot-starter-json-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-databind\2.9.7\jackson-databind-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-core\2.9.7\jackson-core-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.7\jackson-datatype-jdk8-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.7\jackson-datatype-jsr310-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.7\jackson-module-parameter-names-2.9.7.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.6.RELEASE\spring-boot-starter-tomcat-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.34\tomcat-embed-core-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.34\tomcat-embed-el-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.34\tomcat-embed-websocket-8.5.34.jar;E:\java2\maven\bos_repository\org\hibernate\validator\hibernate-validator\6.0.13.Final\hibernate-validator-6.0.13.Final.jar;E:\java2\maven\bos_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\java2\maven\bos_repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\java2\maven\bos_repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\java2\maven\bos_repository\org\springframework\spring-web\5.0.10.RELEASE\spring-web-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-beans\5.0.10.RELEASE\spring-beans-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-webmvc\5.0.10.RELEASE\spring-webmvc-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-aop\5.0.10.RELEASE\spring-aop-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-context\5.0.10.RELEASE\spring-context-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-expression\5.0.10.RELEASE\spring-expression-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-actuator\2.0.6.RELEASE\spring-boot-starter-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.0.6.RELEASE\spring-boot-actuator-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator\2.0.6.RELEASE\spring-boot-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\io\micrometer\micrometer-core\1.0.7\micrometer-core-1.0.7.jar;E:\java2\maven\bos_repository\org\hdrhistogram\HdrHistogram\2.1.10\HdrHistogram-2.1.10.jar;E:\java2\maven\bos_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;E:\java2\maven\bos_repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;E:\java2\maven\bos_repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;E:\java2\maven\bos_repository\org\springframework\spring-core\5.0.10.RELEASE\spring-core-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-jcl\5.0.10.RELEASE\spring-jcl-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-starter-stream-rocketmq\0.2.1.RELEASE\spring-cloud-starter-stream-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream-binder-rocketmq\0.2.1.RELEASE\spring-cloud-stream-binder-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream\2.0.1.RELEASE\spring-cloud-stream-2.0.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-validation\2.0.6.RELEASE\spring-boot-starter-validation-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-messaging\5.0.10.RELEASE\spring-messaging-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-core\5.0.9.RELEASE\spring-integration-core-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tx\5.0.10.RELEASE\spring-tx-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\io\projectreactor\reactor-core\3.1.10.RELEASE\reactor-core-3.1.10.RELEASE.jar;E:\java2\maven\bos_repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-jmx\5.0.9.RELEASE\spring-integration-jmx-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tuple\1.0.0.RELEASE\spring-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;E:\java2\maven\bos_repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-tuple\1.0.0.RELEASE\spring-integration-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\retry\spring-retry\1.2.2.RELEASE\spring-retry-1.2.2.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-client\4.3.1\rocketmq-client-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-common\4.3.1\rocketmq-common-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-remoting\4.3.1\rocketmq-remoting-4.3.1.jar;E:\java2\maven\bos_repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;E:\java2\maven\bos_repository\io\netty\netty-all\4.1.29.Final\netty-all-4.1.29.Final.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-logging\4.3.1\rocketmq-logging-4.3.1.jar;E:\java2\maven\bos_repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;E:\java2\maven\bos_repository\org\apache\commons\commons-lang3\3.7\commons-lang3-3.7.jar com.suntong.myshop.service.rocketmq.provider.batch.Producer
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C393418B4AAC27B142A420000,C0A82B9C393418B4AAC27B142A420001,C0A82B9C393418B4AAC27B142A420002, offsetMsgId=C0A84F8200002A9F000000000000468C,C0A84F8200002A9F000000000000473B,C0A84F8200002A9F00000000000047EA, messageQueue=MessageQueue [topic=BatchTopic, brokerName=broker-a, queueId=1], queueOffset=0]
21:35:21.071 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10911] result: true
21:35:21.075 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10909] result: true
21:35:21.076 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:9876] result: true
consumer打印
E:\java\Java8\bin\java.exe "-javaagent:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\lib\idea_rt.jar=55342:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\bin" -Dfile.encoding=UTF-8 -classpath E:\java\Java8\jre\lib\charsets.jar;E:\java\Java8\jre\lib\deploy.jar;E:\java\Java8\jre\lib\ext\access-bridge-64.jar;E:\java\Java8\jre\lib\ext\cldrdata.jar;E:\java\Java8\jre\lib\ext\dnsns.jar;E:\java\Java8\jre\lib\ext\jaccess.jar;E:\java\Java8\jre\lib\ext\jfxrt.jar;E:\java\Java8\jre\lib\ext\localedata.jar;E:\java\Java8\jre\lib\ext\nashorn.jar;E:\java\Java8\jre\lib\ext\sunec.jar;E:\java\Java8\jre\lib\ext\sunjce_provider.jar;E:\java\Java8\jre\lib\ext\sunmscapi.jar;E:\java\Java8\jre\lib\ext\sunpkcs11.jar;E:\java\Java8\jre\lib\ext\zipfs.jar;E:\java\Java8\jre\lib\javaws.jar;E:\java\Java8\jre\lib\jce.jar;E:\java\Java8\jre\lib\jfr.jar;E:\java\Java8\jre\lib\jfxswt.jar;E:\java\Java8\jre\lib\jsse.jar;E:\java\Java8\jre\lib\management-agent.jar;E:\java\Java8\jre\lib\plugin.jar;E:\java\Java8\jre\lib\resources.jar;E:\java\Java8\jre\lib\rt.jar;E:\java2\IdeaProjects\microservice-my-shop\myshop-service-rocketmq-provider\target\classes;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-web\2.0.6.RELEASE\spring-boot-starter-web-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter\2.0.6.RELEASE\spring-boot-starter-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot\2.0.6.RELEASE\spring-boot-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-autoconfigure\2.0.6.RELEASE\spring-boot-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-logging\2.0.6.RELEASE\spring-boot-starter-logging-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\java2\maven\bos_repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\java2\maven\bos_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\java2\maven\bos_repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-json\2.0.6.RELEASE\spring-boot-starter-json-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-databind\2.9.7\jackson-databind-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-core\2.9.7\jackson-core-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.7\jackson-datatype-jdk8-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.7\jackson-datatype-jsr310-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.7\jackson-module-parameter-names-2.9.7.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.6.RELEASE\spring-boot-starter-tomcat-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.34\tomcat-embed-core-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.34\tomcat-embed-el-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.34\tomcat-embed-websocket-8.5.34.jar;E:\java2\maven\bos_repository\org\hibernate\validator\hibernate-validator\6.0.13.Final\hibernate-validator-6.0.13.Final.jar;E:\java2\maven\bos_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\java2\maven\bos_repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\java2\maven\bos_repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\java2\maven\bos_repository\org\springframework\spring-web\5.0.10.RELEASE\spring-web-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-beans\5.0.10.RELEASE\spring-beans-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-webmvc\5.0.10.RELEASE\spring-webmvc-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-aop\5.0.10.RELEASE\spring-aop-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-context\5.0.10.RELEASE\spring-context-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-expression\5.0.10.RELEASE\spring-expression-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-actuator\2.0.6.RELEASE\spring-boot-starter-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.0.6.RELEASE\spring-boot-actuator-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator\2.0.6.RELEASE\spring-boot-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\io\micrometer\micrometer-core\1.0.7\micrometer-core-1.0.7.jar;E:\java2\maven\bos_repository\org\hdrhistogram\HdrHistogram\2.1.10\HdrHistogram-2.1.10.jar;E:\java2\maven\bos_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;E:\java2\maven\bos_repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;E:\java2\maven\bos_repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;E:\java2\maven\bos_repository\org\springframework\spring-core\5.0.10.RELEASE\spring-core-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-jcl\5.0.10.RELEASE\spring-jcl-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-starter-stream-rocketmq\0.2.1.RELEASE\spring-cloud-starter-stream-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream-binder-rocketmq\0.2.1.RELEASE\spring-cloud-stream-binder-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream\2.0.1.RELEASE\spring-cloud-stream-2.0.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-validation\2.0.6.RELEASE\spring-boot-starter-validation-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-messaging\5.0.10.RELEASE\spring-messaging-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-core\5.0.9.RELEASE\spring-integration-core-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tx\5.0.10.RELEASE\spring-tx-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\io\projectreactor\reactor-core\3.1.10.RELEASE\reactor-core-3.1.10.RELEASE.jar;E:\java2\maven\bos_repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-jmx\5.0.9.RELEASE\spring-integration-jmx-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tuple\1.0.0.RELEASE\spring-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;E:\java2\maven\bos_repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-tuple\1.0.0.RELEASE\spring-integration-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\retry\spring-retry\1.2.2.RELEASE\spring-retry-1.2.2.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-client\4.3.1\rocketmq-client-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-common\4.3.1\rocketmq-common-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-remoting\4.3.1\rocketmq-remoting-4.3.1.jar;E:\java2\maven\bos_repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;E:\java2\maven\bos_repository\io\netty\netty-all\4.1.29.Final\netty-all-4.1.29.Final.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-logging\4.3.1\rocketmq-logging-4.3.1.jar;E:\java2\maven\bos_repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;E:\java2\maven\bos_repository\org\apache\commons\commons-lang3\3.7\commons-lang3-3.7.jar com.suntong.myshop.service.rocketmq.provider.batch.Consumer
消费者启动
消息Id:[C0A82B9C393418B4AAC27B142A420000], 延时时间:28976
消息Id:[C0A82B9C393418B4AAC27B142A420002], 延时时间:28976
消息Id:[C0A82B9C393418B4AAC27B142A420001], 延时时间:28976
超过4M时最好把消息进行分割
如果消息的总长度可能大于4MB时,这时候最好把消息进行分割
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; // 增加日志的开销20字节
if (tmpSize > SIZE_LIMIT) {
//单个消息超过了最大的限制
//忽略,否则会阻塞分裂的进程
if (nextIndex - currIndex == 0) {
//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//处理error
}
}
过滤消息
在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 10 | --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message |
|----------| a > 5 AND b = 'abc'
| a = 1 | --------------------> Missed
| b = 'abc'|
| c = true |
------------
没有太大的技术性
就是Consumer使用 Tag1 || Tag2
可以消费两种Tag
*
可以消费所有
SQL基本语法过滤
Producer 设置
message.putUserProperty()
启动出错,需要打开sql过滤设置
![](https://img.haomeiwen.com/i17476301/7387a7f8a8dd40c4.png)
再配置文件broker.conf
中加入enablePropertyFilter=true
此时成功启动并消费
producer
package com.suntong.myshop.service.rocketmq.provider.filter.sql;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1. 创建消息生产者Producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
// 2. 指定NameServer
producer.setNamesrvAddr("192.168.79.130:9876");
// 3. 启动producer
producer.start();
// 4. 创建消息对象,指定主题Topic,Tag和消息体
for (int i=0;i<10;i++){
//参数一:消息主题topic
//参数二:消息tag
//参数三:消息内容
Message message = new Message("FilterSqlTopic","Tag1",("Hello world"+i).getBytes());
message.putUserProperty("i",String.valueOf(i));
// 5. 发送消息
SendResult result = producer.send(message);
System.out.println("发送结果: "+ result);
//睡一秒再发送下一个
TimeUnit.SECONDS.sleep(1);
}
// 6. 关闭生产者producer
producer.shutdown();
}
}
consumer 本来订阅subscribe的地方的tag变成了sql
package com.suntong.myshop.service.rocketmq.provider.filter.sql;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 1. 创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 2. 自定NameServer地址
consumer.setNamesrvAddr("192.168.79.130:9876");
// 3. 订阅主题Topic和Tag
consumer.subscribe("FilterSqlTopic", MessageSelector.bySql("i>5"));
// 4. 设置回调参数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 接收消息内容
* @param list
* @param consumeConcurrentlyContext
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg: list) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5. 启动消费者consumer
consumer.start();
System.out.println("消费者启动");
}
}
![](https://img.haomeiwen.com/i17476301/556a0c351011156f.png)
RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
- 数值比较,比如:>,>=,<,<=,BETWEEN,=;
- 字符比较,比如:=,<>,IN;
- IS NULL 或者 IS NOT NULL;
- 逻辑符号 AND,OR,NOT;
常量支持类型为:
- 数值,比如:123,3.1415;
- 字符,比如:'abc',必须用单引号包裹起来;
- NULL,特殊的常量
- 布尔值,TRUE 或 FALSE
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
事务消息
![](https://img.haomeiwen.com/i17476301/7dce4922f610789a.png)
1)事务消息发送及提交
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2)事务补偿
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
3)事务消息状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
- TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
- TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
- TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
生产者搞两个断点
![](https://img.haomeiwen.com/i17476301/cbbab30f2f33f462.png)
![](https://img.haomeiwen.com/i17476301/2b930ccf7b202601.png)
producer
package com.suntong.myshop.service.rocketmq.provider.transaction;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.util.concurrent.TimeUnit;
public class Producer {
public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
// 1. 创建消息生产者Producer,并制定生产者组名,现在时事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("group5");
// 2. 指定NameServer
producer.setNamesrvAddr("192.168.79.130:9876");
//添加事务监听器
producer.setTransactionListener(new TransactionListener() {
/**
* 该方法中执行本地事务
* @param message
* @param o
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
//提交 回滚 不做处理(会回查,下一个方法)
if(StringUtils.equals("TagA",message.getTags())){
return LocalTransactionState.COMMIT_MESSAGE;
}
if(StringUtils.equals("TagB",message.getTags())){
return LocalTransactionState.ROLLBACK_MESSAGE;
}
if(StringUtils.equals("TagC",message.getTags())){
return LocalTransactionState.UNKNOW;
}
return LocalTransactionState.UNKNOW;
}
/**
* 该方法时mq进行消息事务状态的回查
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("开始回查");
System.out.println("消息Tag:" + messageExt.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
});
// 3. 启动producer
producer.start();
//区分不同的消息 成功 失败 回查
String[] tags = {"TagA","TagB","TagC"};
// 4. 创建消息对象,指定主题Topic,Tag和消息体
for (int i=0;i<3;i++){
//参数一:消息主题topic
//参数二:消息tag
//参数三:消息内容
Message message = new Message("TransactionTopic",tags[i],("Hello world"+i).getBytes());
// 5. 发送消息 不针对某一事务消息进行控制,传入null
SendResult result = producer.sendMessageInTransaction(message, null);
System.out.println("发送结果: "+ result);
//睡一秒再发送下一个
TimeUnit.SECONDS.sleep(1);
}
//
//producer.shutdown();
}
}
consumer 就修改了一下topic
package com.suntong.myshop.service.rocketmq.provider.transaction;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* 消息的接收者
*/
public class Consumer {
public static void main(String[] args) throws MQClientException {
// 1. 创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
// 2. 自定NameServer地址
consumer.setNamesrvAddr("192.168.79.130:9876");
// 3. 订阅主题Topic和Tag
consumer.subscribe("TransactionTopic","*");
//设定消费模式:负载均衡 | 广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 4. 设置回调参数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
/**
* 接收消息内容
* @param list
* @param consumeConcurrentlyContext
* @return
*/
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg: list) {
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 5. 启动消费者consumer
consumer.start();
System.out.println("消费者启动");
}
}
正常应该只有AC两个能发过去,C会回查,结果ABC都发过去了,还都回查了,难搞。。。
网友评论