本篇主要介绍基于spring的@RabbitListener
监听json对象的用法
1. 环境
spring-boot
2.1.1.RELEASE
spring-cloudFinchley.RELEASE
rabbitmq3.7.10
spring-boot-starter-amqp
2. Json在RabbitMQ中的格式
在上一节中,我们使用两种方式来发送json对象,它们在RabbitMQ中的格式为:
-
发送Json方式1:自定义Message,消息格式为:
格式1 -
发送Json方式2:设置MessageConverter,消息格式为:
格式2
- 可以看出
格式2
比格式1
多了一个header属性:__TypeId__
,其值为json对象的全限定类名。- 我们通过
方式1
发送时,由于消息是我们自定义的,所以RabbitTemplate就没有给我们增加此属性(当然我们也可以通过Message.setHeader()
方式来手动设置);而通过方式2
发送时,Jackson2JsonMessageConverter
为我们设置了消息的__TypeId__
属性- 这两种格式的消息监听还是有差别的,下面我们进行详细说明
2. 消息监听容器 MessageListenerContainer
要使用监听器,必须要先了解MessageListenerContainer
,它通过MessageConverter
把RabbitMQ中的byte[]
转化为其他对象。
-
默认情况:消息监听容器是
SimpleRabbitListenerContainerFactory
,其内部的默认消息转换器是SimpleMessageConverter
,只支持content_type
为application/octet-stream
、text/plain
、content_type
的消息 -
如果想要处理
content_type=application/json
的消息,必须要使用Jackson2JsonMessageConverter
消息转换器替换默认的消息转换器,代码如下:
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
//设置json序列化 消息转换器
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
- ListenerContainerFactory只支持一个消息转换器,如果我们替换为
Jackson2JsonMessageConverter
,就无法支持其他类型的消息(如自定义的消息类型)。如果想要同时支持多种类型的消息,需要使用ContentTypeDelegatingMessageConverter
,它可以代理多个消息转换器,通过content_type
来决定使用哪个消息转换器。代码如下:
/**
* 消费者端配置: 重写默认的监听器工厂
* 创建一个支持自定义json序列化监听容器工厂
* @param connectionFactory
* @return
*/
@Bean
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
/**
* 系统使用的默认消息转换器为SimpleMessageConverter,可以处理文本、java序列化等
* 我们要在默认的基础上,再添加一个处理json的消息转化器。
* 所以这里我们使用了ContentTypeDelegatingMessageConverter,它可以代理多个消息处理器,每个消息处理器由contentType决定。
* new ContentTypeDelegatingMessageConverter()就是指定了SimpleMessageConverter作为默认的消息处理器
*/
ContentTypeDelegatingMessageConverter messageConverter = new ContentTypeDelegatingMessageConverter() ;
messageConverter.addDelegate(MessageProperties.CONTENT_TYPE_JSON,new Jackson2JsonMessageConverter());
//设置消息转换器
factory.setMessageConverter(messageConverter);
return factory;
}
按以上配置以后,就可以使用监听器来监听json类型的数据
3. 监听器
spring中使用@RabbitListener
来实现消费者监听器,它用法有两种:
-
@RabbitListener
标注方法 -
@RabbitListener
标注类,@RabbitHandler
标注方法
@RabbitListener 标注方法
使用方式:
@Component
public class QueueJsonListenerByRabbitListener {
Logger logger = LoggerFactory.getLogger(getClass());
/**
* 使用 @Payload 和 @Headers 注解可以获取消息中的 body 与 headers 信息
* @param headers
*/
@RabbitListener(queues = RabbitmqJsonConfig.QUEUE_SIMPLE_JSON)
public void process(@Payload Department department, @Headers Map<String,Object> headers) {
logger.info("<--- json 我收到的消息:"+department+", header="+headers);
}
}
queues
为要监听的队列名称@Payload
为消息对象本身@Headers
为消息对象的头部,也可以通过@Header
获取单个头部属性:@Header String token
- 这种方式不仅能监听上面
格式1
的json消息,还能监听格式2
的json消息。 - 这种方式之所有能监听
格式1
的json消息,是因为@RabbitListener
注解到方法上,可以获取方法参数对象Department
的类型。 - 对于这两种格式的消息,在发送端,发送的都是
Department
对象;而在消费者端,我们可以把消息转化为任何对象(如Department2
),只需要保证对象的属性与Department
的属性一致即可。(这种方式对于发送端和消费端不在同一个系统的应用来说特别有用)
@RabbitListener 标注类,@RabbitHandler 标注方法
使用方式:
@Component
@RabbitListener(queues = RabbitmqJsonConfig.QUEUE_SIMPLE_JSON)
public class QueueJsonListenerByRabbitHandler {
Logger logger = LoggerFactory.getLogger(getClass());
/**
* 使用 @Payload 和 @Headers 注解可以获取消息中的 body 与 headers 信息
* @param headers
*/
@RabbitHandler
public void process(@Payload Department department, @Headers Map<String,Object> headers) {
logger.info("<--- json 我收到的消息:"+department+", header="+headers);
}
}
- 这种方式只能监听含有
__TypeId__
头属性的json消息,即格式2
这种类型的json消息。 - 这种方式
@RabbitListener
无法获取到json对象Department
的类型,所以如果没有__TypeId__
明确指定json对象类型的话,系统就无法转化格式1
这种类型的json消息。
参考
网友评论