RocketMQ快速入门

作者: mingxungu | 来源:发表于2020-04-11 16:33 被阅读0次

本章简单讲讲RocketMQ的入门操作,消息发送和消息接收。

引入 rocketmq-client

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.2.0</version>
</dependency>

编写Producer

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_test");
        producer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
        producer.start();

        for (int i = 0; i < 100; i++) {
            try {
                //构建消息
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("测试RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
                );
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        producer.shutdown();
    }
}

查看结果

image

编写Consumer

public static void main(String[] args){
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup("consumer_test_push");
            consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
            consumer.subscribe("TopicTest", "*");
            consumer.registerMessageListener(new MessageListenerConcurrently(){

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
                        ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
                    try {
                        for(MessageExt msg : paramList){
                            String msgbody = new String(msg.getBody(), "utf-8");
                            System.out.println("  MessageBody: "+ msgbody);//输出消息内容
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再试
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消费成功
                }
            });
            consumer.start();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

查看结果

image

看到消费的结果大家可能有疑问,我们生产消息的时候是按照顺序生产的消息,消费时候为什么不是顺序消费下来的。

MQ消息的无序性,每个主题对应多个队列,生产消息时是根据算法放置不同的队列中,消费则就是无序了(有序消息后面讨论)

也有可能出现一条消息被消费了多次,RocketMQ的目标就是不丢数据,<u>每条消息至少发送一次</u>,内部通过ACK的确认机制实现的后面会重点讨论

消息管控台

为了方便的查看消息的详情我们可以通过消息的管控台更好的管理和查看消息详情,当然我们也可以通过后台的提供的命令来为运维提供更多的管理。

RocketMQ-Console地址: https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

可以直接下载到本地之后通过mavne进行编译获取jar,该项目是SpringBoot项目

mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar

丢到linux服务器上启动

(1)启动时设置具体的RocketMQ的参数

java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=10.10.12.203:9876;10.10.12.204:9876

(2)直接修改rocketmq-console-ng-1.0.0.jar中的配置文件,找到rocketmq-console-ng-1.0.0.jar\BOOT-INF\classes\application.properties文件,根据自己的NamesrvAddr进行修改rocketmq.config.namesrvAddr的值,默认端口12581

浏览器登录查看控制台信息

image

查看RocketMQ集群的节点信息

image

根据主题时间段查询消息

image

查看某条消息的具体信息

image

管控台提供了很多运维功能能极大的提高我们的运维效率,里面的功能包括创建主题、修改主题、发送消息、对消费者的信息进行查看等功能我们不一一介绍,可以简单的了解使用。

相关文章

  • RocketMQ:消息发送与消费

    在此之前,我们已经介绍过《RocketMQ:快速入门》和《RocketMQ:搭建集群》。现在我们已经准备好Rock...

  • 2、RocketMQ基础-RocketMQ快速入门

    RocketMQ快速入门 RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,在阿里内部,Roc...

  • RocketMQ快速入门

    本章简单讲讲RocketMQ的入门操作,消息发送和消息接收。 引入 rocketmq-client 编写Produ...

  • RocketMQ实战 - 快速入门

    RocketMQ 是阿里开源的(目前已捐赠给Apache了)一款高性能、高吞吐量的分布式消息中间件。 参考资料 十...

  • RocketMQ:搭建集群

    在上一篇《RocketMQ:快速入门》之后,今天说一说如何搭建RocketMQ集群。首先看一下集群架构图: 1. ...

  • 三、RocketMQ快速开始

    快速开始 本快速入门指南是在本地计算机上设置RocketMQ消息传递系统以发送和接收消息的详细说明。 环境要求(版...

  • rocketmq总目录

    实战 rocketmq最简单的入门demo rocketmq的常用概念,接口和方法 rocketmq的正式部署 高...

  • RocketMQ入门

    RocketMQ入门 1. RocketMQ简介 RocketMQ是阿里开源的消息中间件,它是纯java开发,具有...

  • RocketMQ 入门 - 单 master 模式

    RocketMQ 入门 what is rocketMQ:RocketMQ 是阿里开源的一款高性能、高吞吐量的分布...

  • rocketMq整合springBoot快速入门

    一、什么是消息队列 MQ? MQ 是什么?队列是什么,MQ 我们可以理解为消息队列,队列我们可以理解为管道。以管道...

网友评论

    本文标题:RocketMQ快速入门

    本文链接:https://www.haomeiwen.com/subject/mwkumhtx.html