美文网首页分布式专题
RocketMQ(一) 初识RocketMQ

RocketMQ(一) 初识RocketMQ

作者: TiaNa_na | 来源:发表于2019-10-09 14:54 被阅读0次

    目录
    1 MQ
      1.1 MQ简介
      1.2 MQ的缺点
      1.3 主流消息队列比较
    2 RocketMQ入门
      2.1 环境搭建
      2.2 安装和启动
      2.3 目录介绍
      2.4 控制台安装
    3 RocketMQ集群搭建
      3.1 RocketMQ集群
      3.2 集群搭建方式
       3.2.1 集群特点
       3.2.2 集群模式
      3.3 双主双从集群搭建
       3.3.1总体架构
       3.3.2 集群工作流程
       3.3.3 服务器环境
       3.3.4 添加Host信息
       3.3.5 防火墙配置
       3.3.6 环境变量配置
       3.3.7 创建消息存储路径
       3.3.8 修改JVM大小
       3.3.9 修改broker配置文件
       3.3.10 服务启动
       3.3.11查看进程状态
       3.3.12 查看日志
      3.4 mqadmin管理工具
      3.5 集群监控平台搭建

    1 MQ(Message Quene)

    1.1 MQ简介

    消息队列是一种先进先出的数据结构。


    其应用场景主要包含:

    • 应用解耦
      系统耦合性越高,容错性越低。以用户下单购买业务为例。传统的做法是,订单系统调用库存系统的接口。一旦库存系统出现故障或因为升级等原因暂时不可用,都会造成下单操作异常。



      引入消息队列后, 假如在下单时库存系统不能正常使用,也不影响正常下单。当库存系统恢复正常后,补充处理消息队列中的订单消息即可。实现订单系统与库存系统的应用解耦。


    • 流量削峰
      秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,需要在应用前端加入消息队列。服务器接收用户的请求后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息,再做后续处理。


    • 数据分发



      通过消息队列可以让数据在多个系统间流通,数据的产生方不需要关心谁来使用数据,只要将数据发送到消息队列,数据使用方直接获取数据即可。


    1.2 MQ的缺点
    • 系统可用性降低

      系统引入的外部依赖越多,系统稳定性越差,一旦MQ宕机,就会对业务造成影响。

      如何保证MQ的高可用?

    • 系统复杂度提高

      以前系统是同步的远程调用,现在是通过MQ异步调用。

      如何保证消息没有被重复消费?怎么处理消息丢失情况?如何保证消息传递的顺序性?

    • 一致性问题

      A系统处理完业务,通过MQ给给B、C、D系统发消息,如果B系统、C系统处理成功、D系统处理失败。

      如何保证消息数据处理的一致性?

    1.3 主流消息队列比较

    apache rocketmq官方文档上有关于主流消息队列的比较:主流消息队列比较

    2 RocketMQ入门

    RocketMQ是阿里巴巴在2012年开源的第三代分布式消息中间件,2016年捐赠给Apache。作为一款纯java、分布式、队列模型的开源消息中间件,它支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。

    2.1 环境准备
    2.2 目录介绍
    • bin:启动脚本,包括shell脚本和CMD脚本。

    • conf:实例配置文件,包括broker配置文件、logback配置文件等。

      • -- 2m-noslave 双主
      • -- 2m-2s-sync 双主双从同步复制
      • -- 2m-2s-async 双主双从异步复制
    • lib:依赖jar包,包括Netty、commons-lang、FastJson等。

    2.3 安装和启动

    官网
    需要下载的文件有:


    带source的源码需要自己编译,带bin的二进制文件不需编译可直接使用。
    • 服务器环境
    IP 角色
    192.168.217.130 nameserver、brokerserver
    • 我使用的源码安装方法,进入压缩文件目录解压并编译
    #解压
    unzip rocketmq-all-4.4.0-source-release.zip
    cd cd rocketmq-all-4.4.0/
    # 编译
    mvn -Prelease-all -DskipTests clean install -U
    # 移到编译好的文件到/usr/local/目录下
    cd distribution/target/
    mv apache-rocketmq /usr/local/rocketmq
    
    • 添加hosts信息
    vim /etc/hosts
    

    配置如下:

    192.168.217.130 rocketmqNameserver1
    192.168.217.130 rocketmqMaster1
    

    配置完成后, 重启网卡

    systemctl restart network
    
    • RocketMQ对内存要求比较高,我们需要按实际情况修改默认JVM大小
    cd /usr/local/rocketmq
    # 编辑bin目录下的runbroker.sh和runserver.sh
    vi runbroker.sh 
    vi runserver.sh
    
    # runserver.sh修改前
    JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    # runserver.sh修改后
    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    
    # runbroker.sh修改前
    JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
    # runbroker.sh修改后
    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g"
    
    • 在bin目录下启动NameServer
    # 启动NameServer 
    > nohup sh bin/mqnamesrv &
    # 查看启动日志
    > tail -f ~/logs/rocketmqlogs/namesrv.log
    The Name Server boot success...
    
    • 在bin目录下启动Broker
    # 启动Broker
    >nohup sh bin/mqbroker -n localhost:9876 &
    # 查看启动日志
    >tail -f ~/logs/rocketmqlogs/broker.log 
    The broker boot success...
    # 查看启动进程
    > jps
    1875 Jps
    1782 NamesrvStartup
    1817 BrokerStartup
    
    • 关闭RocketMQ
    # 关闭broker
    > sh bin/mqshutdown broker
    # 关闭nameServer
    > sh bin/mqshutdown namesrv
    
    2.4 控制台安装

    RocketMQ 有一个对其扩展的开源项目incubator-rocketmq-externals,这个项目中有一 个子模块叫 rocketmq‐console ,这个便是管理控制台项目了。
    下载解压后:


    我们可以用idea打开此项目,修改application.properties,配置服务器的ip地址

    启动项目,浏览http://localhost:8080

    3 RocketMQ集群搭建

    3.1 RocketMQ集群

    RocketMQ消息队列的整体部署架构如下图所示:


    通信过程:
    (1) Broker启动后需要完成一次将自己注册至NameServer的操作;随后每隔30s时间定期向NameServer上报Topic路由信息;
    (2) 消息生产者Producer作为客户端发送消息时候,需要根据Msg的Topic从本地缓存的TopicPublishInfoTable获取路由信息。如果没有则更新路由信息会从NameServer上重新拉取;
    (3) 消息生产者Producer根据(2)中获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker作为消息的接收者收消息并落盘存储;
    各角色介绍:
    (1) NameServer:是一个几乎无状态节点,可集群部署,在消息队列 MQ 中提供命名服务,更新和发现 Broker 服务。
    (2) Broker:消息中转角色,负责存储消息,转发消息。分为 Master Broker 和 Slave Broker,一个 Master Broker 可以对应多个 Slave Broker,但是一个 Slave Broker 只能对应一个 Master Broker。Broker 启动后需要完成一次将自己注册至 Name Server 的操作;随后每隔 30s 定期向 Name Server 上报 Topic 路由信息。
    Broker中还存在一些非常重要的名词需要说明:
    ①Topic和Queue
    RocketMQ中Topic只代表普通的消息队列,而Queue是组成Topic的更小单元,集群消费模式下一个消费者只消费该Topic中部分Queue中的消息,当一个消费者开启广播模式时则会消费该Topic下所有Queue中的消息。
    Topic和Queue的关系
    具体可参考https://blog.csdn.net/binzhaomobile/article/details/73332463
    ②Tag
    Topic 和 Tag 的定义如下:
    Topic:消息主题,通过 Topic 对不同的业务消息进行分类。
    Tag:消息标签,用来进一步区分某个 Topic 下的消息分类,消息从生产者发出即带上的属性。

    通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息。
    Topic 与 Tag 最佳实践
    (3) Producer:与 Name Server 集群中的其中一个节点(随机)建立长链接(Keep-alive),定期从 Name Server 读取 Topic 路由信息,并向提供 Topic 服务的 Master Broker 建立长链接,且定时向 Master Broker 发送心跳。
    (4) Consumer:与 Name Server 集群中的其中一个节点(随机)建立长连接,定期从 Name Server 拉取 Topic 路由信息,并向提供 Topic 服务的 Master Broker、Slave Broker 建立长连接,且定时向 Master Broker、Slave Broker 发送心跳。Consumer 既可以从 Master Broker 订阅消息,也可以从 Slave Broker 订阅消息,订阅规则由 Broker 配置决定。
    3.2 集群搭建方式
    3.2.1 集群特点
    • NameServer是一个几乎无状态节点,节点之间不需要信息同步。每新加入一个NameServer,Broker会自动上报信息。
    • Producer完全无状态,不需要数据同步。
    • Consumer不需要数据同步。
    • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个 Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相 同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示 Slave。Master也可以部署多个。
    3.2.2 集群模式

    1)单Master模式
    这种方式风险较大,一旦Broker重启或者宕机时,会导致整个服务不可用。不建议线上 环境使用,可以用于本地测试。
    2)多Master模式
    一个集群无Slave,全是Master,例如2个Master或者3个Master,这种模式的优缺点如 下:

    • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10 时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异 步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
    • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消 息实时性会受到影响。
      3)多Master多Slave模式(异步)
      每个Master配置一个Slave,有多对Master-Slave,HA采用异步复制方式,主备有短暂 消息延迟(毫秒级),这种模式的优缺点如下:
    • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master 宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预, 性能同多Master模式几乎一样;
    • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
      4)多Master多Slave模式(同步)
      每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备 都写成功,才向应用返回成功,这种模式的优缺点如下: * 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与 数据可用性都非常高;
    • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高, 且目前版本在主节点宕机后,备机不能自动切换为主机。
    3.3 双主双从集群搭建
    3.3.1 总体架构

    消息高可用采用2m-2s(同步双写)方式


    3.3.2 集群工作流程
    1. 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、 Consumer连上来,相当于一个路由控制中心。
    2. Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当 前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群 中就有Topic跟Broker的映射关系。
    3. 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上, 也可以在发送消息时自动创建Topic。
    4. Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从 NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一 个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。
    5. Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅 Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
    3.3.3 服务器环境
    IP 角色 架构模式
    192.168.217.130 nameserver、brokerserver Master1
    192.168.217.132 nameserver、brokerserver Master2
    192.168.217.133 nameserver、brokerserver Slave1
    192.168.217.134 nameserver、brokerserver Slave2
    3.3.4 添加Host信息(四台服务器都要配置)
    vim /etc/hosts
    

    配置如下:

    # nameserver
    192.168.217.130 rocketmq‐nameserver1
    192.168.217.132 rocketmq‐nameserver2
    192.168.217.133 rocketmq‐nameserver3
    192.168.217.134 rocketmq‐nameserver4
    # broker
    192.168.217.130 rocketmq‐master1
    192.168.217.132 rocketmq‐master2
    192.168.217.133 rocketmq‐slave1
    192.168.217.134 rocketmq‐slave2
    

    配置完成后, 重启网卡

    systemctl restart network
    
    3.3.5 防火墙配置 (四台服务器都要配置)

    宿主机需要远程访问虚拟机的rocketmq服务和web服务,需要开放相关的端口号,简单 粗暴的方式是直接关闭防火墙

    # 安装 firewall-cmd 和 systemctl 命令
    yum install firewalld systemd -y
    # 关闭防火墙 
    systemctl stop firewalld.service
    # 查看防火墙的状态 
    firewall-cmd --state 
    # 禁止firewall开机启动 
    systemctl disable firewalld.service
    

    或者为了安全,只开放特定的端口号,RocketMQ默认使用3个端口:9876 、10911 、 11011 。如果防火墙没有关闭的话,那么防火墙就必须开放这些端口:

    • nameserver 默认使用 9876 端口
    • master 默认使用 10911 端口
    • slave 默认使用11011 端口
      执行以下命令:
    # 开放name server默认端口 
    firewall-cmd --remove-port=9876/tcp --permanent 
    # 开放master默认端口 
    firewall-cmd --remove-port=10911/tcp --permanent 
    # 开放slave默认端口 (当前集群模式可不开启) 
    firewall-cmd --remove‐port=11011/tcp --permanent 
    # 重启防火墙 
    firewall-cmd --reload
    
    3.3.6 环境变量配置(四台台服务器都要配置)
    vim /etc/profile
    

    在profile文件的末尾加入如下命令

    #set rocketmq 
    ROCKETMQ_HOME=/usr/local/rocketmq
    PATH=$PATH:$ROCKETMQ_HOME/bin 
    export ROCKETMQ_HOME PATH 
    

    输入:wq! 保存并退出, 并使得配置立刻生效:

    source /etc/profile
    
    3.3.7 创建消息存储路径(四台服务器都要配置)
    [root@rocketmq‐nameserver1 local]# cd rocketmq/
    [root@rocketmq‐nameserver1 rocketmq]# mkdir logs
    [root@rocketmq‐nameserver1 rocketmq]# mkdir store
    [root@rocketmq‐nameserver1 rocketmq]# cd store/
    [root@rocketmq‐nameserver1 store]# mkdir commitlog
    [root@rocketmq‐nameserver1 store]# mkdir consumequene
    [root@rocketmq‐nameserver1 store]# mkdir index
    

    进入conf目录,使用命令替换所有xml文件中的${user.home},保证日志路径正确

    sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
    
    3.3.8 修改默认JVM大小
    # 编辑bin目录下的runbroker.sh和runserver.sh
    vi runbroker.sh 
    vi runserver.sh
    
    # runserver.sh修改前
    JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    # runserver.sh修改后
    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    
    # runbroker.sh修改前
    JAVA_OPT="${JAVA_OPT} -server -Xms8g -Xmx8g -Xmn4g"
    # runbroker.sh修改后
    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn1g"
    
    3.3.9 broker配置文件

    1)master1
    服务器192.168.217.130
    此时通过jps命令查看RocketMQ是否正在运行,如果运行中需要关闭它,关闭命令参考2.2

    vi /usr/local/rocketmq/conf/2m‐2s‐sync/broker‐a.properties
    

    修改配置如下:

    #所属集群名字 
    brokerClusterName=rocketmq‐cluster 
    #broker名字,注意此处不同的配置文件填写的不一样 
    brokerName=broker‐a 
    #0 表示 Master,>0 表示 Slave 
    brokerId=0 
    #nameServer地址,分号分割 
    namesrvAddr=rocketmq‐nameserver1:9876;rocketmq‐nameserver2:9876;rocketmq‐nameserver3:9876 
    ;rocketmq‐nameserver4:9876 
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
    defaultTopicQueueNums=4 
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 
    autoCreateTopicEnable=true 
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 
    autoCreateSubscriptionGroup=true 
    #Broker 对外服务的监听端口 
    listenPort=10911 
    #删除文件时间点,默认凌晨 4点 
    deleteWhen=04 
    #文件保留时间,默认 48 小时 
    fileReservedTime=120 
    #commitLog每个文件的大小默认1G 
    mapedFileSizeCommitLog=1073741824 
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整 
    mapedFileSizeConsumeQueue=300000 
    #destroyMapedFileIntervalForcibly=120000 
    #redeleteHangedFileInterval=120000 
    #检测物理文件磁盘空间 
    diskMaxUsedSpaceRatio=88 
    #存储路径 
    storePathRootDir=/usr/local/rocketmq/store 
    #commitLog 存储路径 
    storePathCommitLog=/usr/local/rocketmq/store/commitlog 
    #消费队列存储路径存储路径 
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue 
    #消息索引存储路径 
    storePathIndex=/usr/local/rocketmq/store/index 
    #checkpoint 文件存储路径 
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint 
    #abort 文件存储路径 
    abortFile=/usr/local/rocketmq/store/abort
    #限制的消息大小 
    maxMessageSize=65536 
    #flushCommitLogLeastPages=4 
    #flushConsumeQueueLeastPages=2 
    #flushCommitLogThoroughInterval=10000 
    #flushConsumeQueueThoroughInterval=60000 
    #Broker 的角色 
    #‐ ASYNC_MASTER 异步复制Master 
    #‐ SYNC_MASTER 同步双写Master 
    #‐ SLAVE 
    brokerRole=SYNC_MASTER 
    #刷盘方式 
    #‐ ASYNC_FLUSH 异步刷盘 
    #‐ SYNC_FLUSH 同步刷盘 
    flushDiskType=ASYNC_FLUSH 
    #checkTransactionMessageEnable=false 
    #发消息线程池数量 
    #sendMessageThreadPoolNums=128 
    #拉消息线程池数量 
    #pullMessageThreadPoolNums=128
    

    2)master2
    服务器192.168.217.132

    vi /usr/local/rocketmq/conf/2m‐2s‐sync/broker‐b.properties
    

    修改配置如下:

    #所属集群名字 
    brokerClusterName=rocketmq‐cluster 
    #broker名字,注意此处不同的配置文件填写的不一样 
    brokerName=broker‐b 
    #0 表示 Master,>0 表示 Slave 
    brokerId=0 #nameServer地址,分号分割 
    namesrvAddr=rocketmq‐nameserver1:9876;rocketmq‐nameserver2:9876;rocketmq‐nameserver3:9876 
    ;rocketmq‐nameserver4:9876 
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
    defaultTopicQueueNums=4 
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 
    autoCreateTopicEnable=true 
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 
    autoCreateSubscriptionGroup=true 
    #Broker 对外服务的监听端口
    listenPort=10911 
    #删除文件时间点,默认凌晨 4点 
    deleteWhen=04 
    #文件保留时间,默认 48 小时 
    fileReservedTime=120 
    #commitLog每个文件的大小默认1G 
    mapedFileSizeCommitLog=1073741824 
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整 
    mapedFileSizeConsumeQueue=300000 
    #destroyMapedFileIntervalForcibly=120000 
    #redeleteHangedFileInterval=120000 
    #检测物理文件磁盘空间 
    diskMaxUsedSpaceRatio=88 
    #存储路径 
    storePathRootDir=/usr/local/rocketmq/store 
    #commitLog 存储路径 
    storePathCommitLog=/usr/local/rocketmq/store/commitlog 
    #消费队列存储路径存储路径 
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue 
    #消息索引存储路径 
    storePathIndex=/usr/local/rocketmq/store/index 
    #checkpoint 文件存储路径 
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint 
    #abort 文件存储路径 
    abortFile=/usr/local/rocketmq/store/abort
    #限制的消息大小 
    maxMessageSize=65536 
    #flushCommitLogLeastPages=4 
    #flushConsumeQueueLeastPages=2 
    #flushCommitLogThoroughInterval=10000 
    #flushConsumeQueueThoroughInterval=60000 
    #Broker 的角色 
    #‐ ASYNC_MASTER 异步复制Master 
    #‐ SYNC_MASTER 同步双写Master 
    #‐ SLAVE 
    brokerRole=SYNC_MASTER 
    #刷盘方式 
    #‐ ASYNC_FLUSH 异步刷盘 
    #‐ SYNC_FLUSH 同步刷盘 
    flushDiskType=ASYNC_FLUSH 
    #checkTransactionMessageEnable=false 
    #发消息线程池数量 
    #sendMessageThreadPoolNums=128 
    #拉消息线程池数量 
    #pullMessageThreadPoolNums=128
    

    3)slave1
    服务器192.168.217.133

    vi /usr/local/rocketmq/conf/2m‐2s‐sync/broker‐a‐s.properties
    

    修改配置如下:

    #所属集群名字 
    brokerClusterName=rocketmq‐cluster 
    #broker名字,注意此处不同的配置文件填写的不一样 
    brokerName=broker‐a 
    #0 表示 Master,>0 表示 Slave 
    brokerId=1 
    #nameServer地址,分号分割 
    namesrvAddr=rocketmq‐nameserver1:9876;rocketmq‐nameserver2:9876;rocketmq‐nameserver3:9876 
    ;rocketmq‐nameserver4:9876 
    #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
    defaultTopicQueueNums=4 
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 
    autoCreateTopicEnable=true 
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 
    autoCreateSubscriptionGroup=true 
    #Broker 对外服务的监听端口 
    listenPort=11011 
    #删除文件时间点,默认凌晨 4点 
    deleteWhen=04 
    #文件保留时间,默认 48 小时 
    fileReservedTime=120 
    #commitLog每个文件的大小默认1G 
    mapedFileSizeCommitLog=1073741824 
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整 
    mapedFileSizeConsumeQueue=300000 
    #destroyMapedFileIntervalForcibly=120000 
    #redeleteHangedFileInterval=120000 
    #检测物理文件磁盘空间 
    diskMaxUsedSpaceRatio=88 
    #存储路径 
    storePathRootDir=/usr/local/rocketmq/store
    #commitLog 存储路径 
    storePathCommitLog=/usr/local/rocketmq/store/commitlog 
    #消费队列存储路径存储路径 
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue 
    #消息索引存储路径 
    storePathIndex=/usr/local/rocketmq/store/index 
    #checkpoint 文件存储路径 
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint 
    #abort 文件存储路径 
    abortFile=/usr/local/rocketmq/store/abort
    #限制的消息大小 
    maxMessageSize=65536 
    #flushCommitLogLeastPages=4 
    #flushConsumeQueueLeastPages=2 
    #flushCommitLogThoroughInterval=10000 
    #flushConsumeQueueThoroughInterval=60000 
    #Broker 的角色 
    #‐ ASYNC_MASTER 异步复制Master 
    #‐ SYNC_MASTER 同步双写Master 
    #‐ SLAVE 
    brokerRole=SLAVE #刷盘方式 
    #‐ ASYNC_FLUSH 异步刷盘 
    #‐ SYNC_FLUSH 同步刷盘 
    flushDiskType=ASYNC_FLUSH 
    #checkTransactionMessageEnable=false 
    #发消息线程池数量 
    #sendMessageThreadPoolNums=128 
    #拉消息线程池数量 
    #pullMessageThreadPoolNums=128
    

    4)slave2
    服务器192.168.217.134

    vi /usr/local/rocketmq/conf/2m‐2s‐sync/broker‐b‐s.properties
    

    修改配置如下:

    #所属集群名字 
    brokerClusterName=rocketmq‐cluster 
    #broker名字,注意此处不同的配置文件填写的不一样 
    brokerName=broker‐b 
    #0 表示 Master,>0 表示 Slave 
    brokerId=1 
    #nameServer地址,分号分割 
    namesrvAddr=rocketmq‐nameserver1:9876;rocketmq‐nameserver2:9876;rocketmq‐nameserver3:9876 
    ;rocketmq‐nameserver4:9876 #在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
    defaultTopicQueueNums=4 
    #是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 
    autoCreateTopicEnable=true 
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 
    autoCreateSubscriptionGroup=true 
    #Broker 对外服务的监听端口 
    listenPort=11011 
    #删除文件时间点,默认凌晨 4点 
    deleteWhen=04 
    #文件保留时间,默认 48 小时 
    fileReservedTime=120 
    #commitLog每个文件的大小默认1G 
    mapedFileSizeCommitLog=1073741824 
    #ConsumeQueue每个文件默认存30W条,根据业务情况调整 
    mapedFileSizeConsumeQueue=300000 
    #destroyMapedFileIntervalForcibly=120000 
    #redeleteHangedFileInterval=120000 
    #检测物理文件磁盘空间 
    diskMaxUsedSpaceRatio=88 
    #存储路径 
    storePathRootDir=/usr/local/rocketmq/store
    #commitLog 存储路径 
    storePathCommitLog=/usr/local/rocketmq/store/commitlog 
    #消费队列存储路径存储路径 
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue 
    #消息索引存储路径 
    storePathIndex=/usr/local/rocketmq/store/index 
    #checkpoint 文件存储路径 
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint 
    #abort 文件存储路径 
    abortFile=/usr/local/rocketmq/store/abort
    #限制的消息大小 
    maxMessageSize=65536 
    #flushCommitLogLeastPages=4 
    #flushConsumeQueueLeastPages=2 
    #flushCommitLogThoroughInterval=10000 
    #flushConsumeQueueThoroughInterval=60000 
    #Broker 的角色 
    #‐ ASYNC_MASTER 异步复制Master 
    #‐ SYNC_MASTER 同步双写Master 
    #‐ SLAVE 
    brokerRole=SLAVE 
    #刷盘方式 
    #‐ ASYNC_FLUSH 异步刷盘 
    #‐ SYNC_FLUSH 同步刷盘 
    flushDiskType=ASYNC_FLUSH 
    #checkTransactionMessageEnable=false 
    #发消息线程池数量 
    #sendMessageThreadPoolNums=128 
    #拉消息线程池数量 
    #pullMessageThreadPoolNums=128
    
    3.3.10 服务启动

    1)启动NameServe集群
    分别在四台服务器启动NameServer

    cd /usr/local/rocketmq/bin 
    nohup sh mqnamesrv &
    

    2)启动Broker集群

    • 在192.168.217.130上启动master1
    cd /usr/local/rocketmq/bin 
    nohup sh mqbroker ‐c /usr/local/rocketmq/conf/2m-2s-sync/broker-a.properties > /dev/null 2>&1 &
    
    • 在192.168.217.132上启动master2
    cd /usr/local/rocketmq/bin 
    nohup sh mqbroker ‐c /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties > /dev/null 2>&1 &
    
    • 在192.168.217.133启动slave1
    cd /usr/local/rocketmq/bin 
    nohup sh mqbroker ‐c /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties > /dev/null 2>&1 &
    
    • 在192.168.217.134上启动slave2
    cd /usr/local/rocketmq/bin 
    nohup sh mqbroker ‐c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties > /dev/null 2>&1 &
    
    3.3.11 查看进程状态
    [root@localhost bin]# jps
    3107 NamesrvStartup
    3131 BrokerStartup
    3231 Jps
    
    3.3.12 查看日志
    # 查看nameServer日志 
    tail ‐500f rocketmq/logs/rocketmqlogs/namesrv.log 
    # 查看broker日志 
    tail ‐500f rocketmq/logs/rocketmqlogs/broker.log
    
    3.4 mqadmin管理工具

    进入RocketMQ安装位置,在bin目录下执行
    ./mqadmin {command} {args}
    详细命令可参考https://www.iteye.com/blog/jameswxx-2091971
    注意事项

    • 几乎所有命令都需要配置-n表示NameServer地址,格式为ip:port
    • 几乎所有命令都可以通过-h获取帮助
    • 如果既有Broker地址(-b)配置项又有clusterName(-c)配置项,则优先以Broker 地址执行命令;如果不配置Broker地址,则对集群中所有主机执行命令
    3.5 集群监控平台搭建

    除了通过命令行的方式可以对集群进行管理,我们还可以使用控制台对集群进行管理。我们在部署单节点集群时已经说明了它的使用方法,只需要修改项目的namesrvAddr即可。

    rocketmq.config.namesrvAddr=192.168.217.130:9876;192.168.217.132:9876;192.168.217.133:9876;192.168.217.134:9876
    

    相关文章

      网友评论

        本文标题:RocketMQ(一) 初识RocketMQ

        本文链接:https://www.haomeiwen.com/subject/yxsjuctx.html