美文网首页
RocketMQ-发送消息

RocketMQ-发送消息

作者: 别拿爱情当饭吃 | 来源:发表于2021-05-30 15:32 被阅读0次
    RocketMQ-消息发送.jpeg

    运行NameServer和Broker后,我们可以尝试用代码示例写一下:消息发送的三种方式

    • 同步发送,适用场景:对可靠性高的业务场景
    • 异步发送,适用场景:对响应时间比较敏感的业务场景,就是不允许长时间等待结果的场景
    • 发送一次,适用场景:不关心发送结果场景。比如日志发送

    依赖

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.8.0</version>
    </dependency>
    

    同步发送

    public class SyncProducer {
        public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
            DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
            //设置生产者组
            defaultMQProducer.setProducerGroup("syncProducer");
    
            //设置nameserver
            defaultMQProducer.setNamesrvAddr("localhost:9876");
    
            //启动生产者
            defaultMQProducer.start();
    
            for (int i = 0; i < 10; i++) {
                //构建消息 topic tag 内容
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " +
                                i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //同步发送,且返回结果
                SendResult sendResult = defaultMQProducer.send(msg);
                System.out.println("发送结果"+sendResult);
            }
            //关闭生产者
            defaultMQProducer.shutdown();
        }
    }
    //运行结果
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2040000, offsetMsgId=C0A81FF100002A9F0000000000045402, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=350]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2230001, offsetMsgId=C0A81FF100002A9F00000000000454CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=350]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2260002, offsetMsgId=C0A81FF100002A9F0000000000045594, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=350]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22A0003, offsetMsgId=C0A81FF100002A9F000000000004565D, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=350]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22C0004, offsetMsgId=C0A81FF100002A9F0000000000045726, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=351]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22E0005, offsetMsgId=C0A81FF100002A9F00000000000457EF, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=351]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2300006, offsetMsgId=C0A81FF100002A9F00000000000458B8, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=351]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2320007, offsetMsgId=C0A81FF100002A9F0000000000045981, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=351]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2340008, offsetMsgId=C0A81FF100002A9F0000000000045A4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=352]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2360009, offsetMsgId=C0A81FF100002A9F0000000000045B13, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=352]
    14:29:56.930 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
    14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
    14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.241:10911] result: true
    

    异步发送

    public class AsyncProducer {
        public static void main(String[] args) throws Exception {
            //设置生产者名字
            DefaultMQProducer producer = new DefaultMQProducer("asyncproducer");
            // 设置nameserver的地址
            producer.setNamesrvAddr("localhost:9876");
            //生产者启动
            producer.start();
            //发送失败重试次数
            producer.setRetryTimesWhenSendAsyncFailed(0);
            
            int messageCount = 10;
            for (int i = 0; i < messageCount; i++) {
                try {
                    final int index = i;
                    Message msg = new Message("async_topic",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    //异步发送,这里多了一个SendCallBack回调函数
                    producer.send(msg, new SendCallback() {
                        //发送成功
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.println("发送成功:"+sendResult);
                        }
                        //发送失败
                        @Override
                        public void onException(Throwable e) {
                            System.out.printf("发送失败:"+e);
                            e.printStackTrace();
                        }
                    });
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            TimeUnit.SECONDS.sleep(3);
            producer.shutdown();
        }
    }
    //运行结果
    发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868BF40001, offsetMsgId=C0A81FF100002A9F0000000000046438, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=3], queueOffset=2]
    发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C1A0004, offsetMsgId=C0A81FF100002A9F0000000000046790, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=1]
    发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868BFA0002, offsetMsgId=C0A81FF100002A9F000000000004650E, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=3], queueOffset=3]
    发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C0B0003, offsetMsgId=C0A81FF100002A9F00000000000466BA, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=2], queueOffset=5]
    发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868BF10000, offsetMsgId=C0A81FF100002A9F00000000000465E4, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=1], queueOffset=2]
    发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C1C0005, offsetMsgId=C0A81FF100002A9F0000000000046866, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=2]
    发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C1C0006, offsetMsgId=C0A81FF100002A9F000000000004693C, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=3]
    发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C280007, offsetMsgId=C0A81FF100002A9F0000000000046A12, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=1], queueOffset=3]
    发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C2D0008, offsetMsgId=C0A81FF100002A9F0000000000046AE8, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=1], queueOffset=4]
    发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C330009, offsetMsgId=C0A81FF100002A9F0000000000046BBE, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=4]
    14:49:17.262 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
    14:49:17.265 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
    14:49:17.265 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.241:10911] result: true
    

    发送一次

    public class OnewayProducer {
        public static void main(String[] args) throws Exception{
            //初始化生产者
            DefaultMQProducer producer = new DefaultMQProducer("onewayproducer");
            // 置顶nameserver名字
            producer.setNamesrvAddr("localhost:9876");
            //启动生产者
            producer.start();
            for (int i = 0; i < 10; i++) {
                //构建消息体
                Message msg = new Message("oneway_topic" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                        i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //发送一次性消息
                producer.sendOneway(msg);
            }
    
            Thread.sleep(5000);        
            producer.shutdown();
        }
    }
    

    同步发送,异步发送,一次性发送是RocketMQ主要的三种消息发送方式。

    来一个拓展:类似的Kafka和RabbitMQ有哪些消息发送方式,欢迎留言回答。

    工作中,具体用什么发送方式,得取决你的业务场景。需要可靠性的,就用同步发送。需要低延时的,就用异步发送,如果压根不care发送结果,直接用一次性发送。

    后续文章

    • RocketMQ-入门(已更新)
    • RocketMQ-消息发送(已更新)
    • RocketMQ-消费信息
    • RocketMQ-消费者的广播模式和集群模式
    • RocketMQ-顺序消息
    • RocketMQ-延迟消息
    • RocketMQ-批量消息
    • RocketMQ-过滤消息
    • RocketMQ-事务消息
    • RocketMQ-消息存储
    • RocketMQ-高可用
    • RocketMQ-高性能
    • RocketMQ-主从复制
    • RocketMQ-刷盘机制
    • RocketMQ-幂等性
    • RocketMQ-消息重试
    • RocketMQ-死信队列
      ...

    欢迎各位入(guan)股(zhu),后续文章干货多多。

    相关文章

      网友评论

          本文标题:RocketMQ-发送消息

          本文链接:https://www.haomeiwen.com/subject/getmsltx.html