美文网首页MQ
RocketMQ 与 Spring Boot整合(五、广播消费)

RocketMQ 与 Spring Boot整合(五、广播消费)

作者: 梅西爱骑车 | 来源:发表于2020-08-18 07:26 被阅读0次

在上述的示例中,我们看到的都是使用集群消费。而在一些场景下,我们需要使用广播消费

广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 RocketMQ 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。

又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 RocketMQ 广播消费,每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。

下面,我们开始本小节的示例。

5.1 Demo05Message

package com.ebadagang.springboot.rocketmq.message;

/**
 * 示例 05 的 Message 消息
 */
public class Demo05Message {

    public static final String TOPIC = "DEMO_05";

    /**
     * 编号
     */
    private Integer id;

    public Demo05Message setId(Integer id) {
        this.id = id;
        return this;
    }

    public Integer getId() {
        return id;
    }

    @Override
    public String toString() {
        return "Demo05Message{" +
                "id=" + id +
                '}';
    }

}

5.2 Demo05Producer

创建 [Demo04Producer]类,它会使用 RocketMQ-Spring 封装提供的 RocketMQTemplate ,实现同步发送消息。代码如下:

package com.ebadagang.springboot.rocketmq.producer;

import com.ebadagang.springboot.rocketmq.message.Demo05Message;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Demo05Producer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    public SendResult syncSend(Integer id) {
        // 创建 Demo05Message 消息
        Demo05Message message = new Demo05Message();
        message.setId(id);
        // 同步发送消息
        return rocketMQTemplate.syncSend(Demo05Message.TOPIC, message);
    }

}

5.3 Demo05Consumer

创建 [Demo05Consumer]类,实现 Rocket-Spring 定义的 RocketMQListener 接口,消费消息。代码如下:

package com.ebadagang.springboot.rocketmq.consumer;

import com.ebadagang.springboot.rocketmq.message.Demo05Message;
import org.apache.rocketmq.spring.annotation.MessageModel;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

@Component
@RocketMQMessageListener(
        topic = Demo05Message.TOPIC,
        consumerGroup = "demo05-consumer-group-" + Demo05Message.TOPIC,
        messageModel = MessageModel.BROADCASTING // 设置为广播消费
)
public class Demo05Consumer implements RocketMQListener<Demo05Message> {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Override
    public void onMessage(Demo05Message message) {
        logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
    }

}
  • 主要是 @RocketMQMessageListener 注解,通过设置了 messageModel = MessageModel.BROADCASTING ,表示使用广播消费

5.4 简单测试

创建 [Demo05ProducerTest]测试类,用于测试广播消费。代码如下:

package com.ebadagang.springboot.rocketmq.producer;

import com.ebadagang.springboot.rocketmq.Application;
import org.apache.rocketmq.client.producer.SendResult;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CountDownLatch;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo05ProducerTest {

    private Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired
    private Demo05Producer producer;

    @Test
    public void test() throws InterruptedException {
        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }

    @Test
    public void testSyncSend() throws InterruptedException {
        int id = (int) (System.currentTimeMillis() / 1000);
        SendResult result = producer.syncSend(id);
        logger.info("[testSyncSend][发送编号:[{}] 发送结果:[{}]]", id, result);

        // 阻塞等待,保证消费
        new CountDownLatch(1).await();
    }

}

5.4.1 首先

执行#test()测试方法,先启动一个消费者分组 "demo05-consumer-group-DEMO_05" 的 Consumer 节点。

5.4.2 然后

执行#testSyncSend()测试方法,先启动一个消费者分组 "demo05-consumer-group-DEMO_05"Consumer节点。同时,该测试方法,调用 Demo05ProducerTest#syncSend(id)方法,同步发送了一条消息。控制台输出如下:

5.4.3 #testSyncSend() 方法对应的控制台

# Producer 同步发送消息成功
2020-08-04 21:56:34.739  INFO 10824 --- [           main] c.e.s.r.producer.Demo05ProducerTest      : [testSyncSend][发送编号:[1596549394] 发送结果:[SendResult [sendStatus=SEND_OK, msgId=240884E30114A66731EF8A0EAAFD768A2A4818B4AAC2142870A50000, offsetMsgId=6585E30D00002A9F0000000000039CC7, messageQueue=MessageQueue [topic=DEMO_05, brokerName=broker-a, queueId=0], queueOffset=0]]]
# Demo05Consumer 消费了该消息
2020-08-04 21:56:34.771  INFO 10824 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo05Consumer   : [onMessage][线程编号:174 消息内容:Demo05Message{id=1596549394}]

5.4.4 #test() 方法对应的控制台

# 另外一个 Demo05Consumer 也消费了该消息
2020-08-04 21:56:34.755  INFO 15504 --- [MessageThread_1] c.e.s.rocketmq.consumer.Demo05Consumer   : [onMessage][线程编号:184 消息内容:Demo05Message{id=1596549394}]

消费者分组 "demo05-consumer-group-DEMO_05" 的两个 Consumer 节点,都消费了这条发送的消息。符合广播消费的预期。

底线


本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。

相关文章

网友评论

    本文标题:RocketMQ 与 Spring Boot整合(五、广播消费)

    本文链接:https://www.haomeiwen.com/subject/tjdgrktx.html