【前置文章】
【官方文档】
【本文内容】
本文基于Spring Cloud Stream 3.2.4版本,用RabbitMQ作为消息中间件。
-
使用
Supplier
接口发送消息:- 新建一个bean返回type为
Supplier
,方法为hello()
,会自动创建exchange=hello-out-0
- 默认情况下每1秒钟由Spring框架触发1次。
- 修改默认值:
spring.cloud.stream.poller.fixedDelay=5000
- Reactive风格编程,
Supplier<Flux<String>> hello()
,Flux.just(String)
,只会触发一次。 - Reactive风格编程,配合Thread.sleep和Schedulers.elastic(),可实现每1秒发送1次消息。
- Reactive风格编程,使用
@PollableBean
轻松实现每1秒发送1次消息。
- 新建一个bean返回type为
-
主动发送消息到一个output
- 使用
StreamBridge
发送消息到exchange=fooout-in-0
,配合spring.cloud.stream.output-bindings=fooout
使用。 - 使用
StreamBridge
发送消息到动态destination中,如myDestination
,没有该exchange则新建,有则跳过新建。 - 使用
ChannelInterceptor
拦截消息,配合@GlobalChannelInterceptor
使用。
- 使用
-
Functional组合:
spring.cloud.function.definition=toUpperCase|wrapInQuotes
- 多个input/output参数的Functions:参考官方文档
-
Batch Consumers:
Consumer<List<Person>> batchPerson()
-
Batch Producers:
Function<String, List<Message<String>>> batch()
- Spring IntegrationFlow作为functions:参考官方文档
1. 使用Supplier
接口发送消息
基于java8的三个接口:
-
Comsumer
相当于接收消息(即消费消息),所以只有input。 -
Function
相关于接收消息送处理后再发送给下游,所以有input/output。 -
Supplier
则是发送消息给下游,即生产者,配置上只有output。
Comsumer
和Function
在前置文章中有介绍,本章节介绍Supplier
。
首先,添加一个Supplier
的bean:
@Bean
public Supplier<String> hello() {
return () -> "hello world";
}
在RabbitMQ console中可以看到创建了exchange:
image.png
并没有queue创建,因为我们的app只需要负责将消息发送给exchange,至于下游的接收,则需要下游app自己处理。
值得注意的是,不管是Function
或是Consumer
,因为都有input
的配置,即消息从上游发送后,当前app中的方法就会被trigger(即方法的触发依赖说消息的到达)。
但Supplier
作为发送方,需要谁来触发呢?毕竟它没有绑定到任何一个input上。默认情况下,Spring框架会在启动的时候进行触发,并且每秒会触发一次。
为了验证上述结论,我们可以在项目中再额外定义一个Comsumer
用来接收上述Supplier
发送的消息:
@Bean
public Consumer<String> helloConsumer() {
return str -> {
log.info("Received: " + str);
};
}
在前置文章#4.5中讲到,如果有1个以上的functional bean,那么就需要显示定义出bean的名字,并且我们将Consumer
的input和Supplier
的output,指到了同一个exchange中:
spring.cloud.function.definition=helloConsumer;hello
spring.cloud.stream.bindings.helloConsumer-in-0.destination=hello
spring.cloud.stream.bindings.hello-out-0.destination=hello
启动程序后,可以看到在Consumer
端源源不断的接收到来自Supplier
的消息,而且间隔就是1秒:
默认的参数可以修改,详细看:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_polling_configuration_properties
比如我们加上spring.cloud.stream.poller.fixedDelay=5000
,那么Supplier
消息就会每隔5秒发送一次。
接下来,我们尝试使用reactive的编程风格来编写Supplier
的bean,下面的这个方法,其实只会被调用一次:
@Bean
public Supplier<Flux<String>> hello() {
return () -> Flux.just("hello from reactive style");
}
这个比较好测试,我们直接在exchange=hello上面手动binding一个queue=my-queue:
image.png
启动程序后发现,只能接收到一条message,就再也没有新的message了。
官方给了reactive想要实现每隔一秒发送消息的示例,这里使用了Schedulers以及让这个Thread sleep 1秒:
@Bean
public Supplier<Flux<String>> hello() {
return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
// ignore
}
return "Hello from Supplier";
}
})).subscribeOn(Schedulers.elastic()).share();
}
那么,有没有一种办法,可以用reactive的编程风格来编写Supplier
并且用更为简单的方式实现每隔1秒发送一次消息呢?(为什么要执着于每隔多久发送一次,这里的use case可以是我们需要源源不断的从一个数据源中主动拉取数据,处理好之后再发送给下游)。
答案是有的,可以使用@PollableBean
,以下的代码也会每隔1秒钟发送一次消息:
@PollableBean
public Supplier<Flux<String>> hello() {
return () -> Flux.just("hello");
}
2. 主动发送消息到一个output
很多时候我们并不能使用Supplier
来处理消息,主要原因是有些驱动并不能在启动时就设定好,而是有可能是通过REST api来发送一个消息,对于这种use case,Spring Cloud Stream提供了两种方式:
2.1 方式1:使用StreamBridge
发送消息到output
@RestController
public class StreamBridgeController {
@Autowired
private StreamBridge streamBridge;
@GetMapping("fooout")
public String delegateToSupplier() {
streamBridge.send("fooout-out-0", "hello from streamBridge");
return "success";
}
}
这时候就用到了上面#4.6的配置了,即没有绑定到具体的function的时候,需要配置spring.cloud.stream.input/output-binings
属性
spring.cloud.stream.output-bindings=fooout
启动项目后,自动创建exchange:
image.png
为了测试我们手动创建一个queue=my-queue,并绑定到这个exchange上:
image.png
调用API:http://localhost:8080/fooout后,可以接收到消息:
image.png在这里我们自动装载了一个StreamBridge
bean,使得可以发送消息到指定的output binding上。另外streamBridge.send(..)
发送的消息是一个Object
对象,即不仅可以发送String,还可以发送一个对象作为message,默认的conversion是json格式,比如我们尝试发送一个Student对象:
streamBridge.send("fooout-out-0", new Student(1, "test"));
image.png
2.2 方式2:使用StreamBridge
发送到动态的destination中
我们尝试发送消息到exchange=myDestination中,但不同于上个例子,这里的myDestination没有在application.properties中配置,因此,这样的exchange name会被当作是动态的destination(目的地)。
如果exchange=myDestination不存在,则会先创建出来,如果存在则跳过创建。
@GetMapping("dynamic")
public String dynamicDestination() {
streamBridge.send("myDestination", "hello dynamic destination.");
return "success";
}
动态新增的exchange:
image.png
2.3 StreamBridge.send(...)
其它方法:
-
boolean send(String bindingName, Object data, MimeType outputContentType)
:默认的content type是application/json
,但在send的时候可以指定content type。 -
boolean send(String bindingName, @Nullable String binderName, Object data)
:Spring Cloud Stream支持多个binder的case,比如从Kafka收消息再发送到RabbitMQ。通过StreamBridge发送时,可以指定相应的binderName。
2.4 使用拦截器拦截StreamBridge
StreamBridge
的send方法内部用的是MessageChannel
来建立output binding,MessageChannel
位于spring-messaging.jar
中,即这是Spring messaging中的代码,Jms用的也是这套,并不是Spring Cloud Stream独有。
关于MessageChannel
的Interceptor,这篇文章更为详细:https://dzone.com/articles/spring-cloud-stream-channel-interceptor
MessageChannel
可以用ChannelInterceptor
来拦截。这个接口有6个方法,基本涵盖了接收消息的各个阶段:
- preSend(...)
- postSend(...)
- afterSendCompletion(...)
- preReceive(...)
- postReceive(...)
- afterReceiveCompletion(...)
先新建一个ChannelInterceptor
的bean,Spring Cloud Stream并不会注入这个interceptor,除非这个bean加上@GlobalChannelInterceptor(patterns = "*")
。
@Bean
@GlobalChannelInterceptor(patterns = "*")
public ChannelInterceptor customInterceptor() {
return new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
String bytes = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
log.info("interceptor message: {}", bytes);
return message;
}
};
}
【测试】
访问上述#2.1的地址:http://localhost:8080/fooout,控制台日志:
2023-01-24 12:23:57.308 INFO 25053 --- [nio-8080-exec-1] com.config.MessagInterceptor : interceptor message: {"id":1,"name":"test"}
访问上述#2.2的地址:http://localhost:8080/dynamic,控制台日志:
2023-01-24 12:24:45.136 INFO 25053 --- [nio-8080-exec-2] com.config.MessagInterceptor : interceptor message: hello dynamic destination.
上述的@GlobalChannelInterceptor(patterns = "*")
会拦截所有的binding,如果只想拦截#2.1的fooout-out-0的binding,那么可以把patterns改成:@GlobalChannelInterceptor(patterns = "fooout-*")
。
3. Functional组合
使用functional编程还可以将一系列的functional组合起来使用。例如有两个Function bean:
@Bean
public Function<String, String> toUpperCase() {
return s -> s.toUpperCase();
}
@Bean
public Function<String, String> wrapInQuotes() {
return s -> "\"" + s + "\"";
}
超过1个以上的functional我们需要配置spring.cloud.function.definition
,在前置文章#4.5中有介绍,可以使用;
隔开。
但如果我们用|
隔开,就表示functional的组合,即消息从toUpperCase接收,处理(这个方法的处理结果是把消息变成大写),然后再发送给下游,下游是wrapInQuotes方法(这个方法是把消息加上双引号),然后再发送给下游:
spring.cloud.function.definition=toUpperCase|wrapInQuotes
【测试】
我们测试的开始端是RabbitMQ console --> 在exchange=toUpperCasewrapInQuotes-in-0
中发送消息 --> 消息被toUpperCase()
接收 --> 转大写后交给下一个functional处理(这里也是通过发消息),即wrapInQuotes()
--> 给消息加上双引号 --> 发送消息给toUpperCasewrapInQuotes-out-0
--> 为了测试方便,我们新建一个my-queue,来接收消息。
如果觉得名字太长,可以按前置文章的#4.2章,可以先给binding标下name:
spring.cloud.function.definition=toUpperCase|wrapInQuotes
spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=input
spring.cloud.stream.bindings.input.destination=my-input
这样原先的exchange=toUpperCasewrapInQuotes-in-0
就会变为my-input
。
4. 多个input/output参数的Functions
为什么会有这种需求?
- 大数据:你的数据来源可能不止一个地方。
- 数据聚合:有时候需要我们将来自不同stream的数据进行聚合。
5. Batch Consumers/Producers
5.1 Batch Consumers
具体查看官方文档:
- https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_batch_consumers
- https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream-binder-rabbit.html#_consumer_side_batching
图来自文章,虽然文章用的是中间件是solace,但图表达的很好:
image.png首先,定义一个bean为Consumer
的方法,可以批量接收Person,并打印出来:
@Bean
public Consumer<List<Person>> batchPerson() {
return persons -> {
log.info("Received " + persons.size());
persons.forEach(p -> log.info("received: " + p));
};
}
需要相应的配置:
spring.cloud.stream.bindings.batchPerson-in-0.consumer.batch-mode=true
spring.cloud.stream.rabbit.bindings.batchPerson-in-0.consumer.enable-batching=true
spring.cloud.stream.rabbit.bindings.batchPerson-in-0.consumer.batch-size=10
spring.cloud.stream.rabbit.bindings.batchPerson-in-0.consumer.receive-timeout=200
【测试】
为了方便测试,写了测试代码:
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("batch-consumer")
public String batch() {
rabbitTemplate.convertAndSend("batchPerson-in-0", "#", "{\"name\":\"aaa\"}");
rabbitTemplate.convertAndSend("batchPerson-in-0", "#", "{\"name\":\"bbb\"}");
return "success";
}
浏览器调用http://localhost:8080/batch-consumer后,日志打印:
image.png5.2 Batch Producers
image.png首先,创建producer bean,该方法会接收1个字符串,然后再分发出4条消息:
@Bean
public Function<String, List<Message<String>>> batch() {
return p -> {
List<Message<String>> list = new ArrayList<>();
list.add(MessageBuilder.withPayload(p + ":1").build());
list.add(MessageBuilder.withPayload(p + ":2").build());
list.add(MessageBuilder.withPayload(p + ":3").build());
list.add(MessageBuilder.withPayload(p + ":4").build());
return list;
};
}
【测试】
为了方便测试,我们写一个Consumer
来接收上述批量Producer的结果:
@Bean
public Consumer<String> batchConsumer() {
return str -> log.info("received: " + str);
}
超过1个functional时,需要显性指定名字,以及我们把batchConsumer的in指定为上述batch producer的下游:
spring.cloud.function.definition=batch;batchConsumer
spring.cloud.stream.bindings.batchConsumer-in-0.destination=batch-out-0
在RabbitMQ console,exchange=batch-in-0
中发布1条消息:
6. Spring IntegrationFlow作为functions
Spring Cloud Stream和Spring Integration集成,详细参考官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_spring_integration_flow_as_functions
网友评论