美文网首页
RocketMQ快速上手

RocketMQ快速上手

作者: 万物始 | 来源:发表于2019-03-07 00:51 被阅读0次

RocketMQ 是阿里开源的一款高性能、高吞吐量的消息中间件。

RocketMQ 架构介绍

RocketMQ架构分为4部分,Producer、Consumer、Nameserver、Broker,四部分均可集群部署。

Producer、Consumer为客户端。Producer是消息发送端,Consumer是消费消息端。开发过程中,我们会对这一部分关注更多。

Nameserver、Broker为服务端。Namesever可以看作架构的大脑,负责服务的发现和路由;Broker可以看作消息的中转站,Producer发送的消息会先到Broker,然后由Consumer消费。

RocketMQ 架构

RocketMQ 安装

下面介绍如何在本地安装RocketMQ,并使用源码中自带的demo发送消息、消费消息。

环境准备:

  1. 64bit OS, Linux/Unix/Mac 平台
  2. 64bit JDK 1.8+;
  3. Maven 3.2.x;

下载代码:RocketMQ项目由github托管,我们将从github上下载rocketMQ的源码
链接:RocketMQ github仓库

如果装有git,可以通过git checkout 代码到本地,如果git没有也可以直接下载安装包解压得到文件,文件目录如下


RocketMQ项目目录

安装步骤如下:

构建,进入文件目录,通过maven构建项目

mvn -Prelease-all -DskipTests clean install -U

稍等片刻,构建成功

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Apache RocketMQ 4.4.0 4.4.0 ........................ SUCCESS [  3.836 s]
[INFO] rocketmq-logging 4.4.0 ............................. SUCCESS [  2.971 s]
[INFO] rocketmq-remoting 4.4.0 ............................ SUCCESS [  2.395 s]
[INFO] rocketmq-common 4.4.0 .............................. SUCCESS [  3.386 s]
[INFO] rocketmq-client 4.4.0 .............................. SUCCESS [  4.092 s]
[INFO] rocketmq-store 4.4.0 ............................... SUCCESS [  2.340 s]
[INFO] rocketmq-srvutil 4.4.0 ............................. SUCCESS [  0.589 s]
[INFO] rocketmq-filter 4.4.0 .............................. SUCCESS [  1.187 s]
[INFO] rocketmq-acl 4.4.0 ................................. SUCCESS [  0.862 s]
[INFO] rocketmq-broker 4.4.0 .............................. SUCCESS [  2.778 s]
[INFO] rocketmq-tools 4.4.0 ............................... SUCCESS [  1.686 s]
[INFO] rocketmq-namesrv 4.4.0 ............................. SUCCESS [  0.986 s]
[INFO] rocketmq-logappender 4.4.0 ......................... SUCCESS [  0.845 s]
[INFO] rocketmq-openmessaging 4.4.0 ....................... SUCCESS [  0.742 s]
[INFO] rocketmq-example 4.4.0 ............................. SUCCESS [  0.843 s]
[INFO] rocketmq-test 4.4.0 ................................ SUCCESS [  1.221 s]
[INFO] rocketmq-distribution 4.4.0 4.4.0 .................. SUCCESS [  6.102 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 37.160 s
[INFO] Finished at: 2019-02-17T22:26:28+08:00
[INFO] ------------------------------------------------------------------------

进入构建后的输出目录,按顺序依次启动Name ServerBroker

cd distribution/target/apache-rocketmq

启动Name Server,nohup表示不挂断执行,terminal退出时MQ的进程不会退出

nohup sh bin/mqnamesrv &

查看Name Server 启动日志

tail -f ~/logs/rocketmqlogs/namesrv.log

启动成功

2019-02-23 22:38:09 INFO main - tls.client.keyPath = null
2019-02-23 22:38:09 INFO main - tls.client.keyPassword = null
2019-02-23 22:38:09 INFO main - tls.client.certPath = null
2019-02-23 22:38:09 INFO main - tls.client.authServer = false
2019-02-23 22:38:09 INFO main - tls.client.trustCertPath = null
2019-02-23 22:38:09 INFO main - Using OpenSSL provider
2019-02-23 22:38:10 INFO main - SSLContext created for server
2019-02-23 22:38:10 INFO NettyEventExecutor - NettyEventExecutor service started
2019-02-23 22:38:10 INFO FileWatchService - FileWatchService service started
2019-02-23 22:38:10 INFO main - The Name Server boot success. serializeType=JSON

启动Broker,并指定注册到Name Server的地址 127.0.0.1:9786

nohup sh bin/mqbroker -n 127.0.0.1:9876 &

查看Broker 启动日志

tail -f ~/logs/rocketmqlogs/broker.log 

启动成功

2019-02-23 22:46:18 WARN main - Load default discard message hook service: DefaultTransactionalMessageCheckListener
2019-02-23 22:46:18 INFO main - The broker dose not enable acl
2019-02-23 22:46:18 INFO FileWatchService - FileWatchService service started
2019-02-23 22:46:18 INFO PullRequestHoldService - PullRequestHoldService service started
2019-02-23 22:46:18 INFO brokerOutApi_thread_1 - register broker to name server 127.0.0.1:9876 OK
2019-02-23 22:46:18 INFO main - Start transaction service!
2019-02-23 22:46:18 INFO main - The broker[zhangjianweideMacBook-Pro.local, 192.168.1.113:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
2019-02-23 22:46:28 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-02-23 22:46:28 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 415169 bytes
2019-02-23 22:46:28 INFO brokerOutApi_thread_2 - register broker to name server 127.0.0.1:9876 OK

至此,RocketMQ的服务端已全部启动运行了,下面我们将通过源码中的示例验证,让Producer发送消息,Consumer来消费消息。

运行示例代码

接下来我们就可以运行示例代码,简单验证一下服务端是否启动并开始工作。

设置Name Server 地址到环境变量,Producer和Consumer工作时都会用到此地址

export NAMESRV_ADDR=127.0.0.1:9876

启动客户端,验证发送与接收消息,producer和consumer的启动没有先后顺序。

启动Producer,Producer启动后将发送多条Topic为“TopicTest”的消息,然后自动终止进程

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360003E3, offsetMsgId=C0A8017100002A9F00000000000BCFA1, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=0], queueOffset=998]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360103E4, offsetMsgId=C0A8017100002A9F00000000000BD055, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=1], queueOffset=1001]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360203E5, offsetMsgId=C0A8017100002A9F00000000000BD109, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=2], queueOffset=999]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360303E6, offsetMsgId=C0A8017100002A9F00000000000BD1BD, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=3], queueOffset=999]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360403E7, offsetMsgId=C0A8017100002A9F00000000000BD271, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=0], queueOffset=999]
......

启动Consumer,Consumer启动后将订阅Topic名称为“TopicTest”消息,持续监听,收到消息后进行消费,将消息打印输出

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=983, sysFlag=0, bornTimestamp=1550936154570, bornHost=/192.168.1.113:56251, storeTimestamp=1550936154571, storeHost=/192.168.1.113:10911, msgId=C0A8017100002A9F00000000000BA571, commitLogOffset=763249, bodyCRC=1431313338, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1000, CONSUME_START_TIME=1550936205508, UNIQ_KEY=C0A8017107605E2DE80C765C35CA03A7, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 51, 53], transactionId='null'}]]
ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=995, sysFlag=0, bornTimestamp=1550936154615, bornHost=/192.168.1.113:56251, storeTimestamp=1550936154616, storeHost=/192.168.1.113:10911, msgId=C0A8017100002A9F00000000000BC5C9, commitLogOffset=771529, bodyCRC=835257960, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1000, CONSUME_START_TIME=1550936205510, UNIQ_KEY=C0A8017107605E2DE80C765C35F703D5, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 49], transactionId='null'}]]
ConsumeMessageThread_18 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=994, sysFlag=0, bornTimestamp=1550936154613, bornHost=/192.168.1.113:56251, storeTimestamp=1550936154614, storeHost=/192.168.1.113:10911, msgId=C0A8017100002A9F00000000000BC2F9, commitLogOffset=770809, bodyCRC=1597161362, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1000, CONSUME_START_TIME=1550936205510, UNIQ_KEY=C0A8017107605E2DE80C765C35F503D1, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 55], transactionId='null'}]]
......

以上就是示例代码的演示

关闭服务端:进入安装目录中,依次关闭broker、namesrv,如果忘记关闭,下次启动时会提示端口冲突。

cd distribution/target/apache-rocketmq
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

小结

本文介绍了RocketMQ架构的主要轮廓,讲解如何安装环境、运行示例代码。

相关文章

  • RocketMQ快速上手

    系统要求 64bit 的 Linux、 Unix 或 Mac (Windows 也支持) 64bit JDK 1....

  • 快速上手RocketMQ

    本文介绍如何安装配置单机版的RocketMQ,以及简单地收发消息。读者也可以参考RocketMQ官网的说明文档。 ...

  • RocketMQ快速上手

    RocketMQ 是阿里开源的一款高性能、高吞吐量的消息中间件。 RocketMQ 架构介绍 RocketMQ架构...

  • RocketMQ 实战之快速入门

    最近 RocketMQ 刚刚上生产环境,闲暇之时在这里做一些分享,主要目的是让初学者能快速上手RocketMQ。 ...

  • Masonry介绍与使用实践:快速上手Autolayout

    Masonry介绍与使用实践:快速上手Autolayout Masonry介绍与使用实践:快速上手Autolayout

  • RocketMQ:消息发送与消费

    在此之前,我们已经介绍过《RocketMQ:快速入门》和《RocketMQ:搭建集群》。现在我们已经准备好Rock...

  • 2、RocketMQ基础-RocketMQ快速入门

    RocketMQ快速入门 RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,在阿里内部,Roc...

  • RcoketMQ GUI 可视化管理工具

    优秀的 RocketMQ 可视化管理工具 GUI 客户端 快速查看所有 RocketMQ 集群,包括Brokers...

  • Spring Boot-web开发详解

    之前有一篇文章介绍了如何快速上手Spring Boot:如何快速上手Spring Boot?,方便大家快速入门、了...

  • Mac安装RockMQ

    本文仅以rocketmq4.4.0作为展示样例 rocketmq官方快速开发文档 一、4.4.0环境要求 64bi...

网友评论

      本文标题:RocketMQ快速上手

      本文链接:https://www.haomeiwen.com/subject/ihfqpqtx.html