RocketMQ Message Sending Modes -- Synchronous (reliable s


Author: 又语 | Published 2018-07-05 21:48

    This article shows a Java code example of sending messages synchronously with RocketMQ.

    All examples are built as Maven projects and require the rocketmq-client dependency:

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.2.0</version>
    </dependency>
    

    Java code

    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.io.UnsupportedEncodingException;
    
    public class SyncProducer {
        
        public static void main(String[] args)
            throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException,
            MQBrokerException {
            // Create the producer; a producer group name is required
            DefaultMQProducer producer = new DefaultMQProducer("synchronous-group");
            // Explicitly set the NameServer address
            producer.setNamesrvAddr("localhost:9876");
            // Start the producer instance
            producer.start();
            // Send 10 messages in a loop
            for (int i = 0; i < 10; i++) {
                // Create a message with its Topic, Tag, and body
                Message msg = new Message("TopicDemo", "TagSynchronous", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Send the message to the broker; send() blocks until the broker replies
                SendResult sendResult = producer.send(msg);
                // Print the send result
                System.out.printf("%s%n", sendResult);
            }
            // Shut down the producer once it is no longer needed
            producer.shutdown();
        }
    }
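
The message body in the example is just the payload text encoded as bytes. `RemotingHelper.DEFAULT_CHARSET` is the charset name `"UTF-8"`, and passing a charset name to `getBytes` is what forces `UnsupportedEncodingException` into the `throws` clause. The sketch below is a possible alternative using only the JDK: `StandardCharsets.UTF_8` avoids the checked exception. The class and method names (`MessageBodyEncoding`, `encodeBody`, `decodeBody`) are hypothetical helpers for illustration, not part of RocketMQ.

```java
import java.nio.charset.StandardCharsets;

final class MessageBodyEncoding {

    // Encode the payload text as UTF-8 bytes, the form a message body takes.
    static byte[] encodeBody(int index) {
        return ("Hello RocketMQ" + index).getBytes(StandardCharsets.UTF_8);
    }

    // Decode a body back into text, as a consumer of the message would.
    static String decodeBody(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encodeBody(3);
        System.out.println(body.length + " bytes: " + decodeBody(body));
    }
}
```

The resulting byte array can be passed as the third argument of the `Message` constructor in place of the `getBytes(RemotingHelper.DEFAULT_CHARSET)` call.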
    

    Before running the code, check the Topic list in RocketMQ Console: there is no Topic named TopicDemo yet.

    Run the code. The log output is as follows:

    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD74B0000, offsetMsgId=0A00000700002A9F000000000002C5BA, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=1], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7680001, offsetMsgId=0A00000700002A9F000000000002C675, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=2], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD76B0002, offsetMsgId=0A00000700002A9F000000000002C730, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=3], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7730003, offsetMsgId=0A00000700002A9F000000000002C7EB, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=0], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7760004, offsetMsgId=0A00000700002A9F000000000002C8A6, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=1], queueOffset=1]
    SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7790005, offsetMsgId=0A00000700002A9F000000000002C961, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=2], queueOffset=1]
    SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD77D0006, offsetMsgId=0A00000700002A9F000000000002CA1C, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=3], queueOffset=1]
    SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7800007, offsetMsgId=0A00000700002A9F000000000002CAD7, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=0], queueOffset=1]
    SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7830008, offsetMsgId=0A00000700002A9F000000000002CB92, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=1], queueOffset=2]
    SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7870009, offsetMsgId=0A00000700002A9F000000000002CC4D, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=2], queueOffset=2]
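
The `offsetMsgId` values in the log can be read by hand. The sketch below assumes the layout used by RocketMQ 4.x brokers on IPv4: 4 bytes of broker IP, 4 bytes of broker port, and 8 bytes of commit log offset, printed as 32 hex characters. That layout is an assumption about this client version, and `OffsetMsgIdDecoder` is a hypothetical helper written for illustration, not a RocketMQ class.

```java
final class OffsetMsgIdDecoder {

    // Plain holder for the three fields assumed to be packed into an offsetMsgId.
    static final class Decoded {
        final String host;
        final int port;
        final long commitLogOffset;

        Decoded(String host, int port, long commitLogOffset) {
            this.host = host;
            this.port = port;
            this.commitLogOffset = commitLogOffset;
        }

        @Override
        public String toString() {
            return host + ":" + port + " @ commit log offset " + commitLogOffset;
        }
    }

    // Split the 32 hex characters into IP (8 chars), port (8 chars), offset (16 chars).
    static Decoded decode(String offsetMsgId) {
        if (offsetMsgId.length() != 32) {
            throw new IllegalArgumentException("expected 32 hex characters");
        }
        StringBuilder host = new StringBuilder();
        for (int i = 0; i < 4; i++) {
            if (i > 0) {
                host.append('.');
            }
            // Each IP octet is two hex characters.
            host.append(Integer.parseInt(offsetMsgId.substring(i * 2, i * 2 + 2), 16));
        }
        int port = (int) Long.parseLong(offsetMsgId.substring(8, 16), 16);
        long offset = Long.parseUnsignedLong(offsetMsgId.substring(16, 32), 16);
        return new Decoded(host.toString(), port, offset);
    }

    public static void main(String[] args) {
        // First two offsetMsgId values from the log output.
        Decoded first = decode("0A00000700002A9F000000000002C5BA");
        Decoded second = decode("0A00000700002A9F000000000002C675");
        System.out.println(first);
        System.out.println(second);
        System.out.println("bytes between messages: "
            + (second.commitLogOffset - first.commitLogOffset));
    }
}
```

Under that assumption, every message in the log was stored by the broker at 10.0.0.7:10911, and consecutive messages sit 187 bytes apart in the commit log.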
    

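    Refresh the Topic list in RocketMQ Console: TopicDemo, the topic the messages were just sent to, now appears.

    Click STATUS to view the current message status of the topic.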


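    The test environment uses a single-Master deployment, with only one NameServer and one Broker running. The TopicDemo STATUS view shows that a Topic is divided into one or more Message Queues; here it is divided into 4 Message Queues.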
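
The queueId values in the log (1, 2, 3, 0, 1, 2, ...) are consistent with the producer rotating through the 4 Message Queues in turn. The sketch below is a simplified model of that rotation, not the client's actual selection code, which also accounts for broker availability. `RoundRobinQueueSelector` is a hypothetical class, and the start index of 1 is chosen only to match the log.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class RoundRobinQueueSelector {

    private final int queueCount;
    private final AtomicInteger counter;

    RoundRobinQueueSelector(int queueCount, int startIndex) {
        if (queueCount <= 0) {
            throw new IllegalArgumentException("queueCount must be positive");
        }
        this.queueCount = queueCount;
        this.counter = new AtomicInteger(startIndex);
    }

    // Return the next queue id; floorMod keeps the result non-negative
    // even after the counter overflows.
    int nextQueueId() {
        return Math.floorMod(counter.getAndIncrement(), queueCount);
    }

    public static void main(String[] args) {
        // 4 queues as in TopicDemo, 10 sends as in the example.
        RoundRobinQueueSelector selector = new RoundRobinQueueSelector(4, 1);
        List<Integer> queueIds = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            queueIds.add(selector.nextQueueId());
        }
        System.out.println(queueIds); // [1, 2, 3, 0, 1, 2, 3, 0, 1, 2]
    }
}
```

With 10 messages spread over 4 queues this way, queues 1 and 2 receive three messages each and queues 3 and 0 receive two, which matches the queueOffset values in the log reaching 2 only for queues 1 and 2.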
