1.RabbitMQ整合Spring AMQP
- RabbitAdmin
- SpringAMQP声明
- RabbitTemplate
SimpleMessageListenerContainer
简单消息监听容器MessageListenerAdapter
消息监听适配器MessageConverter
转换器(进行消息序列化,反序列化)
注意
-
autoStartup必须设置为true
,否则Spring容器不会加载RabbitAdmin类 - RabbitAdmin底层实现就是从Spring容器中获取Exchange,Bingding,RoutingKey以及Queue的@Bean声明
- 然后使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列的RabbitMQ基础功能操作。
- 例如: 添加一个交换机,删除一个绑定,清除一个队列中的消息等。
image.png
消息模板 RabbitTemplate
RabbitTemplate 即消息模板
-
我们在与springAMQP整合的时候进行发送消息的关键类。
-
该类提供了丰富的发送消息方法,包括可靠性投递方法,回调监听消息接口
ConfirmCallback
,返回值确认接口ReturnCallback
等等。同样我们需要进行注入到spring容器中。然后进行使用。 -
在与spring整合时需要实例化。但是在与springBoot整合时。在配置文件中添加配置即可
@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { return new RabbitTemplate(connectionFactory); }
简单消息监听容器:SimpleMessageListenerContainer
- 这个类非常强大。 我们可以对他进行很多设置。对于消费者的配置项。这个类都可以满足
- 监听队列(多个队列) 、自动启动、自动声明功能
- 设置事务特性、事务管理器、事务属性、事务容量(并发)、 是否开启事务、回滚消息等
- 设置消费者数量、最大最小数量。批量消费
- 设置消息的签收模式、是否重回队列,异常捕获handler函数。
- 设置消费者标签生成策略,是否独占模式,消费者属性等
- 设置具体的鉴定器,消息转换器等等。
注意:
- SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者的大小,接收消息的模式等
- 很多基于RabbitMQ的定制化后端管控台在进行动态设置的时候,也是根据这一特性实现的。所以可以看出SpringAMQP非常的强大
SimpleMessageListenerContainer为什么可以动态感知配置变更?
配置代码
/**
* 简单消息监听容器
* 配置完成后。可以在管控台看到消息者信息。 以及消费者标签信息
*
* @param connectionFactory 链接工厂
* @return SimpleMessageListenerContainer
*/
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
//设置要监听的队列
simpleMessageListenerContainer.setQueues(queue001(), queue002(), queue003(), queue_image(), queue_pdf());
//初始化消费者数量
simpleMessageListenerContainer.setConcurrentConsumers(1);
//最大消费者数量
simpleMessageListenerContainer.setMaxConcurrentConsumers(5);
//设置是否重回队列[一般为false]
simpleMessageListenerContainer.setDefaultRequeueRejected(false);
//设置自动ack
simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.AUTO);
//设置channel 是否外露
simpleMessageListenerContainer.setExposeListenerChannel(true);
//设置消费端标签的策略
simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
@Override
public String createConsumerTag(String queueName) {
return queueName + "_" + UUID.randomUUID().toString();
}
});
//设置消息监听 ChannelAwareMessageListener
simpleMessageListenerContainer.setMessageListener(new ChannelAwareMessageListener() {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
System.out.println("----------消费者: " + msg);
}
});
return simpleMessageListenerContainer;
}
连接信息
消费者信息和相关配置
消息监听适配器:MessageListenerAdapter
/**
* 通过`simpleMessageListenerContainer` 配置消息监听适配器。 指向这个类
*
* @author yangHX
* createTime 2019/4/6 12:16
*/
public class MessageDelegate {
/**
* MessageListenerAdapter 默认指定接收消息的方法的名字就是 handleMessage .当然也可以手动设置
*
* @param messageBody message信息
*/
public void handleMessage(byte[] messageBody) {
System.err.println("默认方法,消息内容: " + new String(messageBody));
}
public void consumeMessage(byte[] messageBody) {
System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
}
public void consumeMessage(String messageBody) {
System.err.println("字符串方法, 消息内容:" + messageBody);
}
}
/**
* spring amqp 消息转换器
*
* @author yangHX
* createTime 2019/4/6 12:28
*/
public class TextMessageConverter implements MessageConverter {
/**
* 将数据转化为 message 类
*
* @param o 要发送的数据
* @param messageProperties 消息头
* @return Message
* @throws MessageConversionException ex
*/
@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
return new Message(o.toString().getBytes(), messageProperties);
}
/**
* 将message转换为想要的数据类型
*
* @param message message
* @return Object
* @throws MessageConversionException ex
*/
@Override
public Object fromMessage(Message message) throws MessageConversionException {
String contentType = message.getMessageProperties().getContentType();
if (null != contentType && contentType.contains("text")) {
return new String(message.getBody());
}
return message.getBody();
}
}
///消息监听适配器 只截取了一小段
/*
* 适配器方式。 默认是有自己的方法名字。 handleMessage
* 可以自己指定一个方法的名称。 consumerMessage
* 也可以添加一个转换器: 从字节数组转换为String
*/
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(new MessageDelegate());
messageListenerAdapter.setDefaultListenerMethod("consumeMessage");
messageListenerAdapter.setMessageConverter(new TextMessageConverter());
simpleMessageListenerContainer.setMessageListener(messageListenerAdapter);
return simpleMessageListenerContainer;
/**
* 发送消息。测试转换器和适配器
* <p>
* 转换器判断contentType 将字节数组转化为字符串
* 适配器将数据交给 MessageDelegate 的 consumeMessage 方法进行处理
*/
@Test
public void testMessage4Text() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType("text/plan");
Message message = new Message("mq 消息1234".getBytes(), messageProperties);
rabbitTemplate.send("topic001", "spring.abc", message);
rabbitTemplate.send("topic002", "rabbit.abc", message);
}
image.png
MessageListenerAdapter 消息监听适配器总结
- 通过messageListenerAdapter的代码我们可以看出如下核心属性
-
defaultListenerMethod
:默认监听方法名称,用于设置监听方法名称 -
Delegate
委托对象:实际真实的委托对象,用于处理消息 -
queueOrTagToMethodName
队列名称与方法名称组成的集合 - 可以一一进行队列和方法名称的匹配
- 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接受处理
MessageConverter
消息转换器
-
我们在消息传输的时候,正常情况下消息体为二进制的数据方式进行传输。如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter
-
自定义常用转换器
MessageConverter
一般来讲都需要实现这个接口 -
重写下面两个方法
-
toMessage
: java 对象转换为Message -
fromMessage
: Message对象转换为java对象
-
转换器类型
- json转换器: jackson2JsonMessageConverter: 可以进行java对象的转换功能
- DefaultJackson2JavaTypeMapper映射器: 可以进行java对象的映射关系
- 自定义二进制转换器: 比如图片类型、PDF,PPT, 流媒体
SpringBoot 整合RabbitMQ
- publisher-confirms, 实现了一个监听器,用于监听Borker端给我们返回的确认消息:
RabbitTemplate.ConfirmCallback
/**
* 回调函数 confirm确认模式
*/
final ConfirmCallback confirmCallback = new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.err.println("correlationData: " + correlationData);
System.err.println("ack: " + ack);
if (!ack) {
System.out.println("-----异常处理");
}
}
};
- publisher-returns, 保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续处理,保证消息的路由成功
RabbitTemplate.ReturnCallback
final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.err.println("return exchange : " + exchange + " , routingKey : " + routingKey + " , replyCode: " + replyCode + " , replyText: " + replyText);
}
};
-
注意一点 在发送消息的时候,对template进行配置
mandatory=true
保证监听有效- mandatory
当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。
- mandatory
-
生产端还可以配置其他属性,比如发送重试,超时时间,次数,间隔等。
SpringBoot 整合RabbitMQ 消费端
-
首先配置手动确认模式,用于ack的手工处理,这样我们可以保证消息的可靠性投递,或者在消费端消费失败的时候可以做到重回队列,根据业务记录日志等处理
-
可以设置消费端的监听个数和最大个数,用于控制消费端并发情况
-spring.rabbitmq.listener.simple.concurrency=5
-spring.rabbitmq.listener.simple.max-concurrency=10
-spring.rabbitmq.listener.simple.acknowledge-mode=manual 手工签收
@RabbitListener注解使用
- 消费端监听 @RabbitMQLIstener 注解。 这个注解在实际工作中非常好用
- @RabbitListener 是一个组合注解 , 里面可以进行注解配置
- @QueueBinding ,@Queue, @Exchange 直接通过这个组合注解一次性搞定消费端交换机,队列,绑定,路由,并且配置监听功能等。
SpringCloud Stream 整合
-
Spring Clould. 这个全家桶框架在整个中小型互联网公司异常的火爆,那么相对应着,Spring Cloud Stream 就渐渐的被大家所重视起来。
-
生产者和消费者可以是不同的消息中间件
-
Barista接口: Barista接口是用来定义后面类的参数。这一接口定义通道类型和通道名称,通道名称是作为配置用。通道类型则决定了app会使用这个通道进行发送消息,还是从中接收消息
-
@Output
: 输出注解 。用于定义发送消息接口 -
@Input
: 输入注解, 用于定义消息的消费信息接口 -
@StreamListener
: 用于定义监听方法的注解 -
使用spring Cloud Stream 非常简单。 只需要使用好这三个注解即可。在实现高性能消息的生产和消费的场景非常适合,但是使用 SpringCloudStream框架有一个非常大的问题,就是不能实现可靠性的投递,也就是没法保证消息的100%可靠性,会存在少量消息丢失的问题
-这个原因是因为SpringCloudStream框架为了和kafka兼顾所以在实际工作中使用它的目的就是针对高性能的消息通信的。 这点就是当前版本的Spring Cloud Stream 的定位
image.png
网友评论