美文网首页
【Spring Cloud Stream】入门,并简单的配置Co

【Spring Cloud Stream】入门,并简单的配置Co

作者: 伊丽莎白2015 | 来源:发表于2023-01-22 22:55 被阅读0次

    【本文内容】

    • 本文基于Spring Cloud Stream 3.2.4版本。
    • 介绍了官方的示例,定义Consumer接口来接收消息:
      • 使用的是RabbitMQ作为消息中间件:spring-cloud-stream-binder-rabbit
      • application.properties中加入RabbitMQ相关配置
      • 定义bean类型为Consumer接口的方法log()
    • 一些遗留问题,如@EnableBInding@StreamListener都已经不再使用(deprecated)。
    • 配置相关的学习:
      • 自动配置:当一个bean类型为SupplierFunctionConsumer时自动配置bindings。如上述官方示例方法log(),会定义exchange=log-in-0,queue=log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ,并通过routingKey=#绑定到exchange上。
      • 手动配置:bean类型为Function时方法uppercase():
        spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic-in
        spring.cloud.stream.bindings.uppercase-out-0.destination=my-topic-out
      • 加上binding name的手动配置
        spring.cloud.stream.function.bindings.uppercase-in-0=myInput
        spring.cloud.stream.bindings.myInput.destination=my-topic-in
        spring.cloud.stream.function.bindings.uppercase-out-0=output
        spring.cloud.stream.bindings.output.destination=my-topic-out
      • 多个functional时的binding:遇到超过1个functional的时候,需要显性指定function,如同时存在log()uppercase()
        spring.cloud.function.definition=log;uppercase
      • 没有functional的时候的binding(配合StreamBridge使用)
        spring.cloud.stream.input-bindings=fooin;barin
        spring.cloud.stream.output-bindings=fooout;barout
        则会创建2个in和2个out,其中in:fooin-in-0barin-in-0

    1. 前言

    官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html

    在数据整合方面,Spring提供了自己的解决方案,即Spring Integration (https://spring.io/projects/spring-integration/),它提供了和外部系统连接的模式,外部系统如数据库,消息中间件,邮件等等。

    后来随着微服务的流行,Spring Boot架构成为主流,Spring Integration和Spring Boot集成,形成了一个新的项目——Spring Cloud Stream。

    Spring Cloud Stream可以帮助app从event事件中解藕出来,如在某个时间节点某个事件会发生,再通知给下游的app。总而言之,Spring Cloud Stream试图提供一种通用的抽象来支持不同的消息中间件,如RabbitMQ, Apache Kafka, Amazon Kinesis。换句话说,如果我们的系统在某个时候用的消息中间件是RabbitMQ,如果使用Spring Cloud Stream来处理消息,那么某个版本后想要切换到Kafka,那么不需要改很多代码,因为我们是面向Spring Cloud Stream的API在开发,而不是具体的跟RabbitMQ或是Kafka的API打交道。

    2. 官方简单的例子

    参考:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#spring-cloud-stream-preface-creating-sample-application

    我用的是消息中间件是RabbitMQ。首先确保本地RabbitMQ已经安装。

    2.1 依赖

    除了Spring Boot相关,还需要加入:

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
                <version>3.2.4</version>
            </dependency>
    
    2.2 配置

    新建application.properties,配置RabbitMQ相关:

    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.port=5672
    spring.rabbitmq.host=localhost
    spring.rabbitmq.virtual-host=stream-test
    
    2.3 相关代码
    • 使用了java8新接口Consumer来作为message的handler。关于Consumer,可以参考网友写的文章:【Java 8 新特性】Java Consumer示例
    • 定义了这个handler,Spring Cloud Stream会自动的定义相关的input destination。
    • 另外这个handler会自动的将消息转为Person对象,即消息为Json格式。
    @SpringBootApplication
    public class StreamApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(StreamApplication.class, args);
        }
    
        @Bean
        public Consumer<Person> log() {
            return person -> {
                System.out.println("Received: " + person);
            };
        }
    
        public static class Person {
            private String name;
            public String getName() {
                return name;
            }
            public void setName(String name) {
                this.name = name;
            }
            public String toString() {
                return this.name;
            }
        }
    }
    
    2.4 测试

    启动项目后,可以看到日志:

    2023-01-23 14:02:22.760 INFO 77705 --- [ main] c.s.b.r.p.RabbitExchangeQueueProvisioner : declaring queue for inbound: log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ, bound to: log-in-0
    2023-01-23 14:02:22.762 INFO 77705 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
    2023-01-23 14:02:22.848 INFO 77705 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#2accaec2:0/SimpleConnection@3869a6e5 [delegate=amqp://guest@127.0.0.1:5672/stream-test, localPort= 58498]
    ...
    2023-01-23 14:02:22.949 INFO 77705 --- [ main] o.s.i.a.i.AmqpInboundChannelAdapter : started bean 'inbound.log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ'
    2023-01-23 14:02:22.969 INFO 77705 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
    2023-01-23 14:02:22.978 INFO 77705 --- [ main] com.faj.StreamApplication : Started StreamApplication in 3.301 seconds (JVM running for 3.778)

    尝试在RabbitMQ console中publish一条消息: image.png 发送成功后,app端可以收到消息: image.png

    3. 一些遗留的问题

    经常看到一些hello world例子(如https://www.baeldung.com/spring-cloud-stream),会用一些annotation:@EnableBInding@StreamListener,目前在3.2.4版本中都已经不推荐使用了(即deprecated了)。推荐使用的是基于functional的编程模式,具体看https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#spring_cloud_function

    4. 配置解释

    4.1 自动创建exchange和queue并binding

    在#2中的例子,我们创建了一个返回Java Consumer接口的bean:

        @Bean
        public Consumer<Person> log() {
            return person -> {
                System.out.println("Received: " + person);
            };
        }
    

    打开RabbitMQ console page:http://localhost:15672/#/exchanges
    可以看到自动创建了类型为topic的exchange=log-in-0:

    image.png 并且创建了queue=log-in-0.anonymous.OQf4roW7QCOCvyciCGaVHQ: image.png 绑定信息: image.png

    可以看到基于Spring Cloud Stream在启动时帮我们做了很多自动化的配置:
    当一个bean类型为SupplierFunctionConsumer时(这三个接口皆为java 8引入),就会当作是Spring Cloud Stream的message handlers,并会找寻相关的destination的binding。如果没有配置,则会按照遵循特定的规则。

    4.2 手动配置的规则

    基于functional编程binding的规则说明,以返回类型是Function为例:

        @Bean
        public Function<String, String> uppercase() {
            return value -> value.toUpperCase();
        }
    

    在#4.1中的定义的Comsumer,关心的是如何消费,所以app中会自动创建input相关的配置。而本例中bean的类型为Function接口,它会有inputoutput,即接收消息,然后再将消息发送到下游。

    input和output相关的bindings如下:

    • input:<方法名> + -in- + <index>
      -output:<方法名> + -out- + <index>

    in或是out表示binding的类型(即inputoutput)。index表示序列号,如果是单个input/output,那么就是0。

    如果是手动配置,我们希望uppercase()的接收queue为my-queue,那么input的完整的配置如下:

    spring.cloud.stream.bindings.uppercase-in-0.destination=my-queue
    

    output的配置也类似,使用的是:uppercase-out-0

    4.3 Binding名

    Descriptive Binding Names
    为了增加可读性,可以给binding一个描述性的名字,可以通过配置spring.cloud.stream.function.bindings.<binding-name>来实现。如:

    spring.cloud.stream.function.bindings.uppercase-in-0=myInput
    

    即,通过上述配置,我们把uppercase-in-0重命名为myInput,那么我们在定义detination的时候,就可以用myInput来定义了:

    spring.cloud.stream.bindings.myInput.destination=my-queue
    
    4.4 测试

    基于#4.2代码的测试:
    case-1:使用自动配置,创建的exchange如下:

    image.png queue如下: image.png input的exchange会binding到queue上: image.png

    output没有binding的queue,很好理解,因为当前的app只需要关心发送到哪个exchange,至于下游谁在接收,它并不关心。

    case-2:如果我们手动配置:

    spring.cloud.stream.bindings.uppercase-in-0.destination=my-topic-in
    spring.cloud.stream.bindings.uppercase-out-0.destination=my-topic-out
    

    或用binding name来配置(和上面的配置等价):

    spring.cloud.stream.function.bindings.uppercase-in-0=myInput
    spring.cloud.stream.bindings.myInput.destination=my-topic-in
    
    spring.cloud.stream.function.bindings.uppercase-out-0=output
    spring.cloud.stream.bindings.output.destination=my-topic-out
    
    那么创建的exchange如下: image.png binding如下: image.png
    4.5 多个functional时的binding

    如果只有一个functional bean,如#2的log()或是#4.2中的uppercase(),那么不需要配置spring.cloud.function.definition(依赖说Spring Cloud Stream的auto-discovered机制),反之则需要配置。

    比如我们配置了两个bean:

        @Bean
        public Consumer<Person> log() {
            return person -> {
                System.out.println("Received: " + person);
            };
        }
    
        @Bean
        public Function<String, String> uppercase() {
            return value -> value.toUpperCase();
        }
    

    如果我们在application.properties文件中没有任何配置,打开RabbitMQ console会发现相关的exchange/queue并没有创建。加入配置:

    spring.cloud.function.definition=log;uppercase
    
    那么functional=log和uppercase会创建相关配置,而another()并不会: image.png

    ps,一个bean时auto-detect机制会起作用,想要关闭这个功能,可以用spring.cloud.stream.function.autodetect=false

    4.6 没有functional的时候的binding

    我们上述都是基于functional的配置(即三个接口:FunctionSupplierConsumer),有时候我们需要明确的binding,但不需要绑定到具体的function上。如需要支持Spring的其它框架(如Spring Integration),这时候可能会用MessageChannel来操作。如果是这样,那么可以直接定义input/output,多个input可以用;来区分:

    spring.cloud.stream.input-bindings=fooin;barin
    spring.cloud.stream.output-bindings=fooout;barout
    
    上述的配置,不需要定义额外的function,创建的exchange如下: image.png 创建的queue如下: image.png 同样的,input exchange会绑定到相应的queue上,而output exchange不会有queue绑定。如: image.png

    那么没有functional的时候,谁负责消息的发送呢?Spring Cloud Stream提供了StreamBridge,也可以发送消息,具体看官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

    相关文章

      网友评论

          本文标题:【Spring Cloud Stream】入门,并简单的配置Co

          本文链接:https://www.haomeiwen.com/subject/xuukhdtx.html