美文网首页
【Spring Cloud Stream】发送和接收消息

【Spring Cloud Stream】发送和接收消息

作者: 伊丽莎白2015 | 来源:发表于2023-01-23 19:08 被阅读0次

    【前置文章】

    【官方文档】

    【本文内容】
    本文基于Spring Cloud Stream 3.2.4版本,用RabbitMQ作为消息中间件。

    • 使用Supplier接口发送消息:
      • 新建一个bean返回type为Supplier,方法为hello(),会自动创建exchange=hello-out-0
      • 默认情况下每1秒钟由Spring框架触发1次。
      • 修改默认值:spring.cloud.stream.poller.fixedDelay=5000
      • Reactive风格编程,Supplier<Flux<String>> hello()Flux.just(String),只会触发一次。
      • Reactive风格编程,配合Thread.sleep和Schedulers.elastic(),可实现每1秒发送1次消息。
      • Reactive风格编程,使用@PollableBean轻松实现每1秒发送1次消息。
    • 主动发送消息到一个output
      • 使用StreamBridge发送消息到exchange=fooout-in-0,配合spring.cloud.stream.output-bindings=fooout使用。
      • 使用StreamBridge发送消息到动态destination中,如myDestination,没有该exchange则新建,有则跳过新建。
      • 使用ChannelInterceptor拦截消息,配合@GlobalChannelInterceptor使用。
    • Functional组合spring.cloud.function.definition=toUpperCase|wrapInQuotes
    • 多个input/output参数的Functions:参考官方文档
    • Batch ConsumersConsumer<List<Person>> batchPerson()
    • Batch ProducersFunction<String, List<Message<String>>> batch()
    • Spring IntegrationFlow作为functions:参考官方文档

    1. 使用Supplier接口发送消息

    基于java8的三个接口:

    • Comsumer相当于接收消息(即消费消息),所以只有input。
    • Function相关于接收消息送处理后再发送给下游,所以有input/output。
    • Supplier则是发送消息给下游,即生产者,配置上只有output。

    ComsumerFunction在前置文章中有介绍,本章节介绍Supplier

    首先,添加一个Supplier的bean:

        @Bean
        public Supplier<String> hello() {
            return () -> "hello world";
        }
    
    在RabbitMQ console中可以看到创建了exchange: image.png

    并没有queue创建,因为我们的app只需要负责将消息发送给exchange,至于下游的接收,则需要下游app自己处理。

    值得注意的是,不管是Function或是Consumer,因为都有input的配置,即消息从上游发送后,当前app中的方法就会被trigger(即方法的触发依赖说消息的到达)。

    Supplier作为发送方,需要谁来触发呢?毕竟它没有绑定到任何一个input上。默认情况下,Spring框架会在启动的时候进行触发,并且每秒会触发一次。

    为了验证上述结论,我们可以在项目中再额外定义一个Comsumer用来接收上述Supplier发送的消息:

        @Bean
        public Consumer<String> helloConsumer() {
            return str -> {
                log.info("Received: " + str);
            };
        }
    

    在前置文章#4.5中讲到,如果有1个以上的functional bean,那么就需要显示定义出bean的名字,并且我们将Consumer的input和Supplier的output,指到了同一个exchange中:

    spring.cloud.function.definition=helloConsumer;hello
    spring.cloud.stream.bindings.helloConsumer-in-0.destination=hello
    spring.cloud.stream.bindings.hello-out-0.destination=hello
    

    启动程序后,可以看到在Consumer端源源不断的接收到来自Supplier的消息,而且间隔就是1秒:

    image.png

    默认的参数可以修改,详细看:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_polling_configuration_properties
    比如我们加上spring.cloud.stream.poller.fixedDelay=5000,那么Supplier消息就会每隔5秒发送一次。

    接下来,我们尝试使用reactive的编程风格来编写Supplier的bean,下面的这个方法,其实只会被调用一次:

        @Bean
        public Supplier<Flux<String>> hello() {
            return () -> Flux.just("hello from reactive style");
        }
    
    这个比较好测试,我们直接在exchange=hello上面手动binding一个queue=my-queue: image.png

    启动程序后发现,只能接收到一条message,就再也没有新的message了。

    官方给了reactive想要实现每隔一秒发送消息的示例,这里使用了Schedulers以及让这个Thread sleep 1秒:

    @Bean
        public Supplier<Flux<String>> hello() {
            return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {
                @Override
                public String get() {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                    return "Hello from Supplier";
                }
            })).subscribeOn(Schedulers.elastic()).share();
        }
    

    那么,有没有一种办法,可以用reactive的编程风格来编写Supplier并且用更为简单的方式实现每隔1秒发送一次消息呢?(为什么要执着于每隔多久发送一次,这里的use case可以是我们需要源源不断的从一个数据源中主动拉取数据,处理好之后再发送给下游)。

    答案是有的,可以使用@PollableBean,以下的代码也会每隔1秒钟发送一次消息:

        @PollableBean
        public Supplier<Flux<String>> hello() {
            return () -> Flux.just("hello");
        }
    

    2. 主动发送消息到一个output

    很多时候我们并不能使用Supplier来处理消息,主要原因是有些驱动并不能在启动时就设定好,而是有可能是通过REST api来发送一个消息,对于这种use case,Spring Cloud Stream提供了两种方式:

    2.1 方式1:使用StreamBridge发送消息到output
    @RestController
    public class StreamBridgeController {
        @Autowired
        private StreamBridge streamBridge;
    
        @GetMapping("fooout")
        public String delegateToSupplier() {
            streamBridge.send("fooout-out-0", "hello from streamBridge");
            return "success";
        }
    }
    

    这时候就用到了上面#4.6的配置了,即没有绑定到具体的function的时候,需要配置spring.cloud.stream.input/output-binings属性

    spring.cloud.stream.output-bindings=fooout
    
    启动项目后,自动创建exchange: image.png 为了测试我们手动创建一个queue=my-queue,并绑定到这个exchange上: image.png

    调用API:http://localhost:8080/fooout后,可以接收到消息:

    image.png

    在这里我们自动装载了一个StreamBridge bean,使得可以发送消息到指定的output binding上。另外streamBridge.send(..)发送的消息是一个Object对象,即不仅可以发送String,还可以发送一个对象作为message,默认的conversion是json格式,比如我们尝试发送一个Student对象:

            streamBridge.send("fooout-out-0", new Student(1, "test"));
    
    image.png
    2.2 方式2:使用StreamBridge发送到动态的destination中

    我们尝试发送消息到exchange=myDestination中,但不同于上个例子,这里的myDestination没有在application.properties中配置,因此,这样的exchange name会被当作是动态的destination(目的地)。

    如果exchange=myDestination不存在,则会先创建出来,如果存在则跳过创建。

        @GetMapping("dynamic")
        public String dynamicDestination() {
            streamBridge.send("myDestination", "hello dynamic destination.");
            return "success";
        }
    
    动态新增的exchange: image.png
    2.3 StreamBridge.send(...)其它方法:
    • boolean send(String bindingName, Object data, MimeType outputContentType):默认的content type是application/json,但在send的时候可以指定content type。
    • boolean send(String bindingName, @Nullable String binderName, Object data):Spring Cloud Stream支持多个binder的case,比如从Kafka收消息再发送到RabbitMQ。通过StreamBridge发送时,可以指定相应的binderName。
    2.4 使用拦截器拦截StreamBridge

    StreamBridge的send方法内部用的是MessageChannel来建立output binding,MessageChannel位于spring-messaging.jar中,即这是Spring messaging中的代码,Jms用的也是这套,并不是Spring Cloud Stream独有。

    关于MessageChannel的Interceptor,这篇文章更为详细:https://dzone.com/articles/spring-cloud-stream-channel-interceptor

    MessageChannel可以用ChannelInterceptor来拦截。这个接口有6个方法,基本涵盖了接收消息的各个阶段:

    • preSend(...)
    • postSend(...)
    • afterSendCompletion(...)
    • preReceive(...)
    • postReceive(...)
    • afterReceiveCompletion(...)

    先新建一个ChannelInterceptor的bean,Spring Cloud Stream并不会注入这个interceptor,除非这个bean加上@GlobalChannelInterceptor(patterns = "*")

        @Bean
        @GlobalChannelInterceptor(patterns = "*")
        public ChannelInterceptor customInterceptor() {
            return new ChannelInterceptor() {
                @Override
                public Message<?> preSend(Message<?> message, MessageChannel channel) {
                    String bytes = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
                    log.info("interceptor message: {}", bytes);
                    return message;
                }
            };
        }
    

    【测试】
    访问上述#2.1的地址:http://localhost:8080/fooout,控制台日志:

    2023-01-24 12:23:57.308 INFO 25053 --- [nio-8080-exec-1] com.config.MessagInterceptor : interceptor message: {"id":1,"name":"test"}

    访问上述#2.2的地址:http://localhost:8080/dynamic,控制台日志:

    2023-01-24 12:24:45.136 INFO 25053 --- [nio-8080-exec-2] com.config.MessagInterceptor : interceptor message: hello dynamic destination.

    上述的@GlobalChannelInterceptor(patterns = "*")会拦截所有的binding,如果只想拦截#2.1的fooout-out-0的binding,那么可以把patterns改成:@GlobalChannelInterceptor(patterns = "fooout-*")

    3. Functional组合

    使用functional编程还可以将一系列的functional组合起来使用。例如有两个Function bean:

        @Bean
        public Function<String, String> toUpperCase() {
            return s -> s.toUpperCase();
        }
    
        @Bean
        public Function<String, String> wrapInQuotes() {
            return s -> "\"" + s + "\"";
        }
    

    超过1个以上的functional我们需要配置spring.cloud.function.definition,在前置文章#4.5中有介绍,可以使用;隔开。

    但如果我们用|隔开,就表示functional的组合,即消息从toUpperCase接收,处理(这个方法的处理结果是把消息变成大写),然后再发送给下游,下游是wrapInQuotes方法(这个方法是把消息加上双引号),然后再发送给下游:

    spring.cloud.function.definition=toUpperCase|wrapInQuotes
    

    【测试】

    image.png

    我们测试的开始端是RabbitMQ console --> 在exchange=toUpperCasewrapInQuotes-in-0中发送消息 --> 消息被toUpperCase()接收 --> 转大写后交给下一个functional处理(这里也是通过发消息),即wrapInQuotes() --> 给消息加上双引号 --> 发送消息给toUpperCasewrapInQuotes-out-0 --> 为了测试方便,我们新建一个my-queue,来接收消息。

    首先发送消息: image.png 再看到消息拦截器preSend方法发送的消息: image.png 最后my-queue收到的消息,可以看到转成了大写后又加了括号: image.png

    如果觉得名字太长,可以按前置文章的#4.2章,可以先给binding标下name:

    spring.cloud.function.definition=toUpperCase|wrapInQuotes
    spring.cloud.stream.function.bindings.toUpperCase|wrapInQuotes-in-0=input
    spring.cloud.stream.bindings.input.destination=my-input
    

    这样原先的exchange=toUpperCasewrapInQuotes-in-0就会变为my-input

    4. 多个input/output参数的Functions

    为什么会有这种需求?

    • 大数据:你的数据来源可能不止一个地方。
    • 数据聚合:有时候需要我们将来自不同stream的数据进行聚合。

    具体查看官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_functions_with_multiple_input_and_output_arguments

    5. Batch Consumers/Producers

    5.1 Batch Consumers

    具体查看官方文档:

    图来自文章,虽然文章用的是中间件是solace,但图表达的很好:

    image.png

    首先,定义一个bean为Consumer的方法,可以批量接收Person,并打印出来:

        @Bean
        public Consumer<List<Person>> batchPerson() {
            return persons -> {
                log.info("Received " + persons.size());
                persons.forEach(p -> log.info("received: " + p));
            };
        }
    

    需要相应的配置:

    spring.cloud.stream.bindings.batchPerson-in-0.consumer.batch-mode=true
    
    spring.cloud.stream.rabbit.bindings.batchPerson-in-0.consumer.enable-batching=true
    spring.cloud.stream.rabbit.bindings.batchPerson-in-0.consumer.batch-size=10
    spring.cloud.stream.rabbit.bindings.batchPerson-in-0.consumer.receive-timeout=200
    

    【测试】
    为了方便测试,写了测试代码:

        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping("batch-consumer")
        public String batch() {
            rabbitTemplate.convertAndSend("batchPerson-in-0", "#", "{\"name\":\"aaa\"}");
            rabbitTemplate.convertAndSend("batchPerson-in-0", "#", "{\"name\":\"bbb\"}");
            return "success";
        }
    

    浏览器调用http://localhost:8080/batch-consumer后,日志打印:

    image.png
    5.2 Batch Producers
    image.png

    首先,创建producer bean,该方法会接收1个字符串,然后再分发出4条消息:

    @Bean
        public Function<String, List<Message<String>>> batch() {
            return p -> {
                List<Message<String>> list = new ArrayList<>();
                list.add(MessageBuilder.withPayload(p + ":1").build());
                list.add(MessageBuilder.withPayload(p + ":2").build());
                list.add(MessageBuilder.withPayload(p + ":3").build());
                list.add(MessageBuilder.withPayload(p + ":4").build());
                return list;
            };
        }
    

    【测试】
    为了方便测试,我们写一个Consumer来接收上述批量Producer的结果:

        @Bean
        public Consumer<String> batchConsumer() {
            return str -> log.info("received: " + str);
        }
    

    超过1个functional时,需要显性指定名字,以及我们把batchConsumer的in指定为上述batch producer的下游:

    spring.cloud.function.definition=batch;batchConsumer
    spring.cloud.stream.bindings.batchConsumer-in-0.destination=batch-out-0
    

    在RabbitMQ console,exchange=batch-in-0中发布1条消息:

    image.png batchConsumer端收到了4条消息: image.png

    6. Spring IntegrationFlow作为functions

    Spring Cloud Stream和Spring Integration集成,详细参考官方文档:https://docs.spring.io/spring-cloud-stream/docs/3.2.4/reference/html/spring-cloud-stream.html#_spring_integration_flow_as_functions

    相关文章

      网友评论

          本文标题:【Spring Cloud Stream】发送和接收消息

          本文链接:https://www.haomeiwen.com/subject/xphlhdtx.html