美文网首页
RocketMQ主从集群模式搭建

RocketMQ主从集群模式搭建

作者: 端碗吹水 | 来源:发表于2020-11-30 20:36 被阅读0次
    • 主从模式环境可以保障消息的即时性与可靠性
    • 投递一条消息后,关闭主节点
    • 从节点继续可以提供消费者数据进行消费,但是不能接收消息
    • 主节点重新上线后会自动进行消费进度offset的同步

    准备两台机器,一主一从:

    机器IP hostname 角色
    192.168.243.169 rocketmq01 master
    192.168.243.170 rocketmq02 slave

    我这里事先在两台机器上安装好了RocketMQ,关于RocketMQ的安装可以参考如下文章:

    接下来,我们开始搭建RocketMQ主从集群。首先,配置两台机器的hosts

    $ vim /etc/hosts
    192.168.243.169 rocketmq-nameserver1 rocketmq-master1
    192.168.243.170 rocketmq-nameserver2 rocketmq-slave1
    

    修改master节点的配置文件:

    [root@rocketmq01 /usr/local/rocketmq-4.7.1]# echo "" > conf/2m-2s-async/broker-a.properties
    [root@rocketmq01 /usr/local/rocketmq-4.7.1]# vim conf/2m-2s-async/broker-a.properties
    #节点所属的集群名称
    brokerClusterName=rocketmq-cluster
    #broker 名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId=0
    #nameServer 地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4 点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog 每个文件的大小默认 1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq-4.7.1/store
    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq-4.7.1/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq-4.7.1/store/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq-4.7.1/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq-4.7.1/store/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq-4.7.1/store/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制 Master
    #- SYNC_MASTER 同步双写 Master
    #- SLAVE
    brokerRole=ASYNC_MASTER
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    

    修改slave节点的配置文件:

    [root@rocketmq01 /usr/local/rocketmq-4.7.1]# echo "" > conf/2m-2s-async/broker-a-s.properties
    [root@rocketmq01 /usr/local/rocketmq-4.7.1]# vim conf/2m-2s-async/broker-a-s.properties
    #节点所属的集群名称
    brokerClusterName=rocketmq-cluster
    #broker 名字,注意此处不同的配置文件填写的不一样
    brokerName=broker-a
    #0 表示 Master,>0 表示 Slave
    brokerId=1
    #nameServer 地址,分号分割
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    #在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数
    defaultTopicQueueNums=4
    #是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭
    autoCreateTopicEnable=true
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    #Broker 对外服务的监听端口
    listenPort=10911
    #删除文件时间点,默认凌晨 4 点
    deleteWhen=04
    #文件保留时间,默认 48 小时
    fileReservedTime=120
    #commitLog 每个文件的大小默认 1G
    mapedFileSizeCommitLog=1073741824
    #ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    #destroyMapedFileIntervalForcibly=120000
    #redeleteHangedFileInterval=120000
    #检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    #存储路径
    storePathRootDir=/usr/local/rocketmq-4.7.1/store
    #commitLog 存储路径
    storePathCommitLog=/usr/local/rocketmq-4.7.1/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/usr/local/rocketmq-4.7.1/store/consumequeue
    #消息索引存储路径
    storePathIndex=/usr/local/rocketmq-4.7.1/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/usr/local/rocketmq-4.7.1/store/checkpoint
    #abort 文件存储路径
    abortFile=/usr/local/rocketmq-4.7.1/store/abort
    #限制的消息大小
    maxMessageSize=65536
    #flushCommitLogLeastPages=4
    #flushConsumeQueueLeastPages=2
    #flushCommitLogThoroughInterval=10000
    #flushConsumeQueueThoroughInterval=60000
    #Broker 的角色
    #- ASYNC_MASTER 异步复制 Master
    #- SYNC_MASTER 同步双写 Master
    #- SLAVE
    brokerRole=SLAVE
    #刷盘方式
    #- ASYNC_FLUSH 异步刷盘
    #- SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    #checkTransactionMessageEnable=false
    #发消息线程池数量
    #sendMessageThreadPoolNums=128
    #拉消息线程池数量
    #pullMessageThreadPoolNums=128
    

    然后将这两个配置文件拷贝到Slave节点上:

    [root@rocketmq01 /usr/local/rocketmq-4.7.1]# scp conf/2m-2s-async/broker-a.properties rocketmq-slave1:/usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties
    [root@rocketmq01 /usr/local/rocketmq-4.7.1]# scp conf/2m-2s-async/broker-a-s.properties rocketmq-slave1:/usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a-s.properties
    

    完成配置后,就可以启动RocketMQ了,在master节点上执行如下命令:

    [root@rocketmq01 ~]# nohup sh mqnamesrv &
    [root@rocketmq01 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
    

    在slave节点上执行如下命令:

    [root@rocketmq02 ~]# nohup sh mqnamesrv &
    [root@rocketmq02 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &
    

    启动完成后,分别在两个节点上检查下服务的进程和端口是否正常:

    [root@rocketmq01 ~]# jps
    1942 Jps
    1739 NamesrvStartup
    1775 BrokerStartup
    [root@rocketmq01 ~]# netstat -lntp |grep java
    tcp6       0      0 :::10909                :::*              LISTEN      1775/java           
    tcp6       0      0 :::10911                :::*              LISTEN      1775/java           
    tcp6       0      0 :::10912                :::*              LISTEN      1775/java           
    tcp6       0      0 :::9876                 :::*              LISTEN      1739/java           
    [root@rocketmq01 ~]# 
    

    修改RocketMQ的管控台配置,并启动:

    [root@rocketmq01 ~]# cd /usr/local/src/rocketmq-externals/rocketmq-console/
    [root@rocketmq01 /usr/local/src/rocketmq-externals/rocketmq-console]# vim src/main/resources/application.properties 
    # 增加nameserver的地址
    rocketmq.config.namesrvAddr=192.168.243.169:9876;192.168.243.170:9876
    [root@rocketmq01 /usr/local/src/rocketmq-externals/rocketmq-console]# java -jar target/rocketmq-console-ng-2.0.0.jar
    

    此时在管控台中可以看到有两个节点了:


    image.png

    在Dashboard中也可以看到有两个Broker:


    image.png

    主从集群模式下的高可用机制故障演练

    创建一个普通的Maven项目,pom文件添加rocketmq-client依赖如下:

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.7.1</version>
    </dependency>
    

    生产者代码示例:

    package com.zj.rocketmq.learn.quickstart;
    
    import com.zj.rocketmq.learn.constant.Constants;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    
    import java.util.UUID;
    
    /**
     * rocketmq - 生产者
     *
     * @author 01
     * @date 2020-11-30
     **/
    public class Producer {
    
        public static void main(String[] args) throws Exception {
            // 在rocketmq中生产者必须在一个生产者组内
            String producerGroup = "quickstart_producer_group";
            DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
            // 设置nameserver的地址
            producer.setNamesrvAddr(Constants.NAME_SERVER_ADDRESSES);
            // 启动生产者
            producer.start();
    
            // 消息投递的目标主题
            String topic = "quickstart_topic";
            // 给消息打一个标签,标签的主要作用是用来过滤的
            String tag = "quickstart_tag";
            // 给消息设置一个key,是消息的唯一标识
            String key = UUID.randomUUID().toString();
            // 消息体,即具体的消息内容
            String body = "this is quickstart message!";
            Message message = new Message(topic, tag, key, body.getBytes());
            // 发送消息
            SendResult sendResult = producer.send(message);
            System.out.println("消息发送结果:" + sendResult);
            producer.shutdown();
        }
    }
    

    常量类代码:

    public class Constants {
    
        public static final String NAME_SERVER_ADDRESSES = "192.168.243.169:9876;192.168.243.170:9876";
    }
    

    运行生产者代码发送一条消息,控制台输出如下:

    消息发送结果:SendResult [sendStatus=SEND_OK, msgId=C0A8010B36502437C6DC998FAEE00000, offsetMsgId=C0A8F3A900002A9F0000000000033234, messageQueue=MessageQueue [topic=quickstart_topic, brokerName=broker-a, queueId=1], queueOffset=0]
    

    此时将主节点给停掉,模拟宕机:

    [root@rocketmq01 ~]# mqshutdown broker
    

    然后编写消费者端,代码如下:

    package com.zj.rocketmq.learn.quickstart;
    
    import com.zj.rocketmq.learn.constant.Constants;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    /**
     * rocketmq - 消费者
     *
     * @author 01
     * @date 2020-11-30
     **/
    public class Consumer {
    
        public static void main(String[] args) throws Exception {
            // 定义消费者组
            String consumerGroup = "quickstart_consumer_group";
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
            // 设置nameserver的地址
            consumer.setNamesrvAddr(Constants.NAME_SERVER_ADDRESS);
            // 设置从哪个位置开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    
            // 从哪个主题消费数据
            String topic = "quickstart_topic";
            // 用于匹配消息标签的表达式
            String subExpression = "*";
            // 订阅主题
            consumer.subscribe(topic, subExpression);
    
            // 注册消息监听器,在监听器中实现消息的处理逻辑
            consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
                System.out.println("------------- 接收到消息,开始进行业务处理 -------------");
                for (MessageExt msg : msgs) {
                    try {
                        System.out.printf("topic: %s, tags: %s, keys: %s, body: %s%n",
                                msg.getTopic(), msg.getTags(), msg.getKeys(), new String(msg.getBody()));
    
                        if ("0".equals(msg.getKeys())) {
                            throw new RuntimeException("模拟业务处理发生异常");
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        int reconsumeTimes = msg.getReconsumeTimes();
                        System.err.println("reconsumeTimes: " + reconsumeTimes);
                        if (reconsumeTimes == 3) {
                            // TODO 重试次数达到阈值,放弃重试,记录日志后续做补偿...
                            System.out.println("重试次数达到阈值,放弃重试!");
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
    
                        // 消息处理失败时返回,由于Broker的重试机制,会重新消费该消息
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
    
                // 消息处理成功时返回
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            });
    
            // 启动消费者
            consumer.start();
            System.out.println("consumer started...");
        }
    }
    

    运行消费者,正常情况下,该消费者依旧能够消费到数据:


    image.png

    重新启动master节点,让其重新加入集群:

    [root@rocketmq01 ~]# nohup sh mqbroker -c /usr/local/rocketmq-4.7.1/conf/2m-2s-async/broker-a.properties >/dev/null 2>&1 &
    

    在此过程注意查看消费者的控制台,正常情况下,master重新加入集群,消费者也不会重复消费,因为master会和slave同步offset进度。

    相关文章

      网友评论

          本文标题:RocketMQ主从集群模式搭建

          本文链接:https://www.haomeiwen.com/subject/rmudwktx.html