【本文内容】
- 本文基于Spring Cloud Stream 3.2.4版本。
-
介绍了官方的示例,定义
Consumer
接口来接收消息:- 使用的是RabbitMQ作为消息中间件:
spring-cloud-stream-binder-rabbit
-
application.properties
中加入RabbitMQ相关配置 - 定义bean类型为
Consumer
接口的方法log()
- 使用的是RabbitMQ作为消息中间件:
-
一些遗留问题,如
@EnableBInding
、@StreamListener
都已经不再使用(deprecated)。 -
配置相关的学习:
-
自动配置:当一个bean类型为
Supplier
或Function
或Consumer
时自动配置bindings。如上述官方示例方法log(),会定义exchange=log-in-0
,queue=log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ
,并通过routingKey=#
绑定到exchange上。 -
手动配置:bean类型为
Function
时方法uppercase():
spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic-in
spring.cloud.stream.bindings.uppercase-out-0.destination=my-topic-out
-
加上binding name的手动配置:
spring.cloud.stream.function.bindings.uppercase-in-0=myInput
spring.cloud.stream.bindings.myInput.destination=my-topic-in
spring.cloud.stream.function.bindings.uppercase-out-0=output
spring.cloud.stream.bindings.output.destination=my-topic-out
-
多个functional时的binding:遇到超过1个functional的时候,需要显性指定function,如同时存在
log()
和uppercase()
:
spring.cloud.function.definition=log;uppercase
-
没有functional的时候的binding(配合
StreamBridge
使用):
spring.cloud.stream.input-bindings=fooin;barin
spring.cloud.stream.output-bindings=fooout;barout
则会创建2个in和2个out,其中in:fooin-in-0
,barin-in-0
。
-
自动配置:当一个bean类型为
1. 前言
官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html
在数据整合方面,Spring提供了自己的解决方案,即Spring Integration (https://spring.io/projects/spring-integration/),它提供了和外部系统连接的模式,外部系统如数据库,消息中间件,邮件等等。
后来随着微服务的流行,Spring Boot架构成为主流,Spring Integration和Spring Boot集成,形成了一个新的项目——Spring Cloud Stream。
Spring Cloud Stream可以帮助app从event事件中解藕出来,如在某个时间节点某个事件会发生,再通知给下游的app。总而言之,Spring Cloud Stream试图提供一种通用的抽象来支持不同的消息中间件,如RabbitMQ, Apache Kafka, Amazon Kinesis。换句话说,如果我们的系统在某个时候用的消息中间件是RabbitMQ,如果使用Spring Cloud Stream来处理消息,那么某个版本后想要切换到Kafka,那么不需要改很多代码,因为我们是面向Spring Cloud Stream的API在开发,而不是具体的跟RabbitMQ或是Kafka的API打交道。
2. 官方简单的例子
我用的是消息中间件是RabbitMQ。首先确保本地RabbitMQ已经安装。
2.1 依赖
除了Spring Boot相关,还需要加入:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
<version>3.2.4</version>
</dependency>
2.2 配置
新建application.properties
,配置RabbitMQ相关:
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
spring.rabbitmq.host=localhost
spring.rabbitmq.virtual-host=stream-test
2.3 相关代码
- 使用了java8新接口
Consumer
来作为message的handler。关于Consumer
,可以参考网友写的文章:【Java 8 新特性】Java Consumer示例 - 定义了这个handler,Spring Cloud Stream会自动的定义相关的input destination。
- 另外这个handler会自动的将消息转为
Person
对象,即消息为Json格式。
@SpringBootApplication
public class StreamApplication {
public static void main(String[] args) {
SpringApplication.run(StreamApplication.class, args);
}
@Bean
public Consumer<Person> log() {
return person -> {
System.out.println("Received: " + person);
};
}
public static class Person {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String toString() {
return this.name;
}
}
}
2.4 测试
启动项目后,可以看到日志:
尝试在RabbitMQ console中publish一条消息: image.png 发送成功后,app端可以收到消息: image.png2023-01-23 14:02:22.760 INFO 77705 --- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ, bound to: log-in-0
2023-01-23 14:02:22.762 INFO 77705 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2023-01-23 14:02:22.848 INFO 77705 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2accaec2:0/SimpleConnection@3869a6e5 [delegate=amqp://guest@127.0.0.1:5672/stream-test, localPort= 58498]
...
2023-01-23 14:02:22.949 INFO 77705 --- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started bean 'inbound.log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ'
2023-01-23 14:02:22.969 INFO 77705 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2023-01-23 14:02:22.978 INFO 77705 --- [ main] com.faj.StreamApplication : Started StreamApplication in 3.301 seconds (JVM running for 3.778)
3. 一些遗留的问题
经常看到一些hello world例子(如https://www.baeldung.com/spring-cloud-stream),会用一些annotation:@EnableBInding
、@StreamListener
,目前在3.2.4版本中都已经不推荐使用了(即deprecated了)。推荐使用的是基于functional的编程模式,具体看https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#spring_cloud_function。
-
Reactive模块
目前已停更,如果想要继续使用,可以使用老的版本的spring-cloud-stream-reactive
。 - 测试相关的support
spring-cloud-stream-test-support
,具体查看https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_testing。 -
StreamMessageConverter
不再使用(deprecated)。 -
original-content-type
不再使用(deprecated)。 -
BinderAwareChannelResolver
不再使用(deprecated)。
4. 配置解释
4.1 自动创建exchange和queue并binding
在#2中的例子,我们创建了一个返回Java Consumer接口的bean:
@Bean
public Consumer<Person> log() {
return person -> {
System.out.println("Received: " + person);
};
}
打开RabbitMQ console page:http://localhost:15672/#/exchanges
可以看到自动创建了类型为topic的exchange=log-in-0:
可以看到基于Spring Cloud Stream在启动时帮我们做了很多自动化的配置:
当一个bean类型为Supplier
或Function
或Consumer
时(这三个接口皆为java 8引入),就会当作是Spring Cloud Stream的message handlers,并会找寻相关的destination的binding。如果没有配置,则会按照遵循特定的规则。
4.2 手动配置的规则
基于functional编程binding的规则说明,以返回类型是Function
为例:
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
在#4.1中的定义的Comsumer
,关心的是如何消费,所以app中会自动创建input
相关的配置。而本例中bean的类型为Function
接口,它会有input
和output
,即接收消息,然后再将消息发送到下游。
input和output相关的bindings如下:
- input:
<方法名> + -in- + <index>
-output:<方法名> + -out- + <index>
in
或是out
表示binding的类型(即input
和output
)。index
表示序列号,如果是单个input/output,那么就是0。
如果是手动配置,我们希望uppercase()的接收queue为my-queue,那么input的完整的配置如下:
spring.cloud.stream.bindings.uppercase-in-0.destination=my-queue
output的配置也类似,使用的是:uppercase-out-0
。
4.3 Binding名
Descriptive Binding Names
为了增加可读性,可以给binding一个描述性的名字,可以通过配置spring.cloud.stream.function.bindings.<binding-name>
来实现。如:
spring.cloud.stream.function.bindings.uppercase-in-0=myInput
即,通过上述配置,我们把uppercase-in-0
重命名为myInput,那么我们在定义detination的时候,就可以用myInput来定义了:
spring.cloud.stream.bindings.myInput.destination=my-queue
4.4 测试
基于#4.2代码的测试:
case-1:使用自动配置,创建的exchange如下:
output没有binding的queue,很好理解,因为当前的app只需要关心发送到哪个exchange,至于下游谁在接收,它并不关心。
case-2:如果我们手动配置:
spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic-in
spring.cloud.stream.bindings.uppercase-out-0.destination=my-topic-out
或用binding name来配置(和上面的配置等价):
spring.cloud.stream.function.bindings.uppercase-in-0=myInput
spring.cloud.stream.bindings.myInput.destination=my-topic-in
spring.cloud.stream.function.bindings.uppercase-out-0=output
spring.cloud.stream.bindings.output.destination=my-topic-out
那么创建的exchange如下:
image.png
binding如下:
image.png
4.5 多个functional时的binding
如果只有一个functional bean,如#2的log()或是#4.2中的uppercase(),那么不需要配置spring.cloud.function.definition
(依赖说Spring Cloud Stream的auto-discovered机制),反之则需要配置。
比如我们配置了两个bean:
@Bean
public Consumer<Person> log() {
return person -> {
System.out.println("Received: " + person);
};
}
@Bean
public Function<String, String> uppercase() {
return value -> value.toUpperCase();
}
如果我们在application.properties
文件中没有任何配置,打开RabbitMQ console会发现相关的exchange/queue并没有创建。加入配置:
spring.cloud.function.definition=log;uppercase
那么functional=log和uppercase会创建相关配置,而another()并不会:
image.png
ps,一个bean时auto-detect机制会起作用,想要关闭这个功能,可以用spring.cloud.stream.function.autodetect=false
。
4.6 没有functional的时候的binding
我们上述都是基于functional的配置(即三个接口:Function
,Supplier
,Consumer
),有时候我们需要明确的binding,但不需要绑定到具体的function上。如需要支持Spring的其它框架(如Spring Integration),这时候可能会用MessageChannel来操作。如果是这样,那么可以直接定义input/output,多个input可以用;
来区分:
spring.cloud.stream.input-bindings=fooin;barin
spring.cloud.stream.output-bindings=fooout;barout
上述的配置,不需要定义额外的function,创建的exchange如下:
image.png
创建的queue如下:
image.png
同样的,input exchange会绑定到相应的queue上,而output exchange不会有queue绑定。如:
image.png
那么没有functional的时候,谁负责消息的发送呢?Spring Cloud Stream提供了StreamBridge
,也可以发送消息,具体看官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources
网友评论