美文网首页
产生消息

产生消息

作者: isuntong | 来源:发表于2020-02-24 13:27 被阅读0次

    优点:解耦,削峰、数据分发

    缺点:
    系统可用性降低(mq高可用)、
    系统复杂度提高(消息丢失、消息顺序、保证没有重复消费)、
    一致性问题

    RabbitMQ:erlang、万级(单机吞吐量)、us级(时效性)、高(主从架构)(可用性)、基于erlang开发,所以并发能力很强,性能及其好,延时很低,管理界面较丰富

    RocketMQ:java、10万级、ms级、非常高(分布式架构)、MQ功能比较完备,扩展性佳

    Kafka:scala、10万级、ms级以内、非常高(分布式架构)、只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广

    notepad安装NPPFTP插件可以直接去修改linux中的文件

    进入运行的容器

    # 首先使用下面的命令,查看容器ID(CONTAINER ID):
    docker ps -a
    # 然后用下面的命令进入容器,就可以使用bash命令浏览容器里的文件:
    docker exec -it [CONTAINER ID] bash
    # 有的镜像没有bash命令,可以用对应的shell,比如sh
    docker exec -it [CONTAINER ID] sh
    

    角色介绍

    Producer:消息的发送者
    Consumer:消息的接收者
    Broker:暂存和传输消息,邮局
    NameServer:管理Broker,邮局管理机构
    Topic:区分消息的种类;一个发送者可以发送消息给一个或者多个Topic;一个消费的接收者可以订阅一个或者多个Topic消息
    Message Queue:相当于是Topic的分区,用于并行发送和接收消息

    集群

    Producer、Consumer、NameServer没有数据同步关系,较为简单

    Brocker集群读写、主从
    Master可以部署多台,一个Master可以对应多个Slavor、但是一个Slavor只能对应一个Master

    集群模式

    单Master模式:风险较大,本地测试可以
    多Master模式:配置简单
    多Master多Slave模式(异步):Master宕机,丢失少量消息
    多Master多Slave模式(同步):性能较低10%,安全性高使用这个模式

    消息发送者步骤分析

    1. 创建消息生产者Producer,并制定生产者组名
    2. 指定NameServer
    3. 启动producer
    4. 创建消息对象,指定主题Topic,Tag和消息体
    5. 发送消息
    6. 关闭生产者producer

    消息消费者步骤分析

    1. 创建消费者Consumer,制定消费者组名
    2. 自定NameServer地址
    3. 订阅主题Topic和Tag
    4. 设置回调参数,处理消息
    5. 启动消费者consumer

    发送同步消息

    package com.suntong.myshop.service.rocketmq.provider.tongbu;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.client.producer.SendStatus;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 发送同步消息
     */
    public class SyncProducer {
        public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    //        1. 创建消息生产者Producer,并制定生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("group1");
    //        2. 指定NameServer
            producer.setNamesrvAddr("192.168.79.130:9876");
    //        3. 启动producer
            producer.start();
    //        4. 创建消息对象,指定主题Topic,Tag和消息体
            for (int i=0;i<10;i++){
                //参数一:消息主题topic
                //参数二:消息tag
                //参数三:消息内容
                Message message = new Message("base","Tag1",("Hello world"+i).getBytes());
    
                //        5. 发送消息
                SendResult result = producer.send(message);
                //发送状态
                SendStatus sendStatus = result.getSendStatus();
                //消息Id
                String msgId = result.getMsgId();
                //消息接受队列Id
                int queueId = result.getMessageQueue().getQueueId();
                System.out.println("发送状态: "+ result + ", 消息Id" + msgId + ", 队列" + queueId);
    
                //睡一秒再发送下一个
                TimeUnit.SECONDS.sleep(1);
            }
    //        6. 关闭生产者producer
            producer.shutdown();
        }
    }
    
    

    打印

    发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FDC640000, offsetMsgId=C0A84F8200002A9F0000000000000000, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=1], queueOffset=0], 消息IdC0A82B9C26A418B4AAC2793FDC640000, 队列1
    发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FE0670001, offsetMsgId=C0A84F8200002A9F00000000000000A9, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=2], queueOffset=0], 消息IdC0A82B9C26A418B4AAC2793FE0670001, 队列2
    发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FE4520002, offsetMsgId=C0A84F8200002A9F0000000000000152, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=3], queueOffset=0], 消息IdC0A82B9C26A418B4AAC2793FE4520002, 队列3
    发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FE83E0003, offsetMsgId=C0A84F8200002A9F00000000000001FB, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=0], queueOffset=0], 消息IdC0A82B9C26A418B4AAC2793FE83E0003, 队列0
    发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FEC2A0004, offsetMsgId=C0A84F8200002A9F00000000000002A4, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=1], queueOffset=1], 消息IdC0A82B9C26A418B4AAC2793FEC2A0004, 队列1
    发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FF0170005, offsetMsgId=C0A84F8200002A9F000000000000034D, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=2], queueOffset=1], 消息IdC0A82B9C26A418B4AAC2793FF0170005, 队列2
    发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FF4020006, offsetMsgId=C0A84F8200002A9F00000000000003F6, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=3], queueOffset=1], 消息IdC0A82B9C26A418B4AAC2793FF4020006, 队列3
    发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FF7EF0007, offsetMsgId=C0A84F8200002A9F000000000000049F, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=0], queueOffset=1], 消息IdC0A82B9C26A418B4AAC2793FF7EF0007, 队列0
    发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FFBDD0008, offsetMsgId=C0A84F8200002A9F0000000000000548, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=1], queueOffset=2], 消息IdC0A82B9C26A418B4AAC2793FFBDD0008, 队列1
    发送状态: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C26A418B4AAC2793FFFC90009, offsetMsgId=C0A84F8200002A9F00000000000005F1, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=2], queueOffset=2], 消息IdC0A82B9C26A418B4AAC2793FFFC90009, 队列2
    13:03:58.404 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10909] result: true
    13:03:58.409 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10911] result: true
    13:03:58.409 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:9876] result: true
    
    

    消息来了

    异步消息

    package com.suntong.myshop.service.rocketmq.provider.yibu;
    
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 发送异步消息
     */
    public class AsyncProducer {
        public static void main(String[] args) throws InterruptedException, MQClientException, RemotingException, MQBrokerException {
            //        1. 创建消息生产者Producer,并制定生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("group1");
    //        2. 指定NameServer
            producer.setNamesrvAddr("192.168.79.130:9876");
    //        3. 启动producer
            producer.start();
    //        4. 创建消息对象,指定主题Topic,Tag和消息体
            for (int i=0;i<10;i++){
                //参数一:消息主题topic
                //参数二:消息tag
                //参数三:消息内容
                Message message = new Message("base","Tag2",("Hello world"+i).getBytes());
    
                //        5. 发送异步消息
                producer.send(message, new SendCallback(){
    
                    /**
                     * 发送成功回调函数
                     * @param sendResult
                     */
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("发送结果: "+ sendResult);
                    }
    
                    /**
                     * 发送失败回调参数
                     * @param throwable
                     */
                    @Override
                    public void onException(Throwable throwable) {
                        System.out.println("发送异常: "+ throwable);
                    }
                });
                
                //睡一秒再发送下一个
                TimeUnit.SECONDS.sleep(1);
            }
    //        6. 关闭生产者producer
            producer.shutdown();
        }
    }
    
    

    结果

    E:\java\Java8\bin\java.exe "-javaagent:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\lib\idea_rt.jar=64399:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\bin" -Dfile.encoding=UTF-8 -classpath E:\java\Java8\jre\lib\charsets.jar;E:\java\Java8\jre\lib\deploy.jar;E:\java\Java8\jre\lib\ext\access-bridge-64.jar;E:\java\Java8\jre\lib\ext\cldrdata.jar;E:\java\Java8\jre\lib\ext\dnsns.jar;E:\java\Java8\jre\lib\ext\jaccess.jar;E:\java\Java8\jre\lib\ext\jfxrt.jar;E:\java\Java8\jre\lib\ext\localedata.jar;E:\java\Java8\jre\lib\ext\nashorn.jar;E:\java\Java8\jre\lib\ext\sunec.jar;E:\java\Java8\jre\lib\ext\sunjce_provider.jar;E:\java\Java8\jre\lib\ext\sunmscapi.jar;E:\java\Java8\jre\lib\ext\sunpkcs11.jar;E:\java\Java8\jre\lib\ext\zipfs.jar;E:\java\Java8\jre\lib\javaws.jar;E:\java\Java8\jre\lib\jce.jar;E:\java\Java8\jre\lib\jfr.jar;E:\java\Java8\jre\lib\jfxswt.jar;E:\java\Java8\jre\lib\jsse.jar;E:\java\Java8\jre\lib\management-agent.jar;E:\java\Java8\jre\lib\plugin.jar;E:\java\Java8\jre\lib\resources.jar;E:\java\Java8\jre\lib\rt.jar;E:\java2\IdeaProjects\microservice-my-shop\myshop-service-rocketmq-provider\target\classes;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-web\2.0.6.RELEASE\spring-boot-starter-web-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter\2.0.6.RELEASE\spring-boot-starter-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot\2.0.6.RELEASE\spring-boot-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-autoconfigure\2.0.6.RELEASE\spring-boot-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-logging\2.0.6.RELEASE\spring-boot-starter-logging-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\java2\maven\bos_repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\java2\maven\bos_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\java2\maven\bos_repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-json\2.0.6.RELEASE\spring-boot-starter-json-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-databind\2.9.7\jackson-databind-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-core\2.9.7\jackson-core-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.7\jackson-datatype-jdk8-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.7\jackson-datatype-jsr310-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.7\jackson-module-parameter-names-2.9.7.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.6.RELEASE\spring-boot-starter-tomcat-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.34\tomcat-embed-core-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.34\tomcat-embed-el-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.34\tomcat-embed-websocket-8.5.34.jar;E:\java2\maven\bos_repository\org\hibernate\validator\hibernate-validator\6.0.13.Final\hibernate-validator-6.0.13.Final.jar;E:\java2\maven\bos_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\java2\maven\bos_repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\java2\maven\bos_repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\java2\maven\bos_repository\org\springframework\spring-web\5.0.10.RELEASE\spring-web-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-beans\5.0.10.RELEASE\spring-beans-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-webmvc\5.0.10.RELEASE\spring-webmvc-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-aop\5.0.10.RELEASE\spring-aop-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-context\5.0.10.RELEASE\spring-context-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-expression\5.0.10.RELEASE\spring-expression-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-actuator\2.0.6.RELEASE\spring-boot-starter-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.0.6.RELEASE\spring-boot-actuator-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator\2.0.6.RELEASE\spring-boot-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\io\micrometer\micrometer-core\1.0.7\micrometer-core-1.0.7.jar;E:\java2\maven\bos_repository\org\hdrhistogram\HdrHistogram\2.1.10\HdrHistogram-2.1.10.jar;E:\java2\maven\bos_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;E:\java2\maven\bos_repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;E:\java2\maven\bos_repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;E:\java2\maven\bos_repository\org\springframework\spring-core\5.0.10.RELEASE\spring-core-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-jcl\5.0.10.RELEASE\spring-jcl-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-starter-stream-rocketmq\0.2.1.RELEASE\spring-cloud-starter-stream-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream-binder-rocketmq\0.2.1.RELEASE\spring-cloud-stream-binder-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream\2.0.1.RELEASE\spring-cloud-stream-2.0.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-validation\2.0.6.RELEASE\spring-boot-starter-validation-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-messaging\5.0.10.RELEASE\spring-messaging-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-core\5.0.9.RELEASE\spring-integration-core-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tx\5.0.10.RELEASE\spring-tx-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\io\projectreactor\reactor-core\3.1.10.RELEASE\reactor-core-3.1.10.RELEASE.jar;E:\java2\maven\bos_repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-jmx\5.0.9.RELEASE\spring-integration-jmx-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tuple\1.0.0.RELEASE\spring-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;E:\java2\maven\bos_repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-tuple\1.0.0.RELEASE\spring-integration-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\retry\spring-retry\1.2.2.RELEASE\spring-retry-1.2.2.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-client\4.3.1\rocketmq-client-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-common\4.3.1\rocketmq-common-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-remoting\4.3.1\rocketmq-remoting-4.3.1.jar;E:\java2\maven\bos_repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;E:\java2\maven\bos_repository\io\netty\netty-all\4.1.29.Final\netty-all-4.1.29.Final.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-logging\4.3.1\rocketmq-logging-4.3.1.jar;E:\java2\maven\bos_repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;E:\java2\maven\bos_repository\org\apache\commons\commons-lang3\3.7\commons-lang3-3.7.jar com.suntong.myshop.service.rocketmq.provider.yibu.AsyncProducer
    发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC2794847D60001, offsetMsgId=C0A84F8200002A9F0000000000000743, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=2], queueOffset=3]
    发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC2794847D60002, offsetMsgId=C0A84F8200002A9F000000000000069A, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=1], queueOffset=3]
    发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC2794847D60000, offsetMsgId=C0A84F8200002A9F00000000000007EC, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=1], queueOffset=4]
    发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC279484AB80003, offsetMsgId=C0A84F8200002A9F0000000000000895, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=3], queueOffset=2]
    发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC279484EA10004, offsetMsgId=C0A84F8200002A9F000000000000093E, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=2], queueOffset=4]
    发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC27948528A0005, offsetMsgId=C0A84F8200002A9F00000000000009E7, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=0], queueOffset=2]
    发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC2794856720006, offsetMsgId=C0A84F8200002A9F0000000000000A90, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=3], queueOffset=3]
    发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC279485A5A0007, offsetMsgId=C0A84F8200002A9F0000000000000B39, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=1], queueOffset=5]
    发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC279485E420008, offsetMsgId=C0A84F8200002A9F0000000000000BE2, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=0], queueOffset=3]
    发送结果: SendResult [sendStatus=SEND_OK, msgId=C0A82B9C14CC18B4AAC27948622B0009, offsetMsgId=C0A84F8200002A9F0000000000000C8B, messageQueue=MessageQueue [topic=base, brokerName=broker-a, queueId=2], queueOffset=5]
    13:13:07.878 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:9876] result: true
    13:13:07.883 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10911] result: true
    13:13:07.883 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10909] result: true
    
    

    消息也来了

    单项消息

    package com.suntong.myshop.service.rocketmq.provider.danxiang;
    
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 发送单向消息
     */
    public class OneWayProducer {
        public static void main(String[] args) throws RemotingException, MQClientException, InterruptedException {
            //        1. 创建消息生产者Producer,并制定生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("group1");
    //        2. 指定NameServer
            producer.setNamesrvAddr("192.168.79.130:9876");
    //        3. 启动producer
            producer.start();
    //        4. 创建消息对象,指定主题Topic,Tag和消息体
            for (int i=0;i<10;i++){
                //参数一:消息主题topic
                //参数二:消息tag
                //参数三:消息内容
                Message message = new Message("base","Tag3",("Hello world 单项消息 "+i).getBytes());
    
                //        5. 发送单向消息
                producer.sendOneway(message);
    
                //睡一秒再发送下一个
                TimeUnit.SECONDS.sleep(1);
            }
    //        6. 关闭生产者producer
            producer.shutdown();
    
        }
    }
    
    

    打印

    E:\java\Java8\bin\java.exe "-javaagent:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\lib\idea_rt.jar=64562:E:\IntelliJ IDEA\IntelliJ IDEA 2019.2\bin" -Dfile.encoding=UTF-8 -classpath E:\java\Java8\jre\lib\charsets.jar;E:\java\Java8\jre\lib\deploy.jar;E:\java\Java8\jre\lib\ext\access-bridge-64.jar;E:\java\Java8\jre\lib\ext\cldrdata.jar;E:\java\Java8\jre\lib\ext\dnsns.jar;E:\java\Java8\jre\lib\ext\jaccess.jar;E:\java\Java8\jre\lib\ext\jfxrt.jar;E:\java\Java8\jre\lib\ext\localedata.jar;E:\java\Java8\jre\lib\ext\nashorn.jar;E:\java\Java8\jre\lib\ext\sunec.jar;E:\java\Java8\jre\lib\ext\sunjce_provider.jar;E:\java\Java8\jre\lib\ext\sunmscapi.jar;E:\java\Java8\jre\lib\ext\sunpkcs11.jar;E:\java\Java8\jre\lib\ext\zipfs.jar;E:\java\Java8\jre\lib\javaws.jar;E:\java\Java8\jre\lib\jce.jar;E:\java\Java8\jre\lib\jfr.jar;E:\java\Java8\jre\lib\jfxswt.jar;E:\java\Java8\jre\lib\jsse.jar;E:\java\Java8\jre\lib\management-agent.jar;E:\java\Java8\jre\lib\plugin.jar;E:\java\Java8\jre\lib\resources.jar;E:\java\Java8\jre\lib\rt.jar;E:\java2\IdeaProjects\microservice-my-shop\myshop-service-rocketmq-provider\target\classes;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-web\2.0.6.RELEASE\spring-boot-starter-web-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter\2.0.6.RELEASE\spring-boot-starter-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot\2.0.6.RELEASE\spring-boot-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-autoconfigure\2.0.6.RELEASE\spring-boot-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-logging\2.0.6.RELEASE\spring-boot-starter-logging-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-classic\1.2.3\logback-classic-1.2.3.jar;E:\java2\maven\bos_repository\ch\qos\logback\logback-core\1.2.3\logback-core-1.2.3.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-to-slf4j\2.10.0\log4j-to-slf4j-2.10.0.jar;E:\java2\maven\bos_repository\org\apache\logging\log4j\log4j-api\2.10.0\log4j-api-2.10.0.jar;E:\java2\maven\bos_repository\org\slf4j\jul-to-slf4j\1.7.25\jul-to-slf4j-1.7.25.jar;E:\java2\maven\bos_repository\javax\annotation\javax.annotation-api\1.3.2\javax.annotation-api-1.3.2.jar;E:\java2\maven\bos_repository\org\yaml\snakeyaml\1.19\snakeyaml-1.19.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-json\2.0.6.RELEASE\spring-boot-starter-json-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-databind\2.9.7\jackson-databind-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-annotations\2.9.0\jackson-annotations-2.9.0.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\core\jackson-core\2.9.7\jackson-core-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jdk8\2.9.7\jackson-datatype-jdk8-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\datatype\jackson-datatype-jsr310\2.9.7\jackson-datatype-jsr310-2.9.7.jar;E:\java2\maven\bos_repository\com\fasterxml\jackson\module\jackson-module-parameter-names\2.9.7\jackson-module-parameter-names-2.9.7.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-tomcat\2.0.6.RELEASE\spring-boot-starter-tomcat-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-core\8.5.34\tomcat-embed-core-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-el\8.5.34\tomcat-embed-el-8.5.34.jar;E:\java2\maven\bos_repository\org\apache\tomcat\embed\tomcat-embed-websocket\8.5.34\tomcat-embed-websocket-8.5.34.jar;E:\java2\maven\bos_repository\org\hibernate\validator\hibernate-validator\6.0.13.Final\hibernate-validator-6.0.13.Final.jar;E:\java2\maven\bos_repository\javax\validation\validation-api\2.0.1.Final\validation-api-2.0.1.Final.jar;E:\java2\maven\bos_repository\org\jboss\logging\jboss-logging\3.3.2.Final\jboss-logging-3.3.2.Final.jar;E:\java2\maven\bos_repository\com\fasterxml\classmate\1.3.4\classmate-1.3.4.jar;E:\java2\maven\bos_repository\org\springframework\spring-web\5.0.10.RELEASE\spring-web-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-beans\5.0.10.RELEASE\spring-beans-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-webmvc\5.0.10.RELEASE\spring-webmvc-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-aop\5.0.10.RELEASE\spring-aop-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-context\5.0.10.RELEASE\spring-context-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-expression\5.0.10.RELEASE\spring-expression-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-actuator\2.0.6.RELEASE\spring-boot-starter-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator-autoconfigure\2.0.6.RELEASE\spring-boot-actuator-autoconfigure-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-actuator\2.0.6.RELEASE\spring-boot-actuator-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\io\micrometer\micrometer-core\1.0.7\micrometer-core-1.0.7.jar;E:\java2\maven\bos_repository\org\hdrhistogram\HdrHistogram\2.1.10\HdrHistogram-2.1.10.jar;E:\java2\maven\bos_repository\org\latencyutils\LatencyUtils\2.0.3\LatencyUtils-2.0.3.jar;E:\java2\maven\bos_repository\org\slf4j\slf4j-api\1.7.25\slf4j-api-1.7.25.jar;E:\java2\maven\bos_repository\org\objenesis\objenesis\2.6\objenesis-2.6.jar;E:\java2\maven\bos_repository\org\springframework\spring-core\5.0.10.RELEASE\spring-core-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-jcl\5.0.10.RELEASE\spring-jcl-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-starter-stream-rocketmq\0.2.1.RELEASE\spring-cloud-starter-stream-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream-binder-rocketmq\0.2.1.RELEASE\spring-cloud-stream-binder-rocketmq-0.2.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\cloud\spring-cloud-stream\2.0.1.RELEASE\spring-cloud-stream-2.0.1.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\boot\spring-boot-starter-validation\2.0.6.RELEASE\spring-boot-starter-validation-2.0.6.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-messaging\5.0.10.RELEASE\spring-messaging-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-core\5.0.9.RELEASE\spring-integration-core-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tx\5.0.10.RELEASE\spring-tx-5.0.10.RELEASE.jar;E:\java2\maven\bos_repository\io\projectreactor\reactor-core\3.1.10.RELEASE\reactor-core-3.1.10.RELEASE.jar;E:\java2\maven\bos_repository\org\reactivestreams\reactive-streams\1.0.2\reactive-streams-1.0.2.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-jmx\5.0.9.RELEASE\spring-integration-jmx-5.0.9.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\spring-tuple\1.0.0.RELEASE\spring-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\com\esotericsoftware\kryo-shaded\3.0.3\kryo-shaded-3.0.3.jar;E:\java2\maven\bos_repository\com\esotericsoftware\minlog\1.3.0\minlog-1.3.0.jar;E:\java2\maven\bos_repository\org\springframework\integration\spring-integration-tuple\1.0.0.RELEASE\spring-integration-tuple-1.0.0.RELEASE.jar;E:\java2\maven\bos_repository\org\springframework\retry\spring-retry\1.2.2.RELEASE\spring-retry-1.2.2.RELEASE.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-client\4.3.1\rocketmq-client-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-common\4.3.1\rocketmq-common-4.3.1.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-remoting\4.3.1\rocketmq-remoting-4.3.1.jar;E:\java2\maven\bos_repository\com\alibaba\fastjson\1.2.29\fastjson-1.2.29.jar;E:\java2\maven\bos_repository\io\netty\netty-all\4.1.29.Final\netty-all-4.1.29.Final.jar;E:\java2\maven\bos_repository\org\apache\rocketmq\rocketmq-logging\4.3.1\rocketmq-logging-4.3.1.jar;E:\java2\maven\bos_repository\io\netty\netty-tcnative-boringssl-static\1.1.33.Fork26\netty-tcnative-boringssl-static-1.1.33.Fork26.jar;E:\java2\maven\bos_repository\org\apache\commons\commons-lang3\3.7\commons-lang3-3.7.jar com.suntong.myshop.service.rocketmq.provider.danxiang.OneWayProducer
    13:25:25.124 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10909] result: true
    13:25:25.128 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:10911] result: true
    13:25:25.128 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.79.130:9876] result: true
    

    没有结果接受,适合发日志啥的

    消息也来了

    相关文章

      网友评论

          本文标题:产生消息

          本文链接:https://www.haomeiwen.com/subject/qypmqhtx.html