美文网首页
Stream使用入门

Stream使用入门

作者: 文景大大 | 来源:发表于2021-07-21 19:30 被阅读0次

    一、Stream介绍

    SpringCloud Stream是在Spring Messaging和Spring Integration这两个项目的基础上发展而来的,我们在项目中可能使用多个MQ,或者在项目的某个阶段需要更换MQ,如果使用了Stream,我们只要更改配置即可,完全不用修改消息发送和接收的代码。而且Stream定义的消息收发模型使得我们代码更加简洁易懂。

    详细的介绍文档可以参考:
    Spring Cloud Stream 体系及原理介绍 (qq.com)

    二、入门案例

    首先我们从start.spring.io下载一个全新的项目,需要的依赖有web、lombok,下载好了之后再自行引入stream-rocketmq的依赖:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        <version>2021.1</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    

    然后,我们开始创建生产者:

    @Slf4j
    @Component
    @EnableBinding(Source.class)
    public class MyMessageProducer {
    
        @Autowired
        private Source source;
    
        public void send(String message){
            log.info("生产者开始发送消息!");
            source.output().send(MessageBuilder.withPayload(message).build());
        }
    
    }
    

    我们使用Controller来出发生产者发送消息:

    @RestController
    public class ProducerController {
    
        @Autowired
        private MyMessageProducer producer;
    
        @GetMapping("/sendMessage")
        public String sendMessage(){
            producer.send("hello spring cloud stream!");
            return "OK";
        }
    
    }
    

    随后,我们创建消费者:

    @Slf4j
    @Component
    @EnableBinding(Sink.class)
    public class MyMessageConsumer {
    
        @StreamListener(Sink.INPUT)
        public void receive(String message){
            log.info("监听到消息内容为:{}", message);
        }
        
    }
    

    最后,是我们最重要的配置文件内容:

    server:
      port: 8081
    
    spring:
      application:
        name: stream-demo
      cloud:
        stream:
          bindings:
            output:
              destination: myRocketMQTopic
              group: test-stream
            input:
              destination: myRocketMQTopic
              group: test-stream
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
    

    在启动MQ服务的前提下,启动我们的项目,就能成功运行一个Stream-Rocketmq的快速入门案例了。我们浏览器访问链接,使得生产者发送消息,随后消费者就能监听到这个消息。

    2021-07-16 09:35:26.805  INFO 16920 --- [nio-8081-exec-1] c.e.s.producer.MyMessageProducer         : 生产者开始发送消息!
    2021-07-16 09:35:49.737  INFO 16920 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer         : 监听到消息内容为:hello spring cloud stream!
    2021-07-16 09:36:14.371  INFO 16920 --- [nio-8081-exec-3] c.e.s.producer.MyMessageProducer         : 生产者开始发送消息!
    2021-07-16 09:36:14.379  INFO 16920 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer         : 监听到消息内容为:hello spring cloud stream!
    

    三、自定义Channel

    在如上的入门案例中,我们使用的是Stream自带的output、input通道,我们完全可以仿照它们自定义自己的Channel。

    public interface MyOutputChannel {
    
        @Output("myOutput")
        MessageChannel myOutput();
    
    }
    
    public interface MyInputChannel {
    
        @Input("myInput")
        SubscribableChannel myInput();
    
    }
    

    然后我们新建一个生产者,使用自定义的output通道。

    @Slf4j
    @Component
    @EnableBinding(MyOutputChannel.class)
    public class MyMessageProducer2 {
    
        @Autowired
        private MyOutputChannel myOutputChannel;
    
        public void send(String message){
            log.info("MyMessageProducer2生产者开始发送消息!");
            myOutputChannel.myOutput().send(MessageBuilder.withPayload(message).build());
        }
    
    }
    

    新建一个消费者,监听自定义的input通道。

    @Slf4j
    @Component
    @EnableBinding(MyInputChannel.class)
    public class MyMessageConsumer2 {
    
        @StreamListener("myInput")
        public void receive(String message){
            log.info("myInput监听到消息内容为:{}", message);
        }
    
    }
    

    再新建一个controller来触发发送消息:

    @RestController
    public class ProducerController2 {
    
        @Autowired
        private MyMessageProducer2 producer;
    
        @GetMapping("/sendMessage2")
        public String sendMessage(){
            producer.send("hello spring cloud stream using customize channel!");
            return "OK";
        }
    
    }
    

    最后,配置文件中需要增加我们自定义的output和input通道:

    server:
      port: 8081
    
    spring:
      application:
        name: stream-demo
      cloud:
        stream:
          bindings:
            output:
              destination: myRocketMQTopic
              group: test-stream
            myOutput:
              destination: myRocketMQTopic2
              group: test-stream2
            input:
              destination: myRocketMQTopic
              group: test-stream
            myInput:
              destination: myRocketMQTopic2
              group: test-stream2
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
    

    如此,我们两个例子中的channel可以同时工作了。

    2021-07-16 10:16:46.495  INFO 18580 --- [nio-8081-exec-3] c.e.s.producer.MyMessageProducer2        : MyMessageProducer2生产者开始发送消息!
    2021-07-16 10:16:52.220  INFO 18580 --- [nio-8081-exec-2] c.e.s.producer.MyMessageProducer2        : MyMessageProducer2生产者开始发送消息!
    2021-07-16 10:16:52.808  INFO 18580 --- [nio-8081-exec-4] c.e.s.producer.MyMessageProducer2        : MyMessageProducer2生产者开始发送消息!
    2021-07-16 10:16:55.610  INFO 18580 --- [nio-8081-exec-5] c.e.s.producer.MyMessageProducer         : 生产者开始发送消息!
    2021-07-16 10:16:55.729  INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer         : 监听到消息内容为:hello spring cloud stream!
    2021-07-16 10:16:56.306  INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer2        : myInput监听到消息内容为:hello spring cloud stream using customize channel!
    2021-07-16 10:16:56.306  INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer2        : myInput监听到消息内容为:hello spring cloud stream using customize channel!
    2021-07-16 10:16:58.249  INFO 18580 --- [nio-8081-exec-6] c.e.s.producer.MyMessageProducer         : 生产者开始发送消息!
    2021-07-16 10:16:58.256  INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer         : 监听到消息内容为:hello spring cloud stream!
    2021-07-16 10:16:59.310  INFO 18580 --- [MessageThread_1] c.e.s.consumer.MyMessageConsumer2        : myInput监听到消息内容为:hello spring cloud stream using customize channel!
    

    四、消息过滤

    有两种消息过滤的方法,第一种是设置Cosumer的消费Tag,指定的Tag才会被消费,未指定的Tag的消息则被过滤;另外一种是根据自定义header内容进行过滤。

    为了区别以上的例子,我们重新创建一个Channel:

    public interface NewChannel {
    
        String INPUT = "newInput";
        String OUTPUT = "newOutput";
    
        @Output(NewChannel.INPUT)
        MessageChannel output();
    
        @Input(NewChannel.INPUT)
        SubscribableChannel input();
    }
    

    然后分别是生产者和消费者:

    @Slf4j
    @Component
    @EnableBinding(NewChannel.class)
    public class MyMessageProducer3 {
    
        @Autowired
        private NewChannel newChannel;
    
        public void send(Person person){
            log.info("MyMessageProducer3生产者开始发送消息!");
            Message<Person> message1 = MessageBuilder.withPayload(person)
                    // 指定ROcketMQ中消息的TAG
                    .setHeader(RocketMQHeaders.TAGS, "animal")
                    // 自定义header用于consumer监听器进行过滤
                    .setHeader("MyTag", "TAG1")
                    .build();
    
            Message<Person> message2 = MessageBuilder.withPayload(person)
                    .setHeader(RocketMQHeaders.TAGS, "person")
                    .setHeader("MyTag", "TAG2")
                    .build();
    
            Message<Person> message3 = MessageBuilder.withPayload(person)
                    .setHeader(RocketMQHeaders.TAGS, "person")
                    .setHeader("MyTag", "TAG3")
                    .build();
            newChannel.output().send(message1);
            newChannel.output().send(message2);
            newChannel.output().send(message3);
        }
    
    }
    
    @Slf4j
    @Component
    @EnableBinding(NewChannel.class)
    public class MyMessageConsumer3 {
    
        @StreamListener(value = NewChannel.INPUT, condition = "headers['MyTag']=='TAG2'")
        public void receive(Message<Person> message){
            log.info("MyMessageConsumer3监听到消息内容为:{}", message);
            log.info("MyMessageConsumer3消息的payload:{}", message.getPayload());
            log.info("MyMessageConsumer3消息的header:{}", message.getHeaders().get("MyTag"));
        }
    
    }
    

    消息的实体类为:

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class Person {
    
        private String name;
        private Integer age;
    }
    

    新建一个Controller用于发送消息:

    @RestController
    public class ProducerController3 {
    
        @Autowired
        private MyMessageProducer3 producer;
    
        @GetMapping("/sendMessage3")
        public String sendMessage(){
            Person tom = new Person("tom", 21);
            producer.send(tom);
            return "OK";
        }
    
    }
    

    配置文件修改为:

    server:
      port: 8081
    
    spring:
      application:
        name: stream-demo
      cloud:
        stream:
          bindings:
            newInput:
              destination: newRocketMQTopic
              group: test-stream3
            newOutput:
              destination: newRocketMQTopic
              group: test-stream3
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
              enable-msg-trace: true
            bindings:
              newInput:
                consumer:
                  # 仅接受指定tags的消息
                  tags: person || plant
    

    此时重启应用,我们访问controller的接口,就能同时发送三条消息出去。

    在配置文件中,我们指定消费者只消费tags为person和plant的消息,对于tags为animal的消息则会显示CONSUMED_BUT_FILTERED

    在消费者消费配置代码中,我们的注解@StreamListener指定了condition为只消费headers['MyTag']=='TAG2'的消息;而TAG3的消息也被CONSUMED,只不过没有找到对应的消费者,所以控制台显示:

    Cannot find a @StreamListener matching for message with id: 48260ac4-772b-54b8-2ae0-8b91635729a9
    

    五、函数式编程

    自从SpringCloud升级到3.1版本以上,如上示例中命令式编程注解@EnableBinding、@Output、@Input等都已经被标记为废弃了,废弃说明如下:

    @deprecated as of 3.1 in favor of functional programming model
    

    那么我们该如何使用函数式编程改造如上的示例呢。

    5.1 定时驱动模型

    我们需要重新创建一个工程,父工程stream-function,并且引入如下的依赖:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/com.alibaba.cloud/spring-cloud-starter-stream-rocketmq -->
    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
        <version>2021.1</version>
    </dependency>
    
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
    

    然后创建一个子工程consumer,新建一个Producer类:

    @Slf4j
    @Component
    public class FunctionConsumer {
    
        @Bean
        public Consumer<Date> mySink(){
            return (message) -> {
                log.info("收到的消息为:{}", message);
            };
        }
    
    }
    

    增加配置文件:

    server:
      port: 8082
    
    spring:
      application:
        name: stream-demo-consumer
      cloud:
        stream:
          bindings:
            # 格式:方法名-类型-序号
            # 方法名表示消费消息的方法的名称
            # 类型in表示消息接收,out表示消息发送
            # 序号用来区分不同的消费者
            mySink-in-0:
              # topic
              destination: functionRocketMQTopic
              group: test-stream4
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
              enable-msg-trace: true
        function:
          # 消费者处理消息的方法名称
          definition: mySink
    

    至此,消费者就完成了,可以启动运行,就会开始监听functionRocketMQTopic这个主题的消息了。

    然后我们还需要再创建一个子工程producer,新建一个Producer类:

    @Slf4j
    @Component
    public class FunctionProducer {
    
        @Bean
        public Supplier<Date> mySource(){
            return () -> {
                log.info("当前发送消息开始!");
                return new Date();
            };
        }
    
    }
    

    然后增加配置文件如下:

    server:
      port: 8081
    
    spring:
      application:
        name: stream-demo-producer
      cloud:
        stream:
          bindings:
            # 格式:方法名-类型-序号
            # 方法名表示消费消息的方法的名称
            # 类型in表示消息接收,out表示消息发送
            # 序号用来区分不同的消费者
            mySource-out-0:
              # topic
              destination: functionRocketMQTopic
              group: test-stream4
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
              enable-msg-trace: true
        function:
          # 生产者产生消息的方法名称
          definition: mySource
    

    如此,我们生产者也配置好了,启动就可以运行了。

    5.2 streamBridge模型

    该模型主要是为了满足由业务动作触发消息发送的场景。我们只需要新建一个Producer类:

    @Slf4j
    @Service
    public class StreamBridgeProducer {
    
        @Autowired
        private StreamBridge streamBridge;
    
        public void sendDateMessage(){
            Message<Date> message = MessageBuilder.withPayload(new Date()).build();
            log.info("开始发送消息!");
            streamBridge.send("mySource-out-1", message);
        }
    
    }
    

    然后新建一个Controller供调用触发消息的发送:

    @RestController
    public class SendController {
    
        @Autowired
        private StreamBridgeProducer streamBridgeProducer;
    
        @GetMapping("/sendMessage")
        public String sendMessage(){
            streamBridgeProducer.sendDateMessage();
            return "OK";
        }
    
    }
    

    最后,配置文件改为如下:

    server:
      port: 8081
    
    spring:
      application:
        name: stream-demo-producer
      cloud:
        stream:
          bindings:
            mySource-out-1:
              destination: functionRocketMQTopic
              group: test-stream4
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
              enable-msg-trace: true
    

    如此,我们启动该生产者,在调用接口的时候,消息就能成功发送了。

    5.3 多通道配置

    我们可以在一个项目中同时配置多个生产者和多个消费者,甚至是既有生产者,也有消费者。

    这里只给出配置文件了,其余内容可以参考上述已经给出的例子。

    server:
      port: 8081
    
    spring:
      application:
        name: stream-demo-producer
      cloud:
        stream:
          bindings:
            # 多个生产者通道时,定时消息模式destination可以相同,streamBridge模式destination不能相同
            mySource-out-0:
              destination: functionRocketMQTopic0
            mySource-out-1:
              destination: functionRocketMQTopic1
            mySource-out-2:
              destination: functionRocketMQTopic2
            mySource2-out-3:
              destination: functionRocketMQTopic3
            # 消费者通道  
            mySink-in-0:
              destination: functionRocketMQTopic0
              group: test-stream4
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
              enable-msg-trace: true
        function:
          definition: mySource;mySource2;mySink
    

    六、消息分区

    SpringCloudStream消息分区是在某些场景下确保具有相同特征的消息被同一个消费者消费,它能帮助某些不支持消息分区的消息队列实现消息分区的功能。

    生产者的配置文件改为:

    server:
      port: 8084
    
    spring:
      application:
        name: stream-demo-producer
      cloud:
        stream:
          bindings:
            mySource-out-0:
              destination: functionRocketMQTopic
              producer:
                # 指定分区键的表达式规则
                partition-key-expresion: payload
                # 指定分区数量
                partition-count: 2
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
              enable-msg-trace: true
        function:
          definition: mySource
    

    多个消费者的配置文件改为:

    server:
      port: 8082
    
    spring:
      application:
        name: stream-demo-consumer2
      cloud:
        stream:
          # 指定当前消费者分区数量
          instance-count: 2
          # 指定当前消费者的索引
          instance-index: 0
          bindings:
            mySink-in-0:
              destination: functionRocketMQTopic
              group: test-stream4
              consumer:
                # 开启分区
                partitioned: true
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
              enable-msg-trace: true
        function:
          definition: mySink
    
    server:
      port: 8083
    
    spring:
      application:
        name: stream-demo-consumer2
      cloud:
        stream:
          # 指定当前消费者分区数量
          instance-count: 2
          # 指定当前消费者的索引
          instance-index: 1
          bindings:
            mySink-in-0:
              destination: functionRocketMQTopic
              group: test-stream4
              consumer:
                # 开启分区
                partitioned: true
          rocketmq:
            binder:
              name-server: 127.0.0.1:9876
              enable-msg-trace: true
        function:
          definition: mySink
    

    待实验验证。

    七、多消息队列

    我们可以使用Stream配置多个不同类型的MQ,具体实例待补充

    SpringCloudStream自学文档 - 知乎 (zhihu.com)

    八、其它特性

    关于Stream的用法其实已经讲的差不多了,剩下的主要就是RocketMQ Binder的一些配置属性,比如:

    • 消费者
      • tags消息过滤
      • 广播模式的开启
      • 顺序消费
    • 生产者
      • 是否开启事务
      • 是否同步发送
      • 发送超时设置、消息最大值设置
      • 重试次数

    等等,其它特性都可以参考github上rocketmq的文档,本文就不再一一列举了。

    RocketMQ en · alibaba/spring-cloud-alibaba Wiki · GitHub

    九、补充

    有的时候,发送MQ消息显示消息没有成功发送,出现了很奇怪的问题,网上找的方案是,需要关闭vip通道。

    spring:
     cloud:
       stream:
          rocketmq:
              default:
                producer:
                  # 关闭vip channel,否则消息会发送失败
                  vipChannelEnabled: false
    
    

    尝试之后,确实有效。

    相关文章

      网友评论

          本文标题:Stream使用入门

          本文链接:https://www.haomeiwen.com/subject/lbfjmltx.html