美文网首页
消费消息(二)

消费消息(二)

作者: isuntong | 来源:发表于2020-02-24 23:28 被阅读0次

批量消息发送

批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB。

producer

package com.suntong.myshop.service.rocketmq.provider.batch;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//        1. 创建消息生产者Producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
//        2. 指定NameServer
        producer.setNamesrvAddr("192.168.79.130:9876");
//        3. 启动producer
        producer.start();
//        4. 创建消息对象,指定主题Topic,Tag和消息体

        List<Message> messages = new ArrayList<>();

        //参数一:消息主题topic
        //参数二:消息tag
        //参数三:消息内容
        Message message1 = new Message("BatchTopic","Tag1",("Hello world"+1).getBytes());
        Message message2 = new Message("BatchTopic","Tag1",("Hello world"+2).getBytes());
        Message message3 = new Message("BatchTopic","Tag1",("Hello world"+3).getBytes());

        messages.add(message1);
        messages.add(message2);
        messages.add(message3);

        //        5. 发送消息
        SendResult result = producer.send(messages);

        System.out.println("发送结果: "+ result);

        //睡一秒再发送下一个
        TimeUnit.SECONDS.sleep(1);

//        6. 关闭生产者producer
        producer.shutdown();
    }
}

consumer没变化

package com.suntong.myshop.service.rocketmq.provider.batch;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
//        1. 创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//        2. 自定NameServer地址
        consumer.setNamesrvAddr("192.168.79.130:9876");
//        3. 订阅主题Topic和Tag
        consumer.subscribe("BatchTopic","*");

//        4. 设置回调参数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * 接收消息内容
             * @param list
             * @param consumeConcurrentlyContext
             * @return
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg: list) {
                    System.out.println("消息Id:[" + msg.getMsgId() +"], 延时时间:" + (System.currentTimeMillis() - msg.getStoreTimestamp()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
//        5. 启动消费者consumer
        consumer.start();

        System.out.println("消费者启动");
    }
}

打印
producer

E:\java\Java8\bin\java.exe "-javaagent:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\lib\idea_rt.jar=55356:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\bin" -Dfile.encoding=UTF-8 -classpath E:\java\Java8\jre\lib\charsets.jar;E:\java\Java8\jre\lib\deploy.jar;E:\java\Java8\jre\lib\ext\access-bridge-64.jar;E:\java\Java8\jre\lib\ext\cldrdata.jar;E:\java\Java8\jre\lib\ext\dnsns.jar;E:\java\Java8\jre\lib\ext\jaccess.jar;E:\java\Java8\jre\lib\ext\jfxrt.jar;E:\java\Java8\jre\lib\ext\localedata.jar;E:\java\Java8\jre\lib\ext\nashorn.jar;E:\java\Java8\jre\lib\ext\sunec.jar;E:\java\Java8\jre\lib\ext\sunjce_provider.jar;E:\java\Java8\jre\lib\ext\sunmscapi.jar;E:\java\Java8\jre\lib\ext\sunpkcs11.jar;E:\java\Java8\jre\lib\ext\zipfs.jar;E:\java\Java8\jre\lib\javaws.jar;E:\java\Java8\jre\lib\jce.jar;E:\java\Java8\jre\lib\jfr.jar;E:\java\Java8\jre\lib\jfxswt.jar;E:\java\Java8\jre\lib\jsse.jar;E:\java\Java8\jre\lib\management-agent.jar;E:\java\Java8\jre\lib\plugin.jar;E:\java\Java8\jre\lib\resources.jar;E:\java\Java8\jre\lib\rt.jar;E:\java2\IdeaProjects\microservice-my-shop\myshop-service-rocketmq-provider\target\classes;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-web\2.0.6.RELEASE\spring-boot-starter-web-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter\2.0.6.RELEASE\spring-boot-starter-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot\2.0.6.RELEASE\spring-boot-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-autoconfigure\2.0.6.RELEASE\spring-boot-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-logging\2.0.6.RELEASE\spring-boot-starter-logging-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\java2\maven\bos_repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\java2\maven\bos_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\java2\maven\bos_repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-json\2.0.6.RELEASE\spring-boot-starter-json-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-databind\2.9.7\jackson-databind-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-core\2.9.7\jackson-core-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.7\jackson-datatype-jdk8-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.7\jackson-datatype-jsr310-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.7\jackson-module-parameter-names-2.9.7.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.6.RELEASE\spring-boot-starter-tomcat-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.34\tomcat-embed-core-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.34\tomcat-embed-el-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.34\tomcat-embed-websocket-8.5.34.jar;E:\java2\maven\bos_repository\org\hibernate\validator\hibernate-validator\6.0.13.Final\hibernate-validator-6.0.13.Final.jar;E:\java2\maven\bos_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\java2\maven\bos_repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\java2\maven\bos_repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\java2\maven\bos_repository\org\springframework\spring-web\5.0.10.RELEASE\spring-web-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-beans\5.0.10.RELEASE\spring-beans-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-webmvc\5.0.10.RELEASE\spring-webmvc-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-aop\5.0.10.RELEASE\spring-aop-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-context\5.0.10.RELEASE\spring-context-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-expression\5.0.10.RELEASE\spring-expression-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-actuator\2.0.6.RELEASE\spring-boot-starter-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.0.6.RELEASE\spring-boot-actuator-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator\2.0.6.RELEASE\spring-boot-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\io\micrometer\micrometer-core\1.0.7\micrometer-core-1.0.7.jar;E:\java2\maven\bos_repository\org\hdrhistogram\HdrHistogram\2.1.10\HdrHistogram-2.1.10.jar;E:\java2\maven\bos_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;E:\java2\maven\bos_repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;E:\java2\maven\bos_repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;E:\java2\maven\bos_repository\org\springframework\spring-core\5.0.10.RELEASE\spring-core-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-jcl\5.0.10.RELEASE\spring-jcl-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-starter-stream-rocketmq\0.2.1.RELEASE\spring-cloud-starter-stream-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream-binder-rocketmq\0.2.1.RELEASE\spring-cloud-stream-binder-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream\2.0.1.RELEASE\spring-cloud-stream-2.0.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-validation\2.0.6.RELEASE\spring-boot-starter-validation-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-messaging\5.0.10.RELEASE\spring-messaging-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-core\5.0.9.RELEASE\spring-integration-core-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tx\5.0.10.RELEASE\spring-tx-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\io\projectreactor\reactor-core\3.1.10.RELEASE\reactor-core-3.1.10.RELEASE.jar;E:\java2\maven\bos_repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-jmx\5.0.9.RELEASE\spring-integration-jmx-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tuple\1.0.0.RELEASE\spring-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;E:\java2\maven\bos_repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-tuple\1.0.0.RELEASE\spring-integration-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\retry\spring-retry\1.2.2.RELEASE\spring-retry-1.2.2.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-client\4.3.1\rocketmq-client-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-common\4.3.1\rocketmq-common-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-remoting\4.3.1\rocketmq-remoting-4.3.1.jar;E:\java2\maven\bos_repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;E:\java2\maven\bos_repository\io\netty\netty-all\4.1.29.Final\netty-all-4.1.29.Final.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-logging\4.3.1\rocketmq-logging-4.3.1.jar;E:\java2\maven\bos_repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;E:\java2\maven\bos_repository\org\apache\commons\commons-lang3\3.7\commons-lang3-3.7.jar com.suntong.myshop.service.rocketmq.provider.batch.Producer
发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C393418B4AAC27B142A420000,C0A82B9C393418B4AAC27B142A420001,C0A82B9C393418B4AAC27B142A420002, offsetMsgId=C0A84F8200002A9F000000000000468C,C0A84F8200002A9F000000000000473B,C0A84F8200002A9F00000000000047EA, messageQueue=MessageQueue [topic=BatchTopic, brokerName=broker-a, queueId=1], queueOffset=0]
21:35:21.071 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10911] result: true
21:35:21.075 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10909] result: true
21:35:21.076 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:9876] result: true

consumer打印

E:\java\Java8\bin\java.exe "-javaagent:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\lib\idea_rt.jar=55342:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\bin" -Dfile.encoding=UTF-8 -classpath E:\java\Java8\jre\lib\charsets.jar;E:\java\Java8\jre\lib\deploy.jar;E:\java\Java8\jre\lib\ext\access-bridge-64.jar;E:\java\Java8\jre\lib\ext\cldrdata.jar;E:\java\Java8\jre\lib\ext\dnsns.jar;E:\java\Java8\jre\lib\ext\jaccess.jar;E:\java\Java8\jre\lib\ext\jfxrt.jar;E:\java\Java8\jre\lib\ext\localedata.jar;E:\java\Java8\jre\lib\ext\nashorn.jar;E:\java\Java8\jre\lib\ext\sunec.jar;E:\java\Java8\jre\lib\ext\sunjce_provider.jar;E:\java\Java8\jre\lib\ext\sunmscapi.jar;E:\java\Java8\jre\lib\ext\sunpkcs11.jar;E:\java\Java8\jre\lib\ext\zipfs.jar;E:\java\Java8\jre\lib\javaws.jar;E:\java\Java8\jre\lib\jce.jar;E:\java\Java8\jre\lib\jfr.jar;E:\java\Java8\jre\lib\jfxswt.jar;E:\java\Java8\jre\lib\jsse.jar;E:\java\Java8\jre\lib\management-agent.jar;E:\java\Java8\jre\lib\plugin.jar;E:\java\Java8\jre\lib\resources.jar;E:\java\Java8\jre\lib\rt.jar;E:\java2\IdeaProjects\microservice-my-shop\myshop-service-rocketmq-provider\target\classes;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-web\2.0.6.RELEASE\spring-boot-starter-web-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter\2.0.6.RELEASE\spring-boot-starter-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot\2.0.6.RELEASE\spring-boot-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-autoconfigure\2.0.6.RELEASE\spring-boot-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-logging\2.0.6.RELEASE\spring-boot-starter-logging-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\java2\maven\bos_repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\java2\maven\bos_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\java2\maven\bos_repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-json\2.0.6.RELEASE\spring-boot-starter-json-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-databind\2.9.7\jackson-databind-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-core\2.9.7\jackson-core-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.7\jackson-datatype-jdk8-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.7\jackson-datatype-jsr310-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.7\jackson-module-parameter-names-2.9.7.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.6.RELEASE\spring-boot-starter-tomcat-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.34\tomcat-embed-core-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.34\tomcat-embed-el-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.34\tomcat-embed-websocket-8.5.34.jar;E:\java2\maven\bos_repository\org\hibernate\validator\hibernate-validator\6.0.13.Final\hibernate-validator-6.0.13.Final.jar;E:\java2\maven\bos_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\java2\maven\bos_repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\java2\maven\bos_repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\java2\maven\bos_repository\org\springframework\spring-web\5.0.10.RELEASE\spring-web-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-beans\5.0.10.RELEASE\spring-beans-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-webmvc\5.0.10.RELEASE\spring-webmvc-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-aop\5.0.10.RELEASE\spring-aop-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-context\5.0.10.RELEASE\spring-context-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-expression\5.0.10.RELEASE\spring-expression-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-actuator\2.0.6.RELEASE\spring-boot-starter-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.0.6.RELEASE\spring-boot-actuator-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator\2.0.6.RELEASE\spring-boot-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\io\micrometer\micrometer-core\1.0.7\micrometer-core-1.0.7.jar;E:\java2\maven\bos_repository\org\hdrhistogram\HdrHistogram\2.1.10\HdrHistogram-2.1.10.jar;E:\java2\maven\bos_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;E:\java2\maven\bos_repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;E:\java2\maven\bos_repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;E:\java2\maven\bos_repository\org\springframework\spring-core\5.0.10.RELEASE\spring-core-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-jcl\5.0.10.RELEASE\spring-jcl-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-starter-stream-rocketmq\0.2.1.RELEASE\spring-cloud-starter-stream-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream-binder-rocketmq\0.2.1.RELEASE\spring-cloud-stream-binder-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream\2.0.1.RELEASE\spring-cloud-stream-2.0.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-validation\2.0.6.RELEASE\spring-boot-starter-validation-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-messaging\5.0.10.RELEASE\spring-messaging-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-core\5.0.9.RELEASE\spring-integration-core-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tx\5.0.10.RELEASE\spring-tx-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\io\projectreactor\reactor-core\3.1.10.RELEASE\reactor-core-3.1.10.RELEASE.jar;E:\java2\maven\bos_repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-jmx\5.0.9.RELEASE\spring-integration-jmx-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tuple\1.0.0.RELEASE\spring-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;E:\java2\maven\bos_repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-tuple\1.0.0.RELEASE\spring-integration-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\retry\spring-retry\1.2.2.RELEASE\spring-retry-1.2.2.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-client\4.3.1\rocketmq-client-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-common\4.3.1\rocketmq-common-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-remoting\4.3.1\rocketmq-remoting-4.3.1.jar;E:\java2\maven\bos_repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;E:\java2\maven\bos_repository\io\netty\netty-all\4.1.29.Final\netty-all-4.1.29.Final.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-logging\4.3.1\rocketmq-logging-4.3.1.jar;E:\java2\maven\bos_repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;E:\java2\maven\bos_repository\org\apache\commons\commons-lang3\3.7\commons-lang3-3.7.jar com.suntong.myshop.service.rocketmq.provider.batch.Consumer
消费者启动
消息Id:[C0A82B9C393418B4AAC27B142A420000], 延时时间:28976
消息Id:[C0A82B9C393418B4AAC27B142A420002], 延时时间:28976
消息Id:[C0A82B9C393418B4AAC27B142A420001], 延时时间:28976

超过4M时最好把消息进行分割

如果消息的总长度可能大于4MB时,这时候最好把消息进行分割

public class ListSplitter implements Iterator<List<Message>> {
   private final int SIZE_LIMIT = 1024 * 1024 * 4;
   private final List<Message> messages;
   private int currIndex;
   public ListSplitter(List<Message> messages) {
           this.messages = messages;
   }
    @Override 
    public boolean hasNext() {
       return currIndex < messages.size();
   }
    @Override 
    public List<Message> next() {
       int nextIndex = currIndex;
       int totalSize = 0;
       for (; nextIndex < messages.size(); nextIndex++) {
           Message message = messages.get(nextIndex);
           int tmpSize = message.getTopic().length() + message.getBody().length;
           Map<String, String> properties = message.getProperties();
           for (Map.Entry<String, String> entry : properties.entrySet()) {
               tmpSize += entry.getKey().length() + entry.getValue().length();
           }
           tmpSize = tmpSize + 20; // 增加日志的开销20字节
           if (tmpSize > SIZE_LIMIT) {
               //单个消息超过了最大的限制
               //忽略,否则会阻塞分裂的进程
               if (nextIndex - currIndex == 0) {
                  //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                  nextIndex++;
               }
               break;
           }
           if (tmpSize + totalSize > SIZE_LIMIT) {
               break;
           } else {
               totalSize += tmpSize;
           }

       }
       List<Message> subList = messages.subList(currIndex, nextIndex);
       currIndex = nextIndex;
       return subList;
   }
}
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
  try {
      List<Message>  listItem = splitter.next();
      producer.send(listItem);
  } catch (Exception e) {
      e.printStackTrace();
      //处理error
  }
}

过滤消息

在大多数情况下,TAG是一个简单而有用的设计,其可以来选择您想要的消息。例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将接收包含TAGA或TAGB或TAGC的消息。但是限制是一个消息只能有一个标签,这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。SQL特性可以通过发送消息时的属性来进行计算。在RocketMQ定义的语法下,可以实现一些简单的逻辑。下面是一个例子:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

没有太大的技术性

就是Consumer使用 Tag1 || Tag2可以消费两种Tag
*可以消费所有

SQL基本语法过滤

Producer 设置

message.putUserProperty()

启动出错,需要打开sql过滤设置

再配置文件broker.conf中加入enablePropertyFilter=true

此时成功启动并消费

producer

package com.suntong.myshop.service.rocketmq.provider.filter.sql;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.concurrent.TimeUnit;

public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//        1. 创建消息生产者Producer,并制定生产者组名
        DefaultMQProducer producer = new DefaultMQProducer("group1");
//        2. 指定NameServer
        producer.setNamesrvAddr("192.168.79.130:9876");
//        3. 启动producer
        producer.start();
//        4. 创建消息对象,指定主题Topic,Tag和消息体
        for (int i=0;i<10;i++){
            //参数一:消息主题topic
            //参数二:消息tag
            //参数三:消息内容
            Message message = new Message("FilterSqlTopic","Tag1",("Hello world"+i).getBytes());

            message.putUserProperty("i",String.valueOf(i));

            //        5. 发送消息
            SendResult result = producer.send(message);

            System.out.println("发送结果: "+ result);

            //睡一秒再发送下一个
            TimeUnit.SECONDS.sleep(1);
        }
//        6. 关闭生产者producer
        producer.shutdown();
    }
}

consumer 本来订阅subscribe的地方的tag变成了sql

package com.suntong.myshop.service.rocketmq.provider.filter.sql;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
//        1. 创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//        2. 自定NameServer地址
        consumer.setNamesrvAddr("192.168.79.130:9876");
//        3. 订阅主题Topic和Tag
        consumer.subscribe("FilterSqlTopic", MessageSelector.bySql("i>5"));

//        4. 设置回调参数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * 接收消息内容
             * @param list
             * @param consumeConcurrentlyContext
             * @return
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg: list) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
//        5. 启动消费者consumer
        consumer.start();

        System.out.println("消费者启动");
    }
}

RocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。

  • 数值比较,比如:>,>=,<,<=,BETWEEN,=;
  • 字符比较,比如:=,<>,IN;
  • IS NULL 或者 IS NOT NULL;
  • 逻辑符号 AND,OR,NOT;

常量支持类型为:

  • 数值,比如:123,3.1415;
  • 字符,比如:'abc',必须用单引号包裹起来;
  • NULL,特殊的常量
  • 布尔值,TRUEFALSE

只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:

public void subscribe(finalString topic, final MessageSelector messageSelector)

事务消息

1)事务消息发送及提交

(1) 发送消息(half消息)。

(2) 服务端响应消息写入结果。

(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。

(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)

2)事务补偿

(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”

(2) Producer收到回查消息,检查回查消息对应的本地事务的状态

(3) 根据本地事务状态,重新Commit或者Rollback

其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。

3)事务消息状态

事务消息共有三种状态,提交状态、回滚状态、中间状态:

  • TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
  • TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
  • TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

生产者搞两个断点

producer

package com.suntong.myshop.service.rocketmq.provider.transaction;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.util.concurrent.TimeUnit;

public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
//        1. 创建消息生产者Producer,并制定生产者组名,现在时事务消息生产者
        TransactionMQProducer producer = new TransactionMQProducer("group5");
//        2. 指定NameServer
        producer.setNamesrvAddr("192.168.79.130:9876");

        //添加事务监听器
        producer.setTransactionListener(new TransactionListener() {
            /**
             * 该方法中执行本地事务
             * @param message
             * @param o
             * @return
             */
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object o) {
                //提交 回滚 不做处理(会回查,下一个方法)
                if(StringUtils.equals("TagA",message.getTags())){
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                if(StringUtils.equals("TagB",message.getTags())){
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
                if(StringUtils.equals("TagC",message.getTags())){
                    return LocalTransactionState.UNKNOW;
                }
                return LocalTransactionState.UNKNOW;
            }

            /**
             * 该方法时mq进行消息事务状态的回查
             * @param messageExt
             * @return
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                System.out.println("开始回查");
                System.out.println("消息Tag:" + messageExt.getTags());
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

//        3. 启动producer
        producer.start();

        //区分不同的消息 成功 失败 回查
        String[] tags = {"TagA","TagB","TagC"};

//        4. 创建消息对象,指定主题Topic,Tag和消息体
        for (int i=0;i<3;i++){
            //参数一:消息主题topic
            //参数二:消息tag
            //参数三:消息内容
            Message message = new Message("TransactionTopic",tags[i],("Hello world"+i).getBytes());

            //        5. 发送消息  不针对某一事务消息进行控制,传入null
            SendResult result = producer.sendMessageInTransaction(message, null);

            System.out.println("发送结果: "+ result);

            //睡一秒再发送下一个
            TimeUnit.SECONDS.sleep(1);
        }
        //
        //producer.shutdown();
    }
}

consumer 就修改了一下topic

package com.suntong.myshop.service.rocketmq.provider.transaction;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

/**
 * 消息的接收者
 */
public class Consumer {
    public static void main(String[] args) throws MQClientException {
//        1. 创建消费者Consumer,制定消费者组名
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//        2. 自定NameServer地址
        consumer.setNamesrvAddr("192.168.79.130:9876");
//        3. 订阅主题Topic和Tag
        consumer.subscribe("TransactionTopic","*");

        //设定消费模式:负载均衡 | 广播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);

//        4. 设置回调参数,处理消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /**
             * 接收消息内容
             * @param list
             * @param consumeConcurrentlyContext
             * @return
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for (MessageExt msg: list) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
//        5. 启动消费者consumer
        consumer.start();

        System.out.println("消费者启动");
    }
}

正常应该只有AC两个能发过去,C会回查,结果ABC都发过去了,还都回查了,难搞。。。

相关文章

  • 消费消息(二)

    批量消息发送 批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的topic,相同的waitSt...

  • python3读写kafka

    消费kafka数据,方式一 消费kafka数据,方式二 将消息写入kafka

  • Windows 安装 RocketMQ

    一、RocketMQ 介绍 1、消息顺序2、消息重复消费3、事务消息 二、RocketMQ 安装 Windows:...

  • 消息消费

    总览 消息消费分为两种形式并发消费、顺序消费;这次主要讲并发消费。 消息从Broker拉取到客户端之后,等待客户端...

  • ActiveMQ的消息模式——队列模式(Queue)

    一、队列模式特点 客户端包括生产者和消费者 队列中的消息只能被一个消费者消费 消费者可以随时消费队列中的消息 二、...

  • rocketmq源码11-broker消息消费流程

    一 总体流程 二 broker消息消费处理 2.1 PullMessageProcessor.processReq...

  • RocketMQ基础篇 Consumer消费消息

    消费消息逻辑 消费消息逻辑主要分为三个模块 Rebalance 拉取消息 消费消息 Rebalance 集群模式下...

  • RocketMQ源码之消息轨迹

    一、消息轨迹数据结构 二、发送消息轨迹流程 三、消息轨迹存储 四、消费消息轨迹流程 五、总结 消息轨迹在不少情况是...

  • Rabbitmq

    一、服务连接 二、创建通道 三、交换器声明 四、队列声明 五、消息发布 六、消费消息

  • RocketMQ消息重试

    RocketMQ为了保证消息被消费采用ACK确认机制,消费者消费消息时需要给Broker反馈消息消费的情况,成功或...

网友评论

      本文标题:消费消息(二)

      本文链接:https://www.haomeiwen.com/subject/sdigqhtx.html