美文网首页
Spring Cloud Stream

Spring Cloud Stream

作者: quanCN | 来源:发表于2020-11-06 11:38 被阅读0次

简介

Spring Cloud可以轻松地将消息队列集成到基于Spring的微服务中,它是通过Spring Cloud Stream是一个有注解驱动的框架,它允许开发人员在Spring 中轻松地构建消息发布者和消费者。
Spring Cloud Stream可以使用多个MQ平台(包括Apache Kafka和Rabbit MQ),而平台的具体实现细节则被排除在应用程序代码之外,在应用程序中实现消息发布和消费是通过平台无关的Spring接口实现的。

Spring Cloud Stream 架构

如图Spring Cloud Stream有四个组件涉及发布消息和消费消息


  • 发射器(Source)
    当一个服务准备发布消息时,它将使用一个发射器发布消息。发射器是一个Spring注解接口,它接收一个普通的Java对象(POJO),该对象代表要发布的消息。发射器接收消息,然后序列化它(默认是JSON)并将消息发布到通道
  • 通道(Channel)
    通道是对队列的一个抽象,它将在消息生产者发布消息或消息消费者消费消息后保留该消息。通道名称始终与目标队列名相关联。然而,队列名称永远不会直接公开给代码,相反,通道名称会在代码中使用。这意味着开发人员可以通过更改应用程序的配置而不是应用程序的代码来切换通道读取或写入的队列
  • 绑定器(Binder)
    绑定器是框架的一部分,它是与特定MQ平台对话的Spring代码。绑定器部分允许开发人员处理消息,而不必依赖于特定与平台的库和API来发布和消费消息
  • 接收器(Sink)
    在Spring Cloud Stream中,服务通过一个接收器从队列中接收消息。接收器监听传入消息的通道,并将消息反序列化为POJO。

简单的MQ生产者和消费者

注:以RabbitMQ为例

依赖

可以使用RabbitMQ binder添加到Spring Cloud Stream项目中

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

也可以直接使用Spring Cloud Stream RabbitMQ Starter启动器

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

生产者

  • 启动类
    在启动类添加@EnableBinding

    package cn.lilq.cloudnetflix.cloudshopserver;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    
    
    @SpringBootApplication
    @EnableBinding(Source.class)
    public class CloudShopServerApplication {
        public static void main(String[] args) {
            SpringApplication.run(CloudShopServerApplication.class,args);
        }
    }
    
  • 自定义发射器

    package cn.lilq.cloudnetflix.cloudshopserver.events;
    
    //省略部分import
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.messaging.support.MessageBuilder;
    
    @Component
    public class MySourceBean {
        private Source source;
    
        private static final Logger logger = LoggerFactory.getLogger(MySourceBean.class);
    
        @Autowired
        public MySourceBean(Source source){
            this.source = source;
        }
    
        public void publishOrderChange(Order order){
            logger.debug("Sending RabbitMQ message {} for User Id , Goods Id: {}",order.getUserId(),order.getGoodsId());
            source.output().send(MessageBuilder.withPayload(order).build());
        }
    }
    
  • 配置

    #cloud-stream
    spring.cloud.stream.bindings.output.destination=shopTopic
    # RabbitMQ
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    

    注:output为通道名称,通过查看Source可以看到

    String OUTPUT = "output";
    @Output(Source.OUTPUT)
    MessageChannel output();
    

    destination表示要写入消息的消息队列(或者主题)名称

  • 发布消息

    @Service("shopService")
    public class ShopServiceImpl implements ShopService {
        @Autowired 
        private MySourceBean mySourceBean;
    
        @Override
        public void sendShop(Order order) {
            mySourceBean.publishOrderChange(order);
        }
    }
    

消费者

  • 启动类

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Sink;
    
    
    @EnableBinding(Sink.class)
    @SpringBootApplication
    public class CloudTestApplication {
        public static void main(String[] args) {
            SpringApplication.run(CloudTestApplication.class,args);
        }
    }
    
  • 自定义接收器

    //省略部分import
    import org.springframework.cloud.stream.annotation.StreamListener;
    import org.springframework.cloud.stream.messaging.Sink;
    import org.springframework.stereotype.Component;
    
    
    @Component
    public class ShopListener {
        private static final Logger logger = LoggerFactory.getLogger(ShopListener.class);
    
        @StreamListener(Sink.INPUT)
        public void receive(Order order){
            logger.debug("order{}"+"userID"+order.getUserId()+"Goods ID"+order.getGoodsId());
        }
    }
    
  • 配置

    #cloud-stream
    spring.cloud.stream.bindings.input.destination=shopTopic
    spring.cloud.stream.bindings.input.group=test2Group
    # RabbitMQ
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    

    注:group属性定义将要消费消息的消费者组的名称

  • 消费者组概念
    开发人员可能拥有多个服务,每个服务都有多个实例侦听同一个消息队列,但是只需要服务实例组中的一个服务实例来消费和处理消息。
    group属性标识服务所属的消费者组。只要服务实例具有相同的组名,Spring Cloud Stream和底层消息代理将保证,只有消息的一个副本会被属于该组的服务实例所使用。如图

相关文章

网友评论

      本文标题:Spring Cloud Stream

      本文链接:https://www.haomeiwen.com/subject/axhtbktx.html