- Spring Cloud Stream Kafka
- 3.《kafka》SpringCloud学习之SpringClo
- RocketMQ 与 Spring Cloud Stream整合
- RocketMQ 与 Spring Cloud Stream整合
- RocketMQ 与 Spring Cloud Stream整合
- RocketMQ 与 Spring Cloud Stream整合
- RocketMQ 与 Spring Cloud Stream整合
- RocketMQ 与 Spring Cloud Stream整合
- RocketMQ 与 Spring Cloud Stream整合
- RocketMQ 和Spring Cloud Stream
在上述的示例中,我们看到的都是使用集群消费,也是最常用的消费模式。而在一些场景下,我们需要使用广播消费。
广播消费模式下,相同 Consumer Group 的每个 Consumer 实例都接收全量的消息。
例如说,在应用中,缓存了数据字典等配置表在内存中,可以通过 RocketMQ 广播消费,实现每个应用节点都消费消息,刷新本地内存的缓存。
又例如说,我们基于 WebSocket 实现了 IM 聊天,在我们给用户主动发送消息时,因为我们不知道用户连接的是哪个提供 WebSocket 的应用,所以可以通过 RocketMQ 广播消费,每个应用判断当前用户是否是和自己提供的 WebSocket 服务连接,如果是,则推送消息给用户。
下面,我们来搭建一个 Spring Cloud Stream 消费异常处理机制的示例。考虑方便,我们直接复用[快速入门]文章的项目,使用 [sca-stream-rocketmq-producer
]发送消息,从 [sca-stream-rocketmq-consumer
]复制出 [sca-stream-rocketmq-consumer-broadcasting
] 来演示广播消费。
5.1 复制项目
使用 [sca-stream-rocketmq-producer
]发送消息,从 [sca-stream-rocketmq-consumer
]复制出 [sca-stream-rocketmq-consumer-broadcasting
]。
5.2 配置文件
修改 [application.yml
]配置文件,只改了一个参数,设置 broadcasting
配置项为 true
,开启广播消费的模式。完整配置如下:
spring:
application:
name: erbadagang-consumer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
erbadagang-input:
destination: ERBADAGANG-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
group: erbadagang-consumer-group-ERBADAGANG-TOPIC-01 # 消费者分组,命名规则:组名+topic名
trek-input:
destination: TREK-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
group: trek-consumer-group-TREK-TOPIC-01 # 消费者分组,命名规则:组名+topic名
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 101.133.227.13:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
erbadagang-input:
# RocketMQ Consumer 配置项,对应 RocketMQConsumerProperties 类
consumer:
enabled: true # 是否开启消费,默认为 true
broadcasting: true # 是否使用广播消费,默认为 false(使用集群消费)
server:
port: ${random.int[10000,19999]} # 随机端口,方便启动多个消费者
5.3 简单测试
① 执行 ConsumerApplication 两次,启动两个消费者的实例,从而实现在消费者分组 erbadagang-consumer-group-ERBADAGANG-TOPIC-01
下有两个消费者实例。
② 执行 ProducerApplication,启动生产者的实例。
之后,请求 http://127.0.0.1:18080/demo01/send 接口三次,发送三条消息。此时在 IDEA 控制台看到消费者打印日志如下:
// ConsumerApplication 控制台 01
2020-08-06 17:07:35.633 INFO 8444 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [onMessage][线程编号:94 消息内容:Demo01Message{id=1167829440}]
// ConsumerApplication 控制台 02
2020-08-06 17:07:35.633 INFO 15132 --- [MessageThread_1] c.e.s.s.r.c.listener.Demo01Consumer : [onMessage][线程编号:93 消息内容:Demo01Message{id=1167829440}]
从日志可以看出,每条消息仅被每个消费者消费了一次。
底线
本文源代码使用 Apache License 2.0开源许可协议,这里是本文源码Gitee地址,可通过命令git clone+地址
下载代码到本地,也可直接点击链接通过浏览器方式查看源代码。
网友评论