美文网首页
Spring-Cloud RabbitMQ 用法 - 发送jso

Spring-Cloud RabbitMQ 用法 - 发送jso

作者: heichong | 来源:发表于2020-03-13 15:40 被阅读0次

    本篇主要介绍RabbitTemplate发送json对象的用法


    1. 环境

    spring-boot 2.1.1.RELEASE
    spring-cloud Finchley.RELEASE
    rabbitmq 3.7.10
    spring-boot-starter-amqp

    2. RabbitMQ的content_type

    针对RabbitMQ的消息,最终存储的都是字节数组,而我们在消费端,消息可能是字符串、对象等其它类型。我们发送时需要指定每个消息的content_type属性,从而让消费端知道如何把字节数组转化成消息的原始类型。
    例如,SimpleMessageConverter是默认的消息转换器,以其作为例子:

    • content_type=text/plain的消息,直接new String,关键代码为:
    if (contentType != null && contentType.startsWith("text")) {
        String encoding = properties.getContentEncoding();
        if (encoding == null) {
            encoding = this.defaultCharset;
        }
        try {
            content = new String(message.getBody(), encoding);
        }
        catch (UnsupportedEncodingException e) {
            throw new MessageConversionException(
                    "failed to convert text-based Message content", e);
        }
    }
    
    • content_type=application/x-java-serialized-object,直接进行java反序列化,关键代码为:
    else if (contentType != null &&
            contentType.equals(MessageProperties.CONTENT_TYPE_SERIALIZED_OBJECT)) {
        try {
            content = SerializationUtils.deserialize(
                    createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
        }
        catch (IOException | IllegalArgumentException | IllegalStateException e) {
            throw new MessageConversionException(
                    "failed to convert serialized Message content", e);
        }
    }
    

    我们从RabbitMQ的web页面可以看到每个消息的content_type

    image.png

    正因为content_type的存在,RabbitMQ才能支持消息的多样性。
    在发送数据,有两种方式可以指定content_type

    1. 自己构建消息对象Message,通过MessageProperties设置content_type
    2. 通过MessageConverter来设置content_type

    我们下面的例子都以Department作为json对象,请注意json对象必须包含无参构造器

    public class Department {
        private Integer departmentId ;
        private String departmentName ;
        private String departmentDesc ;
    
        /**
         * 对于json对象,必须要有无参的构造器
         */
        public Department() {
        }
    
        public Department(Integer departmentId, String departmentName, String departmentDesc) {
            this.departmentId = departmentId;
            this.departmentName = departmentName;
            this.departmentDesc = departmentDesc;
        }
    
        @Override
        public String toString() {
            return "Department{" +
                    "departmentId=" + departmentId +
                    ", departmentName='" + departmentName + '\'' +
                    ", departmentDesc='" + departmentDesc + '\'' +
                    '}';
        }
    
        //省略getter and setter
    }
    

    3. 发送Json方式1:自定义Message

    这种方式需要自己把对象转化为json字符串,创建消息的MessageProperties,设置content_type,最后构建Message对象,并发送。
    核心代码如下:

    @Component
    public class JsonSender {
        Logger logger = LoggerFactory.getLogger(getClass());
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 发送对象消息,使用的json序列化
         *  Department不需要实现序列号接口,也不需要生产者、消费者是同一个包下的同一个类。只需要属性相同即可。
         */
        public void sendTest1(Department department)  {
            logger.info("---> 我准备发送json对象:"+department);
    
            /**
             * 发送json对象,必须为消息指定contentType,否则发送时只会使用默认的消息转换器SimpleMessageConverter
             * 而SimpleMessageConverter只支持字符串、byte数字和java Serializable对象
             * 不指定就会在发送时报错:IllegalArgumentException:SimpleMessageConverter only supports String, byte[] and Serializable payloads
             */
    
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                //对象转化为json字符串
                String jsonStr = objectMapper.writeValueAsString(department) ;
                //配置消息的属性contentType
                MessageProperties properties = MessagePropertiesBuilder.newInstance()
                        .setContentType(MessageProperties.CONTENT_TYPE_JSON)
                        .build();
                //构建消息对象
                Message message = MessageBuilder.withBody(jsonStr.getBytes())
                        .andProperties(properties)
                        .build();
                //发送
                this.rabbitTemplate.convertAndSend(RabbitmqBaseConfig.EX_SIMPLE_DIRECT, RabbitmqJsonConfig.KEY_SIMPLE_JSON, message); ;
    
            } catch (JsonProcessingException e) {
                e.printStackTrace();
            }
        }
    }
    
    

    通过RabbitMQ可以查看消息,其content_type=application/json

    RabbitMQ Message

    这种方式的优点是简单明了,可以与发送字符串、java等共用同一个RabbitTemplate;缺点则是需要自己把对象转化成字符串、构建消息属性和消息对象,代码比较多。

    4. 发送Json方式2:设置MessageConverter

    RabbitTemplate在发送消息对象时,会通过MessageConverter把对象转化为byte[],我们以其中一个convertAndSend方法作为入口,其源码为:

        @Override
        public void convertAndSend(String exchange, String routingKey, final Object object) throws AmqpException {
            convertAndSend(exchange, routingKey, object, (CorrelationData) null);
        }
    

    继续跟进

        @Override
        public void convertAndSend(String exchange, String routingKey, final Object object,
                @Nullable CorrelationData correlationData) throws AmqpException {
    
            send(exchange, routingKey, convertMessageIfNecessary(object), correlationData);
        }
    

    继续跟进convertMessageIfNecessary方法:

        protected Message convertMessageIfNecessary(final Object object) {
            if (object instanceof Message) {
                return (Message) object;
            }
            return getRequiredMessageConverter().toMessage(object, new MessageProperties());
        }
    

    getRequiredMessageConverter方法,则是获取RabbitTemplate内部的MessageConverter:

        private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
            MessageConverter converter = getMessageConverter();
            if (converter == null) {
                throw new AmqpIllegalStateException(
                        "No 'messageConverter' specified. Check configuration of RabbitTemplate.");
            }
            return converter;
        }
    
        //省略其它代码....
    
        /**
         * Return the message converter for this template. Useful for clients that want to take advantage of the converter
         * in {@link ChannelCallback} implementations.
         *
         * @return The message converter.
         */
        public MessageConverter getMessageConverter() {
            return this.messageConverter;
        }
    
    

    RabbitTemplate默认的MessageConverter为SimpleMessageConverter,它不支持json,所以我们需要替换为支持json序列化的消息转换器,而Spring也给我们提供了一个Jackson2JsonMessageConverter

    • Jackson2JsonMessageConverter引来与jackson的jar包
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.8</version>
    </dependency>
    
    • 配置用于发送json对象的RabbitTemplate
        /**
         * 定义支持json的RabbitTemplate
         * @param connectionFactory
         * @return
         */
        @Bean
        public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            //重写发送端的消息转换器,来替换默认的SimpleMessageConverter
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            return template;
        }
    
    • 发送json对象示例代码如下:
    
        /**
         * 发送对象消息,使用的json序列化
         */
        public void sendTest2(Department department)  {
            logger.info("---> 我准备发送json对象:"+department);
    
            /**
             * 发送json对象,必须为消息指定contentType,否则发送时只会使用默认的消息转换器SimpleMessageConverter
             * 而SimpleMessageConverter只支持字符串、byte数字和java Serializable对象
             * 不指定就会在发送时报错:IllegalArgumentException:SimpleMessageConverter only supports String, byte[] and Serializable payloads
             */
            //发送
            this.jsonRabbitTemplate.convertAndSend(RabbitmqBaseConfig.EX_SIMPLE_DIRECT, RabbitmqJsonConfig.KEY_SIMPLE_JSON, department); ;
    
        }
    

    通过RabbitMQ可以查看消息:

    RabbitMQ Json Message

    写在最后:

    细心的同学可能已经发现,这两种方式发送json的方式,在RabbitMQ内还是有些不同(注意对比两张图片),通过Jackson2JsonMessageConverter发送的消息多了一个__TypeId__header属性,其值为json对象的全限定类名。__TypeId__的作用主要为消费者进行消息转化时提供依据,下一节我们会继续讲它的作用。

    相关文章

      网友评论

          本文标题:Spring-Cloud RabbitMQ 用法 - 发送jso

          本文链接:https://www.haomeiwen.com/subject/ckhxjqtx.html