一、RocketMQ集群搭建
- 集群规划
名称 | broker角色 | IP&port |
---|---|---|
nameSer-1 | 注册服务中心 | UAT容器集群-02:9876 |
nameSer-2 | 注册服务中心 | UAT容器集群-04:9876 |
broker-a-0 | master | UAT容器集群-01:10911 broker-a.properties |
broker-a-1 | slave | UAT容器集群-04:10911 broker-a-s.properties |
broker-b-0 | master | UAT容器集群-03:10911 broker-b.properties |
broker-b-1 | slave | UAT容器集群-02:10911 broker-b-s.properties |
-
下载软件包,并解压到相应目录。
下载地址:https://github.com/apache/rocketmq/archive/rocketmq-all-4.6.0.tar.gz -
修改启动脚本配置
默认broker的JVM内存为8G,nameSer的JVM内存为4G。对于测试环境有点奢侈。所以,将nameSer改为1G,broker改为2G。
[admin@iZbp150aqw850jwlm06qhcZ bin]$ cat runbroker.sh | grep Xms
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g"
[admin@iZbp150aqw850jwlm06qhcZ bin]$ cat runserver.sh | grep Xms
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
- 修改broker配置
将conf目录下 broker-a.properties、broker-a-s.properties、broker-b.properties、broker-b-s.properties 四个文件分别添加如下配置。
[admin@iZbp150aqw850jwlm06qhcZ 2m-2s-async]$ cat broker-a.properties | grep namesrvAddr
namesrvAddr=UAT容器集群-02:9876;UAT容器集群-04:9876
-
将配置复制到其他3台机器
-
启动nameSrv和broker服务
启动nameSrv
# UAT容器集群-02
[admin@iZbp15v1c0o8malwswsdm5Z bin]$ nohup sh mqnamesrv 2>&1 &
# UAT容器集群-04
[admin@iZbp13d7te8727w5phtiskZ bin]$ nohup sh mqnamesrv 2>&1 &
启动broker
# UAT容器集群-01
[admin@iZbp150aqw850jwlm06qhcZ bin]nohup sh mqbroker -c /mnt/rocketmq-all-4.6.0-bin-release/conf/2m-2s-async/broker-a.properties 2>&1 &
# UAT容器集群-03
[admin@iZbp13d7te8727w5phtisjZ bin]$ nohup sh mqbroker -c /mnt/rocketmq-all-4.6.0-bin-release/conf/2m-2s-async/broker-b.properties 2>&1 &
# UAT容器集群-02
[admin@iZbp13d7te8727w5phtiskZ bin]$ nohup sh mqbroker -c /mnt/rocketmq-all-4.6.0-bin-release/conf/2m-2s-async/broker-a-s.properties 2>&1 &
# UAT容器集群-04
[admin@iZbp13d7te8727w5phtiskZ bin]$ nohup sh mqbroker -c /mnt/rocketmq-all-4.6.0-bin-release/conf/2m-2s-async/broker-b-s.properties 2>&1 &
- 可以查看日志检查集群是否运行成功。
tail -f ~/logs/rocketmqlogs/namesrv.log
tail -f ~/logs/rocketmqlogs/broker.log
- 至此,rocketmq集群搭建完成。
二、如何保证消息不丢失
-
保证Producer成功投递。
1.1. 默认情况下,可以通过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息一定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功。
1.2. 采取事务消息的投递方式,并不能保证消息100%投递成功到了Broker,但是如果消息发送Ack失败的话,此消息会存储在CommitLog当中,但是对ConsumerQueue是不可见的。可以在日志中查看到这条异常的消息,严格意义上来讲,也并没有完全丢失(事务消息成本非常高)
1.3. RocketMQ通过queryMessage,来查询消息是否在Broker存储成功 -
保证Broker数据不丢失。
2.1. 消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的
2.2. Broker自身支持同步刷盘、异步刷盘的策略,可以保证接收到的消息一定存储在本地的内存中(同步刷盘的成本非常高)
2.3. Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失(如果是用同步策略,无法自动主从切换) -
保证Consumer消息不丢失。
3.1. Consumer自身维护一个持久化的offset(对应MessageQueue里面的min offset),标记已经成功消费或者已经成功发回到broker的消息下标
3.2. 如果Consumer消费失败,通过ConsumeConcurrentlyStatus.RECONSUME_LATER进行重试
3.3. 如果Consumer消费失败,发回给broker时,broker挂掉了,那么Consumer会定时重试这个操作
3.4. 如果Consumer和Broker一起挂了,消息也不会丢失,因为Consumer 里面的offset是定时持久化的,重启之后,继续拉取offset之前的消息到本地
Mark一下别人的经验:
RocketMQ-架构原理
RocketMQ 教程
RocketMQ性能优化【实战笔记】
为什么选择RocketMQ
网友评论