简介
Spring Cloud
可以轻松地将消息队列集成到基于Spring
的微服务中,它是通过Spring Cloud Stream
是一个有注解驱动的框架,它允许开发人员在Spring 中轻松地构建消息发布者和消费者。
Spring Cloud Stream
可以使用多个MQ平台(包括Apache Kafka和Rabbit MQ),而平台的具体实现细节则被排除在应用程序代码之外,在应用程序中实现消息发布和消费是通过平台无关的Spring接口实现的。
Spring Cloud Stream 架构
如图Spring Cloud Stream有四个组件涉及发布消息和消费消息
data:image/s3,"s3://crabby-images/12c41/12c41e8e4f150919e4e7f8f526f93eb32016a0e4" alt=""
- 发射器(Source)
当一个服务准备发布消息时,它将使用一个发射器发布消息。发射器是一个Spring注解接口,它接收一个普通的Java对象(POJO),该对象代表要发布的消息。发射器接收消息,然后序列化它(默认是JSON)并将消息发布到通道 - 通道(Channel)
通道是对队列的一个抽象,它将在消息生产者发布消息或消息消费者消费消息后保留该消息。通道名称始终与目标队列名相关联。然而,队列名称永远不会直接公开给代码,相反,通道名称会在代码中使用。这意味着开发人员可以通过更改应用程序的配置而不是应用程序的代码来切换通道读取或写入的队列 - 绑定器(Binder)
绑定器是框架的一部分,它是与特定MQ平台对话的Spring代码。绑定器部分允许开发人员处理消息,而不必依赖于特定与平台的库和API来发布和消费消息 - 接收器(Sink)
在Spring Cloud Stream中,服务通过一个接收器从队列中接收消息。接收器监听传入消息的通道,并将消息反序列化为POJO。
简单的MQ生产者和消费者
注:
以RabbitMQ为例
依赖
可以使用RabbitMQ binder
添加到Spring Cloud Stream
项目中
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
也可以直接使用Spring Cloud Stream RabbitMQ Starter
启动器
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
生产者
-
启动类
在启动类添加@EnableBinding
package cn.lilq.cloudnetflix.cloudshopserver; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; @SpringBootApplication @EnableBinding(Source.class) public class CloudShopServerApplication { public static void main(String[] args) { SpringApplication.run(CloudShopServerApplication.class,args); } }
-
自定义发射器
package cn.lilq.cloudnetflix.cloudshopserver.events; //省略部分import import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; @Component public class MySourceBean { private Source source; private static final Logger logger = LoggerFactory.getLogger(MySourceBean.class); @Autowired public MySourceBean(Source source){ this.source = source; } public void publishOrderChange(Order order){ logger.debug("Sending RabbitMQ message {} for User Id , Goods Id: {}",order.getUserId(),order.getGoodsId()); source.output().send(MessageBuilder.withPayload(order).build()); } }
-
配置
#cloud-stream spring.cloud.stream.bindings.output.destination=shopTopic # RabbitMQ spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
注:
output为通道名称,通过查看Source
可以看到String OUTPUT = "output"; @Output(Source.OUTPUT) MessageChannel output();
destination
表示要写入消息的消息队列(或者主题)名称 -
发布消息
@Service("shopService") public class ShopServiceImpl implements ShopService { @Autowired private MySourceBean mySourceBean; @Override public void sendShop(Order order) { mySourceBean.publishOrderChange(order); } }
消费者
-
启动类
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Sink; @EnableBinding(Sink.class) @SpringBootApplication public class CloudTestApplication { public static void main(String[] args) { SpringApplication.run(CloudTestApplication.class,args); } }
-
自定义接收器
//省略部分import import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Component; @Component public class ShopListener { private static final Logger logger = LoggerFactory.getLogger(ShopListener.class); @StreamListener(Sink.INPUT) public void receive(Order order){ logger.debug("order{}"+"userID"+order.getUserId()+"Goods ID"+order.getGoodsId()); } }
-
配置
#cloud-stream spring.cloud.stream.bindings.input.destination=shopTopic spring.cloud.stream.bindings.input.group=test2Group # RabbitMQ spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest
注:group
属性定义将要消费消息的消费者组的名称 -
消费者组概念
开发人员可能拥有多个服务,每个服务都有多个实例侦听同一个消息队列,但是只需要服务实例组中的一个服务实例来消费和处理消息。
group
属性标识服务所属的消费者组。只要服务实例具有相同的组名,Spring Cloud Stream
和底层消息代理将保证,只有消息的一个副本会被属于该组的服务实例所使用。如图
网友评论