- Spring Cloud Stream 进阶配置——动态路由
- Spring Cloud Stream 进阶配置——高可用(二)
- Nacos+Spring Cloud Gateway动态路由配置
- Spring Cloud -Config动态配置是如何实现的?(
- 30. Spring Cloud Alibaba之消息中间件 -
- Spring Cloud Alibaba之消息中间件 - Spr
- 08基于Stream的消息中间件绑定
- Spring Cloud Bus + kafka实现配置中心配置
- 3.《kafka》SpringCloud学习之SpringClo
- RabbitMQ学习(八)与SpringCloudStream整
ps: 本文所有代码可在 这里 查看。
背景
记得之前有一个场景,网关接收各种类型设备上传过来的数据包,然后根据不同类型的数据,通过 MQ
转发到相应的处理器进行消费。举个例子:现在有2种类型的设备采集器,分别为 水位监测器 和 温度监测器,最后会分发到各自的处理器进行处理。
解决方案
一般做法
所有处理不同类型数据的队列,监听同一个 Topic
,然后消费时,通过判断数据的类型来决定是否需要处理。比如上面的例子,每来一条数据,2个处理器都会去消费这条数据,对于 水位监测器 的处理器,如果是 水位监测器 的数据,那刚好,正常消费,如果是 温度监测器 的数据,直接跳过。
这种做法,优点很明显,即不用增加其他配置,只需在消费时做下类型判断;但缺点也特别明显,所有消息,每一个队列都需要消费一次。为什么这么说呢?我们都知道,消息在投递过程中,消息是需要序列化和反序列化的(一般使用的是 json
),序列化和反序列化是需要耗系统资源的,而且投递过程中也是需要占用带宽的,而消息到达消费端时,大部分情况下都会因为类型不符而跳过处理,最后还要通知交换机处理结果,这样就会造成不必要的资源浪费。
可以看到,如果数据量小,分类不多,缺点并不会造成多严重的后果,但如果数据量一大,分类一多,那将会极大的浪费系统资源。我们都知道,物联网的各种设备何止千千万,不同设备类型更是繁多,那么一旦使用这种方案,本来10台机器能搞定的事情,最后可能需要几十台,数据分类多的话,可能还需要更多。而且数据量如果突然剧增,系统也很容易就扛不住。
综上,这种方案大多情况下是不适用的。那有没有更好的方案,比如不同处理器,只处理一种对应的设备上报的数据包?答案是肯定的,那就是——动态路由。
动态路由
何为 动态路由?简单的说,就是:消息到达交换机后,会根据动态的 routingKey
,投递到与交换机绑定时 bindingKey
相同的队列中。
举个例子,水位监测器 的队列与交换机绑定时使用的 bindingKey
为 waterLevel
,这时如果来了一条监测到的水位数据,消息在发布时使用的动态 routingKey
也为 waterLevel
,那这条数据 水位监测器 的处理器能正常处理,而 bindingKey
为 temperature
的 温度监测器 队列则收不到这条数据。
我们都知道, bindingKey
可以通过配置 spring.cloud.stream.rabbit.bindings.<channelName>.consumer.bindingRoutingKey
来达到效果,那难点就剩下:如何在发布消息时指定想要的 routingKey
。翻了下官方文档,找到这样一个配置:
很明显,这就是我们想要的,支持一个 SpEL
表达式,如果是固定的 routingKey
,写一个常量字符串即可。
文档链接在 这里。
接下来,我们来进行一个简单的 demo
试一下。
application-dynamic.yml
spring:
cloud:
stream:
bindings:
packetUplinkOutput:
destination: packetUplinkTopic
content-type: application/json
binder: rabbit
waterLevelInput:
destination: packetUplinkTopic
content-type: application/json
group: ${spring.application.name}.waterLevel
binder: rabbit
temperatureInput:
destination: packetUplinkTopic
content-type: application/json
group: ${spring.application.name}.temperature
binder: rabbit
rabbit:
bindings:
packetUplinkOutput:
producer:
# 生产者配置RabbitMq的动态路由键
routingKeyExpression: headers.type
waterLevelInput:
consumer:
bindingRoutingKey: waterLevel # 将queue绑定到exchange时使用的routing key。默认'#'
temperatureInput:
consumer:
bindingRoutingKey: temperature # 将queue绑定到exchange时使用的routing key。默认'#'
ScasDynamicRoutingTest
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("dynamic")
@EnableBinding({ScasDynamicRoutingTest.MessageSink.class, ScasDynamicRoutingTest.MessageSource.class})
public class ScasDynamicRoutingTest {
@Autowired
private PacketUplinkProducer packetUplinkProducer;
private Random random = new Random();
private List<String> devEuis = new ArrayList<>(10);
@PostConstruct
private void initDevEuis() {
devEuis.add("10001");
devEuis.add("10002");
devEuis.add("10003");
devEuis.add("10004");
devEuis.add("10005");
devEuis.add("10006");
devEuis.add("10007");
devEuis.add("10008");
devEuis.add("10009");
devEuis.add("10010");
}
@Test
public void test() throws InterruptedException {
for (int i = 0; i < 5; i++) {
String devEui = getDevEuis();
String type = "waterLevel";
packetUplinkProducer.publish(new PacketModel(devEui, type));
}
for (int i = 0; i < 5; i++) {
String devEui = getDevEuis();
String type = "temperature";
packetUplinkProducer.publish(new PacketModel(devEui, type));
}
Thread.sleep(10000000);
}
private String getDevEuis() {
return devEuis.get(random.nextInt(10));
}
@Component
public static class PacketUplinkProducer {
@Autowired
private MessageSource messageSource;
public void publish(PacketModel model) {
log.info("发布上行数据包消息. model: [{}].", model);
Message<PacketModel> message = MessageBuilder.withPayload(model).setHeader("type", model.getType()).build();
messageSource.packetUplinkOutput().send(message);
}
}
@Component
public static class PacketUplinkHandler {
@StreamListener("waterLevelInput")
public void handleWaterLevelPacket(PacketModel model) throws InterruptedException {
log.info("消费【水位监测器】数据包消息. model: [{}].", model);
}
@StreamListener("temperatureInput")
public void handleTemperaturePacket(PacketModel model) throws InterruptedException {
log.info("消费【温度监测器】数据包消息. model: [{}].", model);
}
}
public interface MessageSink {
@Input("waterLevelInput")
SubscribableChannel waterLevelInput();
@Input("temperatureInput")
SubscribableChannel temperatureInput();
}
public interface MessageSource {
@Output("packetUplinkOutput")
MessageChannel packetUplinkOutput();
}
@Data
@NoArgsConstructor
@AllArgsConstructor
static class PacketModel {
/**
* 设备 eui
*/
private String devEui;
/**
* 设备类型
*/
private String type;
}
}
测试结果
result再看一下可视化界面的队列详情:
可以看到,
routing key
为我们配置的 waterLevel
。
如果将其中某一个的 bindingRoutingKey
去掉或改成默认值 #
,结果如下:
代码分析
其实最关键的一行代码是:
// ...
Message<PacketModel> message = MessageBuilder.withPayload(model).setHeader("type", model.getType()).build();
// ...
构建消息时,自定义一个 key
为 type
的 header
,而我们在定义生产者时,指定了 routingKeyExpression
为 headers.type
,也就是说,在投递时会以 type
的值作为最后的 routingKey
。所以,这样也就达到了我们想要的效果。
总结
这种配置方式,适合:生产者只有一个,消费者有多个,且需要将不同的消息投递到不同的目标队列。这样的场景很多,除了上面举的例子,还有:不同平台(天猫、淘宝、京东、有赞等)的订单,需要被各自的处理器进行消费。
相关链接
推荐阅读
Spring Cloud 进阶玩法
统一异常处理介绍及实战
Spring Cloud Stream 进阶配置——使用延迟队列实现“定时关闭超时未支付订单”
Spring Cloud Stream 进阶配置——高可用(二)——死信队列
网友评论