美文网首页我的微服务
RocketMQ 与 Spring Cloud Stream整合

RocketMQ 与 Spring Cloud Stream整合

作者: 梅西爱骑车 | 来源:发表于2020-08-24 07:45 被阅读0次

在上述的示例中,我们看到的都是使用集群消费,也是最常用的消费模式。而在一些场景下,我们需要使用广播消费

广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 RocketMQ 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。

又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 RocketMQ 广播消费,每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。

下面,我们来搭建一个 Spring Cloud Stream 消费异常处理机制的示例。考虑方便,我们直接复用[快速入门]文章的项目,使用 [sca-stream-rocketmq-producer]发送消息,从 [sca-stream-rocketmq-consumer]复制出 [sca-stream-rocketmq-consumer-broadcasting] 来演示广播消费

5.1 复制项目

使用 [sca-stream-rocketmq-producer]发送消息,从 [sca-stream-rocketmq-consumer]复制出 [sca-stream-rocketmq-consumer-broadcasting]。

5.2 配置文件

修改 [application.yml]配置文件,只改了一个参数,设置 broadcasting 配置项为 true,开启广播消费的模式。完整配置如下:

spring:
  application:
    name: erbadagang-consumer-application
  cloud:
    # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
    stream:
      # Binding 配置项,对应 BindingProperties Map
      bindings:
        erbadagang-input:
          destination: ERBADAGANG-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
          content-type: application/json # 内容格式。这里使用 JSON
          group: erbadagang-consumer-group-ERBADAGANG-TOPIC-01 # 消费者分组,命名规则:组名+topic名

        trek-input:
          destination: TREK-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
          content-type: application/json # 内容格式。这里使用 JSON
          group: trek-consumer-group-TREK-TOPIC-01 # 消费者分组,命名规则:组名+topic名
      # Spring Cloud Stream RocketMQ 配置项
      rocketmq:
        # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
        binder:
          name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
        # RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
        bindings:
          erbadagang-input:
            # RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
            consumer:
              enabled: true # 是否开启消费,默认为 true
              broadcasting: true # 是否使用广播消费,默认为 false(使用集群消费)

server:
  port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者

5.3 简单测试

① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 erbadagang-consumer-group-ERBADAGANG-TOPIC-01 下有两个消费者实例。

② 执行 ProducerApplication,启动生产者的实例。

启动1个producer,2个consumer

之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:

// ConsumerApplication 控制台 01
2020-08-06 17:07:35.633  INFO 8444 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [onMessage][线程编号:94 消息内容:Demo01Message{id=1167829440}]

// ConsumerApplication 控制台 02
2020-08-06 17:07:35.633  INFO 15132 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [onMessage][线程编号:93 消息内容:Demo01Message{id=1167829440}]

从日志可以看出,每条消息仅被每个消费者消费了一次。

底线


本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。

相关文章

网友评论

    本文标题:RocketMQ 与 Spring Cloud Stream整合

    本文链接:https://www.haomeiwen.com/subject/umhtdktx.html