美文网首页
RocketMQ-发送消息

RocketMQ-发送消息

作者: 别拿爱情当饭吃 | 来源:发表于2021-05-30 15:32 被阅读0次
RocketMQ-消息发送.jpeg

运行NameServer和Broker后,我们可以尝试用代码示例写一下:消息发送的三种方式

  • 同步发送,适用场景:对可靠性高的业务场景
  • 异步发送,适用场景:对响应时间比较敏感的业务场景,就是不允许长时间等待结果的场景
  • 发送一次,适用场景:不关心发送结果场景。比如日志发送

依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.8.0</version>
</dependency>

同步发送

public class SyncProducer {
    public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
        //设置生产者组
        defaultMQProducer.setProducerGroup("syncProducer");

        //设置nameserver
        defaultMQProducer.setNamesrvAddr("localhost:9876");

        //启动生产者
        defaultMQProducer.start();

        for (int i = 0; i < 10; i++) {
            //构建消息 topic tag 内容
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //同步发送,且返回结果
            SendResult sendResult = defaultMQProducer.send(msg);
            System.out.println("发送结果"+sendResult);
        }
        //关闭生产者
        defaultMQProducer.shutdown();
    }
}
//运行结果
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2040000, offsetMsgId=C0A81FF100002A9F0000000000045402, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=350]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2230001, offsetMsgId=C0A81FF100002A9F00000000000454CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=350]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2260002, offsetMsgId=C0A81FF100002A9F0000000000045594, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=350]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22A0003, offsetMsgId=C0A81FF100002A9F000000000004565D, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=350]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22C0004, offsetMsgId=C0A81FF100002A9F0000000000045726, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=351]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22E0005, offsetMsgId=C0A81FF100002A9F00000000000457EF, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=351]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2300006, offsetMsgId=C0A81FF100002A9F00000000000458B8, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=351]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2320007, offsetMsgId=C0A81FF100002A9F0000000000045981, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=351]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2340008, offsetMsgId=C0A81FF100002A9F0000000000045A4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=352]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2360009, offsetMsgId=C0A81FF100002A9F0000000000045B13, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=352]
14:29:56.930 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.241:10911] result: true

异步发送

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        //设置生产者名字
        DefaultMQProducer producer = new DefaultMQProducer("asyncproducer");
        // 设置nameserver的地址
        producer.setNamesrvAddr("localhost:9876");
        //生产者启动
        producer.start();
        //发送失败重试次数
        producer.setRetryTimesWhenSendAsyncFailed(0);
        
        int messageCount = 10;
        for (int i = 0; i < messageCount; i++) {
            try {
                final int index = i;
                Message msg = new Message("async_topic",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                //异步发送,这里多了一个SendCallBack回调函数
                producer.send(msg, new SendCallback() {
                    //发送成功
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("发送成功:"+sendResult);
                    }
                    //发送失败
                    @Override
                    public void onException(Throwable e) {
                        System.out.printf("发送失败:"+e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        TimeUnit.SECONDS.sleep(3);
        producer.shutdown();
    }
}
//运行结果
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868BF40001, offsetMsgId=C0A81FF100002A9F0000000000046438, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=3], queueOffset=2]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C1A0004, offsetMsgId=C0A81FF100002A9F0000000000046790, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=1]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868BFA0002, offsetMsgId=C0A81FF100002A9F000000000004650E, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=3], queueOffset=3]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C0B0003, offsetMsgId=C0A81FF100002A9F00000000000466BA, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=2], queueOffset=5]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868BF10000, offsetMsgId=C0A81FF100002A9F00000000000465E4, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=1], queueOffset=2]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C1C0005, offsetMsgId=C0A81FF100002A9F0000000000046866, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=2]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C1C0006, offsetMsgId=C0A81FF100002A9F000000000004693C, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=3]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C280007, offsetMsgId=C0A81FF100002A9F0000000000046A12, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=1], queueOffset=3]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C2D0008, offsetMsgId=C0A81FF100002A9F0000000000046AE8, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=1], queueOffset=4]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C330009, offsetMsgId=C0A81FF100002A9F0000000000046BBE, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=4]
14:49:17.262 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:49:17.265 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:49:17.265 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.241:10911] result: true

发送一次

public class OnewayProducer {
    public static void main(String[] args) throws Exception{
        //初始化生产者
        DefaultMQProducer producer = new DefaultMQProducer("onewayproducer");
        // 置顶nameserver名字
        producer.setNamesrvAddr("localhost:9876");
        //启动生产者
        producer.start();
        for (int i = 0; i < 10; i++) {
            //构建消息体
            Message msg = new Message("oneway_topic" /* Topic */,
                "TagA" /* Tag */,
                ("Hello RocketMQ " +
                    i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //发送一次性消息
            producer.sendOneway(msg);
        }

        Thread.sleep(5000);        
        producer.shutdown();
    }
}

同步发送,异步发送,一次性发送是RocketMQ主要的三种消息发送方式。

来一个拓展:类似的Kafka和RabbitMQ有哪些消息发送方式,欢迎留言回答。

工作中,具体用什么发送方式,得取决你的业务场景。需要可靠性的,就用同步发送。需要低延时的,就用异步发送,如果压根不care发送结果,直接用一次性发送。

后续文章

  • RocketMQ-入门(已更新)
  • RocketMQ-消息发送(已更新)
  • RocketMQ-消费信息
  • RocketMQ-消费者的广播模式和集群模式
  • RocketMQ-顺序消息
  • RocketMQ-延迟消息
  • RocketMQ-批量消息
  • RocketMQ-过滤消息
  • RocketMQ-事务消息
  • RocketMQ-消息存储
  • RocketMQ-高可用
  • RocketMQ-高性能
  • RocketMQ-主从复制
  • RocketMQ-刷盘机制
  • RocketMQ-幂等性
  • RocketMQ-消息重试
  • RocketMQ-死信队列
    ...

欢迎各位入(guan)股(zhu),后续文章干货多多。

相关文章

  • RocketMQ-消息发送

    简介 本文通过问题入手,介绍下RocketMQ的消息发送逻辑是怎么样的。消息发送的大体逻辑图如下: 问题 首先我们...

  • RocketMQ-发送消息

    运行NameServer和Broker后,我们可以尝试用代码示例写一下:消息发送的三种方式 同步发送,适用场景:对...

  • RocketMQ-延时消息

    一、延时消息的使用 使用比较简单,指定message的DelayTimeLevel即可。示例代码如下: 目前roc...

  • RocketMQ-事务消息

    一、事务消息的引出 以购物场景为例,张三购买物品,账户扣款 100 元的同时,需要保证在下游的会员服务中给该账户增...

  • RocketMQ-异步消息

    异步消息 生产者

  • RocketMQ-广播消息

    广播消息 生产者 消费者

  • RocketMQ-顺序消息

    敬请期待!!

  • RocketMQ-普通消息

    一、摘要 默认消息发送超时时间为3s 默认消息发送是同步的发送模式,同步发送会发送1+重试次数,默认重试2,一共3...

  • RocketMQ-延迟消息

    延迟消息 消息发送到Broker后,要特定的事件才会被Consumer消费。 生产者 默认配置级别

  • RocketMQ-消息发送流程及定时拉取模式

    消息发送流程主要包括如下三个方面: 1、Topic路由寻址2、选择消息队列3、消息发送、重试、Broker规避 R...

网友评论

      本文标题:RocketMQ-发送消息

      本文链接:https://www.haomeiwen.com/subject/getmsltx.html