美文网首页
RocketMQ-负载均衡和广播模式

RocketMQ-负载均衡和广播模式

作者: 别拿爱情当饭吃 | 来源:发表于2021-05-31 22:40 被阅读0次

    RocketMQ的消费者消费消息,有2种模式:

    • 负载均衡模式


      image
    • 广播模式


      image

    首先我们先启动一个生产者:发送了10条消息,主题是TopicTest,tag是TagA

    public class SyncProducer {
        public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
            DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
            //设置生产者组
            defaultMQProducer.setProducerGroup("syncProducer");
    
            //设置nameserver
            defaultMQProducer.setNamesrvAddr("localhost:9876");
    
            //启动生产者
            defaultMQProducer.start();
    
            for (int i = 0; i < 10; i++) {
                //构建消息 topic tag 内容
                Message msg = new Message("TopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " +
                                i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //同步发送,且返回结果
                SendResult sendResult = defaultMQProducer.send(msg);
                System.out.println("发送结果"+sendResult);
            }
            //关闭生产者
            defaultMQProducer.shutdown();
        }
    }
    //运行结果
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2040000, offsetMsgId=C0A81FF100002A9F0000000000045402, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=350]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2230001, offsetMsgId=C0A81FF100002A9F00000000000454CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=350]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2260002, offsetMsgId=C0A81FF100002A9F0000000000045594, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=350]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22A0003, offsetMsgId=C0A81FF100002A9F000000000004565D, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=350]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22C0004, offsetMsgId=C0A81FF100002A9F0000000000045726, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=351]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22E0005, offsetMsgId=C0A81FF100002A9F00000000000457EF, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=351]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2300006, offsetMsgId=C0A81FF100002A9F00000000000458B8, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=351]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2320007, offsetMsgId=C0A81FF100002A9F0000000000045981, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=351]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2340008, offsetMsgId=C0A81FF100002A9F0000000000045A4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=352]
    发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2360009, offsetMsgId=C0A81FF100002A9F0000000000045B13, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=352]
    14:29:56.930 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
    14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
    14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.241:10911] result: true
    

    接下来,我们启动2个消费者,分别用负载均衡模式和广播模式去进行消费信息:

    负载均衡(或叫集群模式)

    public class ClusterConsumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer");
    
            consumer.setNamesrvAddr("localhost:9876");
    
            //设置集群模式,也就是负载均衡模式
            consumer.setMessageModel(MessageModel.CLUSTERING);
            //订阅主题和标签
            consumer.subscribe("TopicTest","TagA");
    
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println("消费信息:"+new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
        }
    }
    //运行2个实例,结果如下:
    实例1:
    消费信息:Hello RocketMQ 3
    消费信息:Hello RocketMQ 2
    消费信息:Hello RocketMQ 7
    消费信息:Hello RocketMQ 6
    
    实例2:
    消费信息:Hello RocketMQ 1
    消费信息:Hello RocketMQ 0
    消费信息:Hello RocketMQ 4
    消费信息:Hello RocketMQ 5
    消费信息:Hello RocketMQ 8
    消费信息:Hello RocketMQ 9
    

    广播模式

    public class BoardConsumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("clusterConsumer");
    
            consumer.setNamesrvAddr("localhost:9876");
    
            //设置集群模式,也就是负载均衡模式
            consumer.setMessageModel(MessageModel.BROADCASTING);
            //订阅主题和标签
            consumer.subscribe("TopicTest","TagA");
    
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println("消费信息:"+new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
        }
    //运行2个实例,结果如下:
    实例1:
    消费信息:Hello RocketMQ 0
    消费信息:Hello RocketMQ 1
    消费信息:Hello RocketMQ 2
    消费信息:Hello RocketMQ 3
    消费信息:Hello RocketMQ 5
    消费信息:Hello RocketMQ 9
    消费信息:Hello RocketMQ 4
    消费信息:Hello RocketMQ 6
    消费信息:Hello RocketMQ 7
    消费信息:Hello RocketMQ 8
    
    
    实例2:
    消费信息:Hello RocketMQ 2
    消费信息:Hello RocketMQ 3
    消费信息:Hello RocketMQ 1
    消费信息:Hello RocketMQ 0
    消费信息:Hello RocketMQ 5
    消费信息:Hello RocketMQ 9
    消费信息:Hello RocketMQ 4
    消费信息:Hello RocketMQ 8
    消费信息:Hello RocketMQ 7
    消费信息:Hello RocketMQ 6
    
    

    今天分享了消费者负载均衡模式和广播模式。
    在生产中,一般都是用负载均衡模式。广播模式比较少用。但还是得具体场景具体分析。

    后续文章

    • RocketMQ-入门(已更新)
    • RocketMQ-消息发送(已更新)
    • RocketMQ-消费信息
    • RocketMQ-消费者的广播模式和集群模式(已更新)
    • RocketMQ-顺序消息
    • RocketMQ-延迟消息
    • RocketMQ-批量消息
    • RocketMQ-过滤消息
    • RocketMQ-事务消息
    • RocketMQ-消息存储
    • RocketMQ-高可用
    • RocketMQ-高性能
    • RocketMQ-主从复制
    • RocketMQ-刷盘机制
    • RocketMQ-幂等性
    • RocketMQ-消息重试
    • RocketMQ-死信队列
      ...

    欢迎各位入(guan)股(zhu),后续文章干货多多。

    相关文章

      网友评论

          本文标题:RocketMQ-负载均衡和广播模式

          本文链接:https://www.haomeiwen.com/subject/chsisltx.html