美文网首页
【Spring Cloud Stream】入门,并简单的配置Co

【Spring Cloud Stream】入门,并简单的配置Co

作者: 伊丽莎白2015 | 来源:发表于2023-01-22 22:55 被阅读0次

【本文内容】

  • 本文基于Spring Cloud Stream 3.2.4版本。
  • 介绍了官方的示例,定义Consumer接口来接收消息:
    • 使用的是RabbitMQ作为消息中间件:spring-cloud-stream-binder-rabbit
    • application.properties中加入RabbitMQ相关配置
    • 定义bean类型为Consumer接口的方法log()
  • 一些遗留问题,如@EnableBInding@StreamListener都已经不再使用(deprecated)。
  • 配置相关的学习:
    • 自动配置:当一个bean类型为SupplierFunctionConsumer时自动配置bindings。如上述官方示例方法log(),会定义exchange=log-in-0,queue=log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ,并通过routingKey=#绑定到exchange上。
    • 手动配置:bean类型为Function时方法uppercase():
      spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic-in
      spring.cloud.stream.bindings.uppercase-out-0.destination=my-topic-out
    • 加上binding name的手动配置
      spring.cloud.stream.function.bindings.uppercase-in-0=myInput
      spring.cloud.stream.bindings.myInput.destination=my-topic-in
      spring.cloud.stream.function.bindings.uppercase-out-0=output
      spring.cloud.stream.bindings.output.destination=my-topic-out
    • 多个functional时的binding:遇到超过1个functional的时候,需要显性指定function,如同时存在log()uppercase()
      spring.cloud.function.definition=log;uppercase
    • 没有functional的时候的binding(配合StreamBridge使用)
      spring.cloud.stream.input-bindings=fooin;barin
      spring.cloud.stream.output-bindings=fooout;barout
      则会创建2个in和2个out,其中in:fooin-in-0barin-in-0

1. 前言

官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html

在数据整合方面,Spring提供了自己的解决方案,即Spring Integration (https://spring.io/projects/spring-integration/),它提供了和外部系统连接的模式,外部系统如数据库,消息中间件,邮件等等。

后来随着微服务的流行,Spring Boot架构成为主流,Spring Integration和Spring Boot集成,形成了一个新的项目——Spring Cloud Stream。

Spring Cloud Stream可以帮助app从event事件中解藕出来,如在某个时间节点某个事件会发生,再通知给下游的app。总而言之,Spring Cloud Stream试图提供一种通用的抽象来支持不同的消息中间件,如RabbitMQ, Apache Kafka, Amazon Kinesis。换句话说,如果我们的系统在某个时候用的消息中间件是RabbitMQ,如果使用Spring Cloud Stream来处理消息,那么某个版本后想要切换到Kafka,那么不需要改很多代码,因为我们是面向Spring Cloud Stream的API在开发,而不是具体的跟RabbitMQ或是Kafka的API打交道。

2. 官方简单的例子

参考:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#spring-cloud-stream-preface-creating-sample-application

我用的是消息中间件是RabbitMQ。首先确保本地RabbitMQ已经安装。

2.1 依赖

除了Spring Boot相关,还需要加入:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
            <version>3.2.4</version>
        </dependency>
2.2 配置

新建application.properties,配置RabbitMQ相关:

spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
spring.rabbitmq.host=localhost
spring.rabbitmq.virtual-host=stream-test
2.3 相关代码
  • 使用了java8新接口Consumer来作为message的handler。关于Consumer,可以参考网友写的文章:【Java 8 新特性】Java Consumer示例
  • 定义了这个handler,Spring Cloud Stream会自动的定义相关的input destination。
  • 另外这个handler会自动的将消息转为Person对象,即消息为Json格式。
@SpringBootApplication
public class StreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class, args);
    }

    @Bean
    public Consumer<Person> log() {
        return person -> {
            System.out.println("Received: " + person);
        };
    }

    public static class Person {
        private String name;
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        public String toString() {
            return this.name;
        }
    }
}
2.4 测试

启动项目后,可以看到日志:

2023-01-23 14:02:22.760 INFO 77705 --- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ, bound to: log-in-0
2023-01-23 14:02:22.762 INFO 77705 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2023-01-23 14:02:22.848 INFO 77705 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2accaec2:0/SimpleConnection@3869a6e5 [delegate=amqp://guest@127.0.0.1:5672/stream-test, localPort= 58498]
...
2023-01-23 14:02:22.949 INFO 77705 --- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started bean 'inbound.log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ'
2023-01-23 14:02:22.969 INFO 77705 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2023-01-23 14:02:22.978 INFO 77705 --- [ main] com.faj.StreamApplication : Started StreamApplication in 3.301 seconds (JVM running for 3.778)

尝试在RabbitMQ console中publish一条消息: image.png 发送成功后,app端可以收到消息: image.png

3. 一些遗留的问题

经常看到一些hello world例子(如https://www.baeldung.com/spring-cloud-stream),会用一些annotation:@EnableBInding@StreamListener,目前在3.2.4版本中都已经不推荐使用了(即deprecated了)。推荐使用的是基于functional的编程模式,具体看https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#spring_cloud_function

4. 配置解释

4.1 自动创建exchange和queue并binding

在#2中的例子,我们创建了一个返回Java Consumer接口的bean:

    @Bean
    public Consumer<Person> log() {
        return person -> {
            System.out.println("Received: " + person);
        };
    }

打开RabbitMQ console page:http://localhost:15672/#/exchanges
可以看到自动创建了类型为topic的exchange=log-in-0:

image.png 并且创建了queue=log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ: image.png 绑定信息: image.png

可以看到基于Spring Cloud Stream在启动时帮我们做了很多自动化的配置:
当一个bean类型为SupplierFunctionConsumer时(这三个接口皆为java 8引入),就会当作是Spring Cloud Stream的message handlers,并会找寻相关的destination的binding。如果没有配置,则会按照遵循特定的规则。

4.2 手动配置的规则

基于functional编程binding的规则说明,以返回类型是Function为例:

    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }

在#4.1中的定义的Comsumer,关心的是如何消费,所以app中会自动创建input相关的配置。而本例中bean的类型为Function接口,它会有inputoutput,即接收消息,然后再将消息发送到下游。

input和output相关的bindings如下:

  • input:<方法名> + -in- + <index>
    -output:<方法名> + -out- + <index>

in或是out表示binding的类型(即inputoutput)。index表示序列号,如果是单个input/output,那么就是0。

如果是手动配置,我们希望uppercase()的接收queue为my-queue,那么input的完整的配置如下:

spring.cloud.stream.bindings.uppercase-in-0.destination=my-queue

output的配置也类似,使用的是:uppercase-out-0

4.3 Binding名

Descriptive Binding Names
为了增加可读性,可以给binding一个描述性的名字,可以通过配置spring.cloud.stream.function.bindings.<binding-name>来实现。如:

spring.cloud.stream.function.bindings.uppercase-in-0=myInput

即,通过上述配置,我们把uppercase-in-0重命名为myInput,那么我们在定义detination的时候,就可以用myInput来定义了:

spring.cloud.stream.bindings.myInput.destination=my-queue
4.4 测试

基于#4.2代码的测试:
case-1:使用自动配置,创建的exchange如下:

image.png queue如下: image.png input的exchange会binding到queue上: image.png

output没有binding的queue,很好理解,因为当前的app只需要关心发送到哪个exchange,至于下游谁在接收,它并不关心。

case-2:如果我们手动配置:

spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic-in
spring.cloud.stream.bindings.uppercase-out-0.destination=my-topic-out

或用binding name来配置(和上面的配置等价):

spring.cloud.stream.function.bindings.uppercase-in-0=myInput
spring.cloud.stream.bindings.myInput.destination=my-topic-in

spring.cloud.stream.function.bindings.uppercase-out-0=output
spring.cloud.stream.bindings.output.destination=my-topic-out
那么创建的exchange如下: image.png binding如下: image.png
4.5 多个functional时的binding

如果只有一个functional bean,如#2的log()或是#4.2中的uppercase(),那么不需要配置spring.cloud.function.definition(依赖说Spring Cloud Stream的auto-discovered机制),反之则需要配置。

比如我们配置了两个bean:

    @Bean
    public Consumer<Person> log() {
        return person -> {
            System.out.println("Received: " + person);
        };
    }

    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }

如果我们在application.properties文件中没有任何配置,打开RabbitMQ console会发现相关的exchange/queue并没有创建。加入配置:

spring.cloud.function.definition=log;uppercase
那么functional=log和uppercase会创建相关配置,而another()并不会: image.png

ps,一个bean时auto-detect机制会起作用,想要关闭这个功能,可以用spring.cloud.stream.function.autodetect=false

4.6 没有functional的时候的binding

我们上述都是基于functional的配置(即三个接口:FunctionSupplierConsumer),有时候我们需要明确的binding,但不需要绑定到具体的function上。如需要支持Spring的其它框架(如Spring Integration),这时候可能会用MessageChannel来操作。如果是这样,那么可以直接定义input/output,多个input可以用;来区分:

spring.cloud.stream.input-bindings=fooin;barin
spring.cloud.stream.output-bindings=fooout;barout
上述的配置,不需要定义额外的function,创建的exchange如下: image.png 创建的queue如下: image.png 同样的,input exchange会绑定到相应的queue上,而output exchange不会有queue绑定。如: image.png

那么没有functional的时候,谁负责消息的发送呢?Spring Cloud Stream提供了StreamBridge,也可以发送消息,具体看官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

相关文章

网友评论

      本文标题:【Spring Cloud Stream】入门,并简单的配置Co

      本文链接:https://www.haomeiwen.com/subject/xuukhdtx.html