美文网首页
消息确认与回调、消息序列化与Headers

消息确认与回调、消息序列化与Headers

作者: Doooook | 来源:发表于2020-11-10 08:42 被阅读0次

    一、消息确认与回调

    默认情况下,RabbitMQ发送消息以及接收消息是自动确认的,意思也就是说,消息发送方发送消息的时候,认为消息已经成功发送到了RabbitMQ服务器,而当消息发送给消费者后,RabbitMQ服务器就立即自动确认,然后将消息从队列中删除了。而这样的自动机制会造成消息的丢失,我们常常听到“丢消息”的字眼。

    为了解决消息的丢失,RabbitMQ便产生了手动确认的机制:

    • 发送者:
      • 当消息不能路由到任何队列时,会进行确认失败操作,如果发送方设置了mandatory=true模式,则先会调用basic.return方法,然后调用basic.ack方法;
      • 当消息可以路由时,消息被发送到所有绑定的队列时,进行消息的确认basic.ack
    • 接收者:
      • 当消息成功被消费时,可以进行消息的确认basic.ack
      • 当消息不能正常被消费时,可以进行消息的反确认basic.nack 或者拒绝basic.reject

    1.1 修改配置

    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: mitter
        password: mitter
        virtual-host: mitter_vhost # 注意:这里前面不能带/,默认的“/”理解成字符串就行,和Linux的目录斜杠还不是一回事
        publisher-confirms: true # 消息发送到交换机确认机制,是否确认回调
        publisher-returns: true # 消息发送到交换机确认机制,是否返回回调
        listener:
          simple:
            acknowledge-mode: manual # 采用手动应答
            concurrency: 1 # 指定最小的消费者数量
            max-concurrency: 100 # 指定最大的消费者数量
            retry:
              enabled: true # 是否支持重试
    

    1.2 配置交换机、队列

    @Configuration
    public class MeassageAckConfig {
    
        public static final String MESSAGE_ACK_EXCHANGE = "direct-message-ack-exchange";
        public static final String MESSAGE_ACK_QUEUE = "message-ack-queue";
        public static final String MESSAGE_ACK_ROUTE_KEY = "message.ack.key";
    
        @Bean
        public Queue messageAckQueue() {
            return QueueBuilder.durable(MESSAGE_ACK_QUEUE).build();
        }
    
        @Bean
        public DirectExchange directMessageAckExchange() {
            return (DirectExchange) ExchangeBuilder.directExchange(MESSAGE_ACK_EXCHANGE).durable(true).build();
        }
    
        @Bean
        public Binding directMessageBinding(DirectExchange directMessageAckExchange, Queue messageAckQueue) {
            return BindingBuilder.bind(messageAckQueue).to(directMessageAckExchange).with(MESSAGE_ACK_ROUTE_KEY);
        }
    
    }
    

    1.3 消息生产者

    /**
     * 生产者
     */
    @Component
    public class MessageAckProducer {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 给hello队列发送消息
         */
        public void send() {
            for (int i = 0; i < 100; i++) {
                String msg = "hello,序号: " + i;
                System.out.println("Producer," + msg);
                rabbitTemplate.convertAndSend(MeassageAckConfig.MESSAGE_ACK_EXCHANGE, MeassageAckConfig.MESSAGE_ACK_ROUTE_KEY, msg);
            }
        }
    
    }
    

    1.4 消息消费者

    /**
     * 消费者
     */
    @Component
    public class MessageAckConsumer {
    
        private static final Logger logger = LoggerFactory.getLogger(MessageAckConsumer.class);
    
        @RabbitListener(queues = MeassageAckConfig.MESSAGE_ACK_QUEUE)
        public void process(Message message, Channel channel) {
            try {
                // 采用手动应答模式,手动确认应答更为安全稳定
                logger.info("receive: " + new String(message.getBody()));
                // 制造异常,向队列回放消息
                if ("\"hello,序号: 50\"".equals(new String(message.getBody()))) {
                    int a = 1/0;
                }
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                logger.error("process error by message: {} and channel: {}", message.getMessageProperties().getCorrelationId(),
                        channel.getConnection().getAddress());
                /*try {
                    // 拒绝消息,multiple=false,值拒绝当前的消息,requeue=true,重新放回队列
                    // 一般不回放,不然会一致消费,记录日志查找原因
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                } catch (IOException ex) {
                    ex.printStackTrace();
                }*/
            }
        }
    }
    

    1.5 测试

    @RestController
    public class MessageAckController {
    
        @Autowired
        private MessageAckProducer messageAckProducer;
    
        @GetMapping(value = "/messageAck")
        public void testMessageAck() {
            messageAckProducer.send();
        }
    
    }
    
    1604840762465.png

    七、消息序列化

    • 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
    • RabbitMQ 的序列化是指 Message 的 body 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter(默认)、Jackson2JsonMessageConverter 等
    • 当调用了 convertAndSend 方法时会使用 MessageConvert 进行消息的序列化
    • SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理,如果是 String 则转成字节数组,如果是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差
    • 当使用 RabbitMQ 作为中间件时,数据量比较大,此时就要考虑使用类似 Jackson2JsonMessageConverter 等序列化形式以此提高性能

    序列化配置:

    @Configuration
    public class RabbitConfig {
    
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(this.connectionFactory());
            // 生产者端发送消息,使用Jackson2JsonMessageConverter序列化
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            // 消息发送失败返回到队列中, yml需要配置 publisher-returns: true
            rabbitTemplate.setMandatory(true);
    
            // 消息确认, yml需要配置 publisher-confirms: true
            // 消息回调
            // ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调,即消息发送到exchange ack
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    logger.debug("消息发送到exchange成功!");
                } else {
                    logger.debug("消息发送到exchange失败,原因: {}", cause);
                }
            });
    
            // 消息返回, yml需要配置 publisher-returns: true
            // ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调,即消息发送不到任何一个队列中 ack
            rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
                String correlationId = message.getMessageProperties().getCorrelationId();
                logger.debug("消息:{} 发送失败,应答码:{} 原因:{} 交换机: {} 路由键: {}", correlationId, replyCode, replyText, exchange, routingKey);
            });
            return rabbitTemplate;
        }
    
        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            // 消费者端,接收消息使用Jackson2JsonMessageConverter反序列化
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            return factory;
        }
    
    }
    

    发送消息:

    @RestController
    public class MessageSerializableController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @GetMapping(value = "/messageSerializable/{name}")
        public void messageSerializable(@PathVariable(value = "name") String name) {
            User user = new User(name, 20, new Date());
            rabbitTemplate.convertAndSend(QueueConstant.QUEUE_NOTIFY_TEST, user);
        }
    
    }
    

    接收消息:

    @Component
    public class MessageSerilizableConsumer {
    
        /**
         * 使用@Payload获取body信息
         * @param user 直接反序列化的对象
         */
        @RabbitListener(queues = {QueueConstant.QUEUE_NOTIFY_TEST})
        public void receiveTestQueue(@Payload User user) {
            System.out.println(user.getName());
        }
    
    }
    

    测试:

    1604844242748.png

    八、Headers

    Headers应用场景,比如对于headers中有某些属性的消息可以选择性处理,对应管理台的Headers:


    1604845932396.png

    生产者:

    @RestController
    public class HeadersController {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        private final static MessagePostProcessor MESSAGE_POST_PROCESSOR = message -> {
            message.getMessageProperties().setContentType("application/json");
            message.getMessageProperties().setContentEncoding("UTF-8");
            // 设置Headers
            message.getMessageProperties().setHeader("name", "mitter");
            return message;
        };
    
        @GetMapping(value = "/headers/{name}")
        public void headers(@PathVariable(value = "name") String name) {
            User user = new User(name, 20, new Date());
            rabbitTemplate.convertAndSend(QueueConstant.QUEUE_NOTIFY_TEST, user, MESSAGE_POST_PROCESSOR);
        }
    
    }
    

    消费者:

    @Component
    public class MessageSerilizableConsumer {
    
        /**
         * 使用@Payload获取body信息,使用@Headers获取Headers信息
         * @param user 直接反序列化的对象
         */
        @RabbitListener(queues = {QueueConstant.QUEUE_NOTIFY_TEST})
        public void receiveTestQueue(@Payload User user, @Headers Map<String,Object> headers) {
            System.out.println(user.getName());
            System.out.println(JSON.toJSONString(headers));
        }
    
    }
    

    测试:

    1604846139832.png 1604846196198.png

    相关文章

      网友评论

          本文标题:消息确认与回调、消息序列化与Headers

          本文链接:https://www.haomeiwen.com/subject/bwnrbktx.html