美文网首页
Spring-Cloud RabbitMQ 用法 - 监听jso

Spring-Cloud RabbitMQ 用法 - 监听jso

作者: heichong | 来源:发表于2020-03-13 15:40 被阅读0次

    本篇主要介绍基于spring的@RabbitListener监听json对象的用法


    1. 环境

    spring-boot 2.1.1.RELEASE
    spring-cloud Finchley.RELEASE
    rabbitmq 3.7.10
    spring-boot-starter-amqp

    2. Json在RabbitMQ中的格式

    在上一节中,我们使用两种方式来发送json对象,它们在RabbitMQ中的格式为:

    • 发送Json方式1:自定义Message,消息格式为:


      格式1
    • 发送Json方式2:设置MessageConverter,消息格式为:


      格式2
    • 可以看出格式2格式1多了一个header属性:__TypeId__,其值为json对象的全限定类名。
    • 我们通过方式1发送时,由于消息是我们自定义的,所以RabbitTemplate就没有给我们增加此属性(当然我们也可以通过Message.setHeader()方式来手动设置);而通过方式2发送时,Jackson2JsonMessageConverter为我们设置了消息的__TypeId__属性
    • 这两种格式的消息监听还是有差别的,下面我们进行详细说明

    2. 消息监听容器 MessageListenerContainer

    要使用监听器,必须要先了解MessageListenerContainer,它通过MessageConverter把RabbitMQ中的byte[]转化为其他对象。

    • 默认情况:消息监听容器是SimpleRabbitListenerContainerFactory,其内部的默认消息转换器是SimpleMessageConverter,只支持content_typeapplication/octet-streamtext/plaincontent_type的消息

    • 如果想要处理content_type=application/json的消息,必须要使用Jackson2JsonMessageConverter消息转换器替换默认的消息转换器,代码如下:

        @Bean
        public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
                SimpleRabbitListenerContainerFactoryConfigurer configurer,
                ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            configurer.configure(factory, connectionFactory);
            //设置json序列化 消息转换器
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            return factory;
        }
    
    • ListenerContainerFactory只支持一个消息转换器,如果我们替换为Jackson2JsonMessageConverter,就无法支持其他类型的消息(如自定义的消息类型)。如果想要同时支持多种类型的消息,需要使用ContentTypeDelegatingMessageConverter,它可以代理多个消息转换器,通过content_type来决定使用哪个消息转换器。代码如下:
        /**
         * 消费者端配置: 重写默认的监听器工厂
         * 创建一个支持自定义json序列化监听容器工厂
         * @param connectionFactory
         * @return
         */
        @Bean
        public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(
                SimpleRabbitListenerContainerFactoryConfigurer configurer,
                ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            configurer.configure(factory, connectionFactory);
            /**
             * 系统使用的默认消息转换器为SimpleMessageConverter,可以处理文本、java序列化等
             * 我们要在默认的基础上,再添加一个处理json的消息转化器。
             * 所以这里我们使用了ContentTypeDelegatingMessageConverter,它可以代理多个消息处理器,每个消息处理器由contentType决定。
             * new ContentTypeDelegatingMessageConverter()就是指定了SimpleMessageConverter作为默认的消息处理器
             */
            ContentTypeDelegatingMessageConverter messageConverter = new ContentTypeDelegatingMessageConverter() ;
            messageConverter.addDelegate(MessageProperties.CONTENT_TYPE_JSON,new Jackson2JsonMessageConverter());
    
            //设置消息转换器
            factory.setMessageConverter(messageConverter);
            return factory;
        }
    

    按以上配置以后,就可以使用监听器来监听json类型的数据

    3. 监听器

    spring中使用@RabbitListener来实现消费者监听器,它用法有两种:

    • @RabbitListener标注方法
    • @RabbitListener标注类,@RabbitHandler标注方法

    @RabbitListener 标注方法

    使用方式:

    @Component
    public class QueueJsonListenerByRabbitListener {
        Logger logger = LoggerFactory.getLogger(getClass());
        /**
         * 使用 @Payload 和 @Headers 注解可以获取消息中的 body 与 headers 信息
         * @param headers
         */
        @RabbitListener(queues = RabbitmqJsonConfig.QUEUE_SIMPLE_JSON)
        public void process(@Payload Department department, @Headers Map<String,Object> headers) {
            logger.info("<--- json 我收到的消息:"+department+", header="+headers);
        }
    }
    
    • queues为要监听的队列名称
    • @Payload为消息对象本身
    • @Headers为消息对象的头部,也可以通过@Header获取单个头部属性:@Header String token
    • 这种方式不仅能监听上面格式1的json消息,还能监听格式2的json消息。
    • 这种方式之所有能监听格式1的json消息,是因为@RabbitListener注解到方法上,可以获取方法参数对象Department的类型。
    • 对于这两种格式的消息,在发送端,发送的都是Department对象;而在消费者端,我们可以把消息转化为任何对象(如Department2),只需要保证对象的属性与Department的属性一致即可。(这种方式对于发送端和消费端不在同一个系统的应用来说特别有用)

    @RabbitListener 标注类,@RabbitHandler 标注方法

    使用方式:

    @Component
    @RabbitListener(queues = RabbitmqJsonConfig.QUEUE_SIMPLE_JSON)
    public class QueueJsonListenerByRabbitHandler {
        Logger logger = LoggerFactory.getLogger(getClass());
        /**
         * 使用 @Payload 和 @Headers 注解可以获取消息中的 body 与 headers 信息
         * @param headers
         */
        @RabbitHandler
        public void process(@Payload Department department, @Headers Map<String,Object> headers) {
            logger.info("<--- json 我收到的消息:"+department+", header="+headers);
        }
    }
    
    • 这种方式只能监听含有__TypeId__头属性的json消息,即格式2这种类型的json消息。
    • 这种方式@RabbitListener无法获取到json对象Department的类型,所以如果没有__TypeId__明确指定json对象类型的话,系统就无法转化格式1这种类型的json消息。

    参考

    相关文章

      网友评论

          本文标题:Spring-Cloud RabbitMQ 用法 - 监听jso

          本文链接:https://www.haomeiwen.com/subject/swzxjqtx.html