一、Stream介绍
SpringCloud Stream是在Spring Messaging和Spring Integration这两个项目的基础上发展而来的,我们在项目中可能使用多个MQ,或者在项目的某个阶段需要更换MQ,如果使用了Stream,我们只要更改配置即可,完全不用修改消息发送和接收的代码。而且Stream定义的消息收发模型使得我们代码更加简洁易懂。
详细的介绍文档可以参考:
Spring Cloud Stream 体系及原理介绍 (qq.com)
二、入门案例
首先我们从start.spring.io
下载一个全新的项目,需要的依赖有web、lombok,下载好了之后再自行引入stream-rocketmq的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>2021.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
然后,我们开始创建生产者:
@Slf4j
@Component
@EnableBinding(Source.class)
public class MyMessageProducer {
@Autowired
private Source source;
public void send(String message){
log.info("生产者开始发送消息!");
source.output().send(MessageBuilder.withPayload(message).build());
}
}
我们使用Controller来出发生产者发送消息:
@RestController
public class ProducerController {
@Autowired
private MyMessageProducer producer;
@GetMapping("/sendMessage")
public String sendMessage(){
producer.send("hello spring cloud stream!");
return "OK";
}
}
随后,我们创建消费者:
@Slf4j
@Component
@EnableBinding(Sink.class)
public class MyMessageConsumer {
@StreamListener(Sink.INPUT)
public void receive(String message){
log.info("监听到消息内容为:{}", message);
}
}
最后,是我们最重要的配置文件内容:
server:
port: 8081
spring:
application:
name: stream-demo
cloud:
stream:
bindings:
output:
destination: myRocketMQTopic
group: test-stream
input:
destination: myRocketMQTopic
group: test-stream
rocketmq:
binder:
name-server: 127.0.0.1:9876
在启动MQ服务的前提下,启动我们的项目,就能成功运行一个Stream-Rocketmq的快速入门案例了。我们浏览器访问链接,使得生产者发送消息,随后消费者就能监听到这个消息。
2021-07-16 09:35:26.805 INFO 16920 --- [nio-8081-exec-1] c.e.s.producer.MyMessageProducer : 生产者开始发送消息!
2021-07-16 09:35:49.737 INFO 16920 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer : 监听到消息内容为:hello spring cloud stream!
2021-07-16 09:36:14.371 INFO 16920 --- [nio-8081-exec-3] c.e.s.producer.MyMessageProducer : 生产者开始发送消息!
2021-07-16 09:36:14.379 INFO 16920 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer : 监听到消息内容为:hello spring cloud stream!
三、自定义Channel
在如上的入门案例中,我们使用的是Stream自带的output、input通道,我们完全可以仿照它们自定义自己的Channel。
public interface MyOutputChannel {
@Output("myOutput")
MessageChannel myOutput();
}
public interface MyInputChannel {
@Input("myInput")
SubscribableChannel myInput();
}
然后我们新建一个生产者,使用自定义的output通道。
@Slf4j
@Component
@EnableBinding(MyOutputChannel.class)
public class MyMessageProducer2 {
@Autowired
private MyOutputChannel myOutputChannel;
public void send(String message){
log.info("MyMessageProducer2生产者开始发送消息!");
myOutputChannel.myOutput().send(MessageBuilder.withPayload(message).build());
}
}
新建一个消费者,监听自定义的input通道。
@Slf4j
@Component
@EnableBinding(MyInputChannel.class)
public class MyMessageConsumer2 {
@StreamListener("myInput")
public void receive(String message){
log.info("myInput监听到消息内容为:{}", message);
}
}
再新建一个controller来触发发送消息:
@RestController
public class ProducerController2 {
@Autowired
private MyMessageProducer2 producer;
@GetMapping("/sendMessage2")
public String sendMessage(){
producer.send("hello spring cloud stream using customize channel!");
return "OK";
}
}
最后,配置文件中需要增加我们自定义的output和input通道:
server:
port: 8081
spring:
application:
name: stream-demo
cloud:
stream:
bindings:
output:
destination: myRocketMQTopic
group: test-stream
myOutput:
destination: myRocketMQTopic2
group: test-stream2
input:
destination: myRocketMQTopic
group: test-stream
myInput:
destination: myRocketMQTopic2
group: test-stream2
rocketmq:
binder:
name-server: 127.0.0.1:9876
如此,我们两个例子中的channel可以同时工作了。
2021-07-16 10:16:46.495 INFO 18580 --- [nio-8081-exec-3] c.e.s.producer.MyMessageProducer2 : MyMessageProducer2生产者开始发送消息!
2021-07-16 10:16:52.220 INFO 18580 --- [nio-8081-exec-2] c.e.s.producer.MyMessageProducer2 : MyMessageProducer2生产者开始发送消息!
2021-07-16 10:16:52.808 INFO 18580 --- [nio-8081-exec-4] c.e.s.producer.MyMessageProducer2 : MyMessageProducer2生产者开始发送消息!
2021-07-16 10:16:55.610 INFO 18580 --- [nio-8081-exec-5] c.e.s.producer.MyMessageProducer : 生产者开始发送消息!
2021-07-16 10:16:55.729 INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer : 监听到消息内容为:hello spring cloud stream!
2021-07-16 10:16:56.306 INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer2 : myInput监听到消息内容为:hello spring cloud stream using customize channel!
2021-07-16 10:16:56.306 INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer2 : myInput监听到消息内容为:hello spring cloud stream using customize channel!
2021-07-16 10:16:58.249 INFO 18580 --- [nio-8081-exec-6] c.e.s.producer.MyMessageProducer : 生产者开始发送消息!
2021-07-16 10:16:58.256 INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer : 监听到消息内容为:hello spring cloud stream!
2021-07-16 10:16:59.310 INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer2 : myInput监听到消息内容为:hello spring cloud stream using customize channel!
四、消息过滤
有两种消息过滤的方法,第一种是设置Cosumer的消费Tag,指定的Tag才会被消费,未指定的Tag的消息则被过滤;另外一种是根据自定义header内容进行过滤。
为了区别以上的例子,我们重新创建一个Channel:
public interface NewChannel {
String INPUT = "newInput";
String OUTPUT = "newOutput";
@Output(NewChannel.INPUT)
MessageChannel output();
@Input(NewChannel.INPUT)
SubscribableChannel input();
}
然后分别是生产者和消费者:
@Slf4j
@Component
@EnableBinding(NewChannel.class)
public class MyMessageProducer3 {
@Autowired
private NewChannel newChannel;
public void send(Person person){
log.info("MyMessageProducer3生产者开始发送消息!");
Message<Person> message1 = MessageBuilder.withPayload(person)
// 指定ROcketMQ中消息的TAG
.setHeader(RocketMQHeaders.TAGS, "animal")
// 自定义header用于consumer监听器进行过滤
.setHeader("MyTag", "TAG1")
.build();
Message<Person> message2 = MessageBuilder.withPayload(person)
.setHeader(RocketMQHeaders.TAGS, "person")
.setHeader("MyTag", "TAG2")
.build();
Message<Person> message3 = MessageBuilder.withPayload(person)
.setHeader(RocketMQHeaders.TAGS, "person")
.setHeader("MyTag", "TAG3")
.build();
newChannel.output().send(message1);
newChannel.output().send(message2);
newChannel.output().send(message3);
}
}
@Slf4j
@Component
@EnableBinding(NewChannel.class)
public class MyMessageConsumer3 {
@StreamListener(value = NewChannel.INPUT, condition = "headers['MyTag']=='TAG2'")
public void receive(Message<Person> message){
log.info("MyMessageConsumer3监听到消息内容为:{}", message);
log.info("MyMessageConsumer3消息的payload:{}", message.getPayload());
log.info("MyMessageConsumer3消息的header:{}", message.getHeaders().get("MyTag"));
}
}
消息的实体类为:
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Person {
private String name;
private Integer age;
}
新建一个Controller用于发送消息:
@RestController
public class ProducerController3 {
@Autowired
private MyMessageProducer3 producer;
@GetMapping("/sendMessage3")
public String sendMessage(){
Person tom = new Person("tom", 21);
producer.send(tom);
return "OK";
}
}
配置文件修改为:
server:
port: 8081
spring:
application:
name: stream-demo
cloud:
stream:
bindings:
newInput:
destination: newRocketMQTopic
group: test-stream3
newOutput:
destination: newRocketMQTopic
group: test-stream3
rocketmq:
binder:
name-server: 127.0.0.1:9876
enable-msg-trace: true
bindings:
newInput:
consumer:
# 仅接受指定tags的消息
tags: person || plant
此时重启应用,我们访问controller的接口,就能同时发送三条消息出去。
在配置文件中,我们指定消费者只消费tags为person和plant的消息,对于tags为animal的消息则会显示CONSUMED_BUT_FILTERED
;
在消费者消费配置代码中,我们的注解@StreamListener指定了condition为只消费headers['MyTag']=='TAG2'
的消息;而TAG3的消息也被CONSUMED,只不过没有找到对应的消费者,所以控制台显示:
Cannot find a @StreamListener matching for message with id: 48260ac4-772b-54b8-2ae0-8b91635729a9
五、函数式编程
自从SpringCloud升级到3.1版本以上,如上示例中命令式编程注解@EnableBinding、@Output、@Input等都已经被标记为废弃了,废弃说明如下:
@deprecated as of 3.1 in favor of functional programming model
那么我们该如何使用函数式编程改造如上的示例呢。
5.1 定时驱动模型
我们需要重新创建一个工程,父工程stream-function,并且引入如下的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-stream-rocketmq -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>2021.1</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
然后创建一个子工程consumer,新建一个Producer类:
@Slf4j
@Component
public class FunctionConsumer {
@Bean
public Consumer<Date> mySink(){
return (message) -> {
log.info("收到的消息为:{}", message);
};
}
}
增加配置文件:
server:
port: 8082
spring:
application:
name: stream-demo-consumer
cloud:
stream:
bindings:
# 格式:方法名-类型-序号
# 方法名表示消费消息的方法的名称
# 类型in表示消息接收,out表示消息发送
# 序号用来区分不同的消费者
mySink-in-0:
# topic
destination: functionRocketMQTopic
group: test-stream4
rocketmq:
binder:
name-server: 127.0.0.1:9876
enable-msg-trace: true
function:
# 消费者处理消息的方法名称
definition: mySink
至此,消费者就完成了,可以启动运行,就会开始监听functionRocketMQTopic这个主题的消息了。
然后我们还需要再创建一个子工程producer,新建一个Producer类:
@Slf4j
@Component
public class FunctionProducer {
@Bean
public Supplier<Date> mySource(){
return () -> {
log.info("当前发送消息开始!");
return new Date();
};
}
}
然后增加配置文件如下:
server:
port: 8081
spring:
application:
name: stream-demo-producer
cloud:
stream:
bindings:
# 格式:方法名-类型-序号
# 方法名表示消费消息的方法的名称
# 类型in表示消息接收,out表示消息发送
# 序号用来区分不同的消费者
mySource-out-0:
# topic
destination: functionRocketMQTopic
group: test-stream4
rocketmq:
binder:
name-server: 127.0.0.1:9876
enable-msg-trace: true
function:
# 生产者产生消息的方法名称
definition: mySource
如此,我们生产者也配置好了,启动就可以运行了。
5.2 streamBridge模型
该模型主要是为了满足由业务动作触发消息发送的场景。我们只需要新建一个Producer类:
@Slf4j
@Service
public class StreamBridgeProducer {
@Autowired
private StreamBridge streamBridge;
public void sendDateMessage(){
Message<Date> message = MessageBuilder.withPayload(new Date()).build();
log.info("开始发送消息!");
streamBridge.send("mySource-out-1", message);
}
}
然后新建一个Controller供调用触发消息的发送:
@RestController
public class SendController {
@Autowired
private StreamBridgeProducer streamBridgeProducer;
@GetMapping("/sendMessage")
public String sendMessage(){
streamBridgeProducer.sendDateMessage();
return "OK";
}
}
最后,配置文件改为如下:
server:
port: 8081
spring:
application:
name: stream-demo-producer
cloud:
stream:
bindings:
mySource-out-1:
destination: functionRocketMQTopic
group: test-stream4
rocketmq:
binder:
name-server: 127.0.0.1:9876
enable-msg-trace: true
如此,我们启动该生产者,在调用接口的时候,消息就能成功发送了。
5.3 多通道配置
我们可以在一个项目中同时配置多个生产者和多个消费者,甚至是既有生产者,也有消费者。
这里只给出配置文件了,其余内容可以参考上述已经给出的例子。
server:
port: 8081
spring:
application:
name: stream-demo-producer
cloud:
stream:
bindings:
# 多个生产者通道时,定时消息模式destination可以相同,streamBridge模式destination不能相同
mySource-out-0:
destination: functionRocketMQTopic0
mySource-out-1:
destination: functionRocketMQTopic1
mySource-out-2:
destination: functionRocketMQTopic2
mySource2-out-3:
destination: functionRocketMQTopic3
# 消费者通道
mySink-in-0:
destination: functionRocketMQTopic0
group: test-stream4
rocketmq:
binder:
name-server: 127.0.0.1:9876
enable-msg-trace: true
function:
definition: mySource;mySource2;mySink
六、消息分区
SpringCloudStream消息分区是在某些场景下确保具有相同特征的消息被同一个消费者消费,它能帮助某些不支持消息分区的消息队列实现消息分区的功能。
生产者的配置文件改为:
server:
port: 8084
spring:
application:
name: stream-demo-producer
cloud:
stream:
bindings:
mySource-out-0:
destination: functionRocketMQTopic
producer:
# 指定分区键的表达式规则
partition-key-expresion: payload
# 指定分区数量
partition-count: 2
rocketmq:
binder:
name-server: 127.0.0.1:9876
enable-msg-trace: true
function:
definition: mySource
多个消费者的配置文件改为:
server:
port: 8082
spring:
application:
name: stream-demo-consumer2
cloud:
stream:
# 指定当前消费者分区数量
instance-count: 2
# 指定当前消费者的索引
instance-index: 0
bindings:
mySink-in-0:
destination: functionRocketMQTopic
group: test-stream4
consumer:
# 开启分区
partitioned: true
rocketmq:
binder:
name-server: 127.0.0.1:9876
enable-msg-trace: true
function:
definition: mySink
server:
port: 8083
spring:
application:
name: stream-demo-consumer2
cloud:
stream:
# 指定当前消费者分区数量
instance-count: 2
# 指定当前消费者的索引
instance-index: 1
bindings:
mySink-in-0:
destination: functionRocketMQTopic
group: test-stream4
consumer:
# 开启分区
partitioned: true
rocketmq:
binder:
name-server: 127.0.0.1:9876
enable-msg-trace: true
function:
definition: mySink
待实验验证。
七、多消息队列
我们可以使用Stream配置多个不同类型的MQ,具体实例待补充
SpringCloudStream自学文档 - 知乎 (zhihu.com)
八、其它特性
关于Stream的用法其实已经讲的差不多了,剩下的主要就是RocketMQ Binder的一些配置属性,比如:
- 消费者
- tags消息过滤
- 广播模式的开启
- 顺序消费
- 生产者
- 是否开启事务
- 是否同步发送
- 发送超时设置、消息最大值设置
- 重试次数
等等,其它特性都可以参考github上rocketmq的文档,本文就不再一一列举了。
RocketMQ en · alibaba/spring-cloud-alibaba Wiki · GitHub
九、补充
有的时候,发送MQ消息显示消息没有成功发送,出现了很奇怪的问题,网上找的方案是,需要关闭vip通道。
spring:
cloud:
stream:
rocketmq:
default:
producer:
# 关闭vip channel,否则消息会发送失败
vipChannelEnabled: false
尝试之后,确实有效。
网友评论