美文网首页
RocketMQ-基础使用(一)

RocketMQ-基础使用(一)

作者: 石头耳东 | 来源:发表于2022-05-28 10:23 被阅读0次

    零、本文纲要

    一、RocketMQ基础

    1. MQ特点
    2. RocketMQ安装
    3. 测试RocketMQ

    二、RocketMQ集群搭建

    1. 角色
    2. 集群特点
    3. 集群模式
    4. 双Master双Slave模式-同步双写

    三、mqadmin管理工具

    四、RocketMQ监控平台

    一、RocketMQ基础

    1. MQ特点

    优点: 应用解耦 / 流量削峰 / 数据分发
    缺点: 可用性降低 / 复杂度提高 / 一致性问题

    特性 ActiveMQ RabbitMQ RocketMQ Kafka
    开发语言 Java Erlang Java Scala
    单机吞吐量 万级 万级 10万级 10万级
    时效性 ms us ms ms以内
    可用性 高(主从) 高(主从) 非常高(分布式架构) 非常高(分布式架构)
    功能特性 成熟的产品,有较多的文档,各种协议支持较好。 基于Erlang开发,并发能力很强,性能极好,延时性低,管理界面友好。 MQ功能比较完善,扩展性佳。 只支持主要的MQ功能,像一些消息查询、消息回溯等功能没有,为了大数据准备的。

    2. RocketMQ安装

    RocketMQ快速入门官方文档

    • ① 下载安装包/解压

    官方下载网址,版本为4.9.3
    解压到指定目录下:unzip rocketmq-all-4.9.3-bin-release.zip -d /usr/local

    • ② 启动 NameServer

    后台启动:nohup sh bin/mqnamesrv &
    查看启动日志:tail -f ~/logs/rocketmqlogs/namesrv.log

    注意:如果遇到nohup: 忽略输入并把输出追加到"nohup.out"表示默认输出内容到当前目录下的nohup.out文件中【并不是报错】。

    也可以自定义,将输出日志到我们指定的文件,如下:
    先创建目录:mkdir -p /root/logs/rocketmqlogs/
    再创建文件:touch /root/logs/rocketmqlogs/namesrv.log
    最后再使用nohup sh bin/mqnamesrv > ~/logs/rocketmqlogs/namesrv.log 2>&1 &命令。
    (此处是RocketMQ启动运行即可生成的日志文件,仅作示范用。)

    补充 Linux 命令内容
    nohup Command [ Arg … ] [ & ]
    Command:要执行的命令;
    Arg:一些参数,可以指定输出文件;
    &:让命令在后台执行,终端退出后命令仍旧执行。
    解读我们的命令:nohup sh bin/mqnamesrv > ~/logs/rocketmqlogs/namesrv.log 2>&1 &
    第一处>表示标准输出重定向,此处省略了前面的 1;
    ~/logs/rocketmqlogs/namesrv.log表示重定向的目标文件;
    2>&1表示将标准错误 2 重定向到标准输出 &1;
    最后标准输出 &1 再被重定向输入到 namesrv.log 文件中。

    • ③ 启动 Broker

    后台启动:nohup sh bin/mqbroker -n localhost:9876 &
    查看启动日志:tail -f ~/logs/rocketmqlogs/broker.log

    注意:

    runbroker.sh默认JVM环境配置内存较大,此处可以按需做调整,默认配置如下:

    # 修改前
    JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
    
    # 修改后
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
    

    runserver.sh默认JVM环境配置内存也较大,此处可以按需做调整,默认配置如下:

    # 修改前
    JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    
    # 修改后
    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    
    • ④ 关闭RocketMQ

    关闭 NameServer:sh bin/mqshutdown namesrv
    关闭 Broker:sh bin/mqshutdown broker

    3. 测试RocketMQ

    窗口一:

    export NAMESRV_ADDR=localhost:9876
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
    

    窗口二:

    export NAMESRV_ADDR=localhost:9876
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
    

    关闭:

    sh bin/mqshutdown broker
    sh bin/mqshutdown namesrv
    

    二、RocketMQ集群搭建

    1. 角色

    Producer(消息发送者) / Consumer(消息接收者)
    Broker(暂存和传输消息) / NameServer(管理Broker) / Topic(区分消息的种类) / Message Queue(Topic的分区,存储消息的物理地址)

    2. 集群特点

    • ① NameServer

    无状态节点,直接启动多个即为集群,节点间无需信息同步;

    • ② Broker

    分主从,一主多从,不可一从多主;
    主从关系指定使用相同的BrokerName,不同BrokerId,0表示主,非0为从;
    每个Broker节点与NameServer集群所有节点建立长连接,定时注册Topic到所有NameServer;

    • ③ Producer

    无状态,直接启动多个即为集群,节点间无需信息同步;
    与NameServer集群中的一个节点(随机)建立长连接,定期从NameServer取Topic路由信息;
    向提供Topic服务的Master建立长连接,定时发送心跳;

    • ④ Consumer

    与NameServer集群中的一个节点(随机)建立长连接,定期从NameServer取Topic路由信息;
    向提供Topic服务的Master、Slave建立长连接,定时发送心跳;
    既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定;

    3. 集群模式

    单Master模式 / 多Master模式 / 多Master多Slave模式(同步/异步)

    4. 双Master双Slave模式-同步双写

    集群搭建官方文档

    • ① 环境准备
    序号 IP-Addr 角色 架构模式
    1 192.168.253.128 nameserver、brokerserver Master1、Slave2
    2 192.168.253.130 nameserver、brokerserver Master2、Slave1

    主机1:broker-a的master、broker-b的slave
    主机2:broker-b的master、broker-a的slave

    • ② 配置Host

    命令:vim /etc/hosts

    # nameserver
    192.168.253.128 rocketmq-nameserver1
    192.168.253.130 rocketmq-nameserver2
    # broker
    192.168.253.128 rocketmq-master1
    192.168.253.128 rocketmq-slave2
    192.168.253.130 rocketmq-master2
    192.168.253.130 rocketmq-slave1
    

    重启网卡:systemctl restart network

    • ③ 开放防火墙
    # 开放name server默认端口
    firewall-cmd --remove-port=9876/tcp --permanent
    # 开放master默认端口
    firewall-cmd --remove-port=10911/tcp --permanent
    # 开放slave默认端口 (当前集群模式可不开启)
    firewall-cmd --remove-port=11011/tcp --permanent 
    # 重启防火墙
    firewall-cmd --reload
    
    • ④ 配置环境变量

    命令:vim /etc/profile

    #set rocketmq
    ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq-all-4.4.0-bin-release
    PATH=$PATH:$ROCKETMQ_HOME/bin
    export ROCKETMQ_HOME PATH
    

    重新加载使之生效:source /etc/profile

    • ⑤ 自定义消息存储路径
    mkdir -p /usr/local/rocketmq-all-4.4.0-bin-release/store
    mkdir -p /usr/local/rocketmq-all-4.4.0-bin-release/store/commitlog
    mkdir -p /usr/local/rocketmq-all-4.4.0-bin-release/store/consumequeue
    mkdir -p /usr/local/rocketmq-all-4.4.0-bin-release/store/index
    
    • ⑥ 编写Broker配置文件

    通过查看conf/目录不难看出官方默认提供了各种类型的基础配置文件,可以直接使用。如下:

    ├── conf
        ├── 2m-2s-async     #双主双从(异步)
        ├── 2m-2s-sync  #双主双从(同步)
        ├── 2m-noslave  #双主
    

    Ⅰ master的配置文件

    注意修改brokerName、listenPort
    主机1:broker-a的master、broker-b的slave
    主机2:broker-b的master、broker-a的slave

    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId=0
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq-all-4.4.0-bin-release/store
    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq-all-4.4.0-bin-release/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq-all-4.4.0-bin-release/store/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq-all-4.4.0-bin-release/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq-all-4.4.0-bin-release/store/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq-all-4.4.0-bin-release/store/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SYNC_MASTER
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=SYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    

    Ⅱ broker的配置文件

    注意修改brokerName、listenPort
    主机1:broker-a的master、broker-b的slave
    主机2:broker-b的master、broker-a的slave

    #所属集群名字
    brokerClusterName=rocketmq-cluster
    #broker名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-b
    #0 表示 Master,>0 表示 Slave
    brokerId=1
    #nameServer地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=11011
    #删除文件时间点,默认凌晨 4点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq-all-4.4.0-bin-release/store
    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq-all-4.4.0-bin-release/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq-all-4.4.0-bin-release/store/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq-all-4.4.0-bin-release/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq-all-4.4.0-bin-release/store/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq-all-4.4.0-bin-release/store/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制Master
    #- SYNC_MASTER 同步双写Master
    #- SLAVE
    brokerRole=SLAVE
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    
    • ⑦ 启动集群服务

    Ⅰ 启动NameServer集群

    分别启动两个主机的NameServer服务,如下:

    cd /usr/local/rocketmq-all-4.4.0-bin-release/bin
    nohup sh mqnamesrv &
    

    Ⅱ 启动Broker集群

    分别启动两个主机的BrokerServer服务,如下:

    cd /usr/local/rocketmq-all-4.4.0-bin-release/bin
    nohup sh mqbroker -c /usr/local/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a.properties &
    
    cd /usr/local/rocketmq-all-4.4.0-bin-release/bin
    nohup sh mqbroker -c /usr/local/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b-s.properties &
    
    cd /usr/local/rocketmq-all-4.4.0-bin-release/bin
    nohup sh mqbroker -c /usr/local/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-b.properties &
    
    cd /usr/local/rocketmq-all-4.4.0-bin-release/bin
    nohup sh mqbroker -c /usr/local/rocketmq-all-4.4.0-bin-release/conf/2m-2s-sync/broker-a-s.properties &
    

    三、mqadmin管理工具

    执行命令方法:./mqadmin {command} {args}

    具体的内容可见官方文档
    由于日常比较少使用,可以使用的时候再查询文档。另外官方也提供了控制台工具,后文会介绍。

    四、RocketMQ监控平台

    官方拓展开源项目网址,在对应网址内下载rocketmq-console/rocketmq-dashboard。

    rocketmq-dashboard开源项目网址,下载开源项目代码,如下:

    下载开源项目代码.png

    然后使用mvn工具打包如下:

    mvn clean package -Dmaven.test.skip=true
    
    使用指令打包监控工具.png

    然后上传至对应主机运行,如下:

    传递jar包.png

    运行jar包,如下:

    java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar
    

    访问RocketMQ-Dashboard,默认为http://192.168.253.128:8080

    Dashboard监控平台页面.png

    五、结尾

    以上即为RocketMQ-基础使用(一)的基础内容,感谢阅读。

    相关文章

      网友评论

          本文标题:RocketMQ-基础使用(一)

          本文链接:https://www.haomeiwen.com/subject/bhuvprtx.html