美文网首页我的微服务
RocketMQ 与 Spring Cloud Stream整合

RocketMQ 与 Spring Cloud Stream整合

作者: 梅西爱骑车 | 来源:发表于2020-08-24 07:45 被阅读0次

    在上述的示例中,我们看到的都是使用集群消费,也是最常用的消费模式。而在一些场景下,我们需要使用广播消费

    广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。

    例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 RocketMQ 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。

    又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 RocketMQ 广播消费,每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。

    下面,我们来搭建一个 Spring Cloud Stream 消费异常处理机制的示例。考虑方便,我们直接复用[快速入门]文章的项目,使用 [sca-stream-rocketmq-producer]发送消息,从 [sca-stream-rocketmq-consumer]复制出 [sca-stream-rocketmq-consumer-broadcasting] 来演示广播消费

    5.1 复制项目

    使用 [sca-stream-rocketmq-producer]发送消息,从 [sca-stream-rocketmq-consumer]复制出 [sca-stream-rocketmq-consumer-broadcasting]。

    5.2 配置文件

    修改 [application.yml]配置文件,只改了一个参数,设置 broadcasting 配置项为 true,开启广播消费的模式。完整配置如下:

    spring:
      application:
        name: erbadagang-consumer-application
      cloud:
        # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
        stream:
          # Binding 配置项,对应 BindingProperties Map
          bindings:
            erbadagang-input:
              destination: ERBADAGANG-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
              content-type: application/json # 内容格式。这里使用 JSON
              group: erbadagang-consumer-group-ERBADAGANG-TOPIC-01 # 消费者分组,命名规则:组名+topic名
    
            trek-input:
              destination: TREK-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
              content-type: application/json # 内容格式。这里使用 JSON
              group: trek-consumer-group-TREK-TOPIC-01 # 消费者分组,命名规则:组名+topic名
          # Spring Cloud Stream RocketMQ 配置项
          rocketmq:
            # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
            binder:
              name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
            # RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
            bindings:
              erbadagang-input:
                # RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
                consumer:
                  enabled: true # 是否开启消费,默认为 true
                  broadcasting: true # 是否使用广播消费,默认为 false(使用集群消费)
    
    server:
      port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者
    

    5.3 简单测试

    ① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 erbadagang-consumer-group-ERBADAGANG-TOPIC-01 下有两个消费者实例。

    ② 执行 ProducerApplication,启动生产者的实例。

    启动1个producer,2个consumer

    之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:

    // ConsumerApplication 控制台 01
    2020-08-06 17:07:35.633  INFO 8444 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [onMessage][线程编号:94 消息内容:Demo01Message{id=1167829440}]
    
    // ConsumerApplication 控制台 02
    2020-08-06 17:07:35.633  INFO 15132 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer      : [onMessage][线程编号:93 消息内容:Demo01Message{id=1167829440}]
    

    从日志可以看出,每条消息仅被每个消费者消费了一次。

    底线


    本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。

    相关文章

      网友评论

        本文标题:RocketMQ 与 Spring Cloud Stream整合

        本文链接:https://www.haomeiwen.com/subject/umhtdktx.html