美文网首页
RabbitMQ 如何保证消息的可靠性

RabbitMQ 如何保证消息的可靠性

作者: 这里有颗小螺帽 | 来源:发表于2021-08-02 16:13 被阅读0次
    确保消息不丢失.png

    队列持久化

    // 队列消息持久化
    boolean durable = true;
    channel.queuDeclare = (ACK_QUEUE_NAME,durable,flase,false,null);
    

    上面的代码就是进行消息持久话,当然还有其他写法,例如:

        @Bean
        public Queue directProductQueue(){
    
            return QueueBuilder.durable(队列名);
    

    其他写法不一一赘述。
    如果队列A之前没有持久化,重启RabbitMQ后,队列会消息,并且,在代码里将队列A改为了持久化,需要先将原来的队列删除掉,否则会报错。
    持久化后,在控制台中会显示"D",这样的话,即使重启RabbitMQ,队列A也会照样存在。

    image.png

    消息持久化

    队列持久化并不能让消息持久化,如果RabbitMQ宕机,重启后,持久化后的队列还会存在,因为消息默认保存在内存中,所以消息会丢失,如果想让消息不丢失,或者丢失的少,最好将消息进行持久化,需要在生产段进行配置

      Message message1 = MessageBuilder.withBody(msgBody.getBytes())
                        .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
                        .setContentEncoding("UTF-8")
                        .setCorrelationId(msgId).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
                CorrelationData correlationData = new CorrelationData(msgId);
                rabbitTemplate.convertAndSend(exchange, routingKey, message1,correlationData);
    

    以上代码中,setDeliveryMode(MessageDeliveryMode.PERSISTENT) 就是将消息进行了持久化。即使RabbitMQ宕机,消息也不会全部丢失,为什么不能保证全部不丢失呢?因为在一种极端情况下,例如RabbitMQ在将消息写入磁盘的过程中,RabbitMQ宕机,此时,还未写入磁盘的部分消息就会丢失。

    当然有很多方法可以保证消息尽可能不丢失,例如生产者发送消息后立马将消息写入数据库,即使RabbitMQ让部分消息丢失,我们也可以通过数据库里的消息进行补偿,例如重发消息,但是发消息时同时写库,对性能会有一些影响。

    发布确认

    什么是发布确认,发布确认就是生产者发布的消息被投递到指定队列后,broker会通过回调函数告诉生产者消息投递成功了,要注意,这只是消息投递成功了,而不是消费成功。

    发布确认是否开启需要自己手动设置,比如可以在application.yml中设置如下:

      rabbitmq:
        addresses: xx.xx.xx.x
        port: 5672
        username: xxx
        password: xxxxxxxx
        publisher-confirms: true #是否开启回调
    
    • 单个确认
      单个确认发布属于同步确认,发一条消息确认一次,缺点是发布消息比较慢,这种方式最多提供每秒不超过数百条的发布消息吞吐量。

    • 批量确认
      相比于单个确认,批量确认极大的提高了吞吐量,但是当发生故障时,不能确定哪条消息出了问题,同时,批量确认也是同步的。

    • 异步确认
      异步确认不会同步等待broker的确认信息,异步响应broker的确认信息。
      先贴一下代码:

    @Component
    @Slf4j
    public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void initRabbitTemplate() {
            // 设置生产者消息确认
            rabbitTemplate.setConfirmCallback(this);
    
        }
    
        /**
         * 消息发送到 Broker 后触发回调
         *
         * @param correlationData bean
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    
            if (ack) {
                  // 如果消息被确认了,走一套逻辑
            } else {
                //如果消息没有被确认,是否补偿?持久化到数据库还是定期处理?  correlationData.getId()
            }
        }
    

    对于消息被确认还是没有被确认的具体处理逻辑需要自己去写,你可以在发消息前将消息先存入redis或者MySQL,为每一条消息设置一个唯一的id(可以用UUID、雪花算法等等),就是correlationData.getId(),当消息没有被确认,可以拿着这个唯一的id将完整的消息取出来,做消息补偿还是只是记录错误日志自己定夺。

    手动ack

    消息到达队列后,准备被消费者消费,消息被成功消费后,即业务处理完成后,进行手动ack,RabbitMQ默认是自动ack的,就是只要开始消费,就会被自动ack,自动ack后,队列中对应的这条消息就没了,生产中最好用手动ack。

     channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    

    如果在消费过程中出现了问题,可以将消息reject,reject后可以选择消息重新入队或者消息直接被丢弃,下面代码中的 false 表示不重新入队,如果重新入队,可能会带来一个问题,就是如果这条消息永远会在被消费的过程中产生错误,那么这条消息就会不断地被重新入队,会造成死循环。

    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    

    死信队列

    消息在被消费的过程中发生错误怎么办呢,重新入队有风险,可以将消息发到死信队列进行处理,不影响原队列。

    先说一下什么是死信,就是由于某些原因导致队列中的某些消息无法被消费,这些消息如果没有后期的处理,就会变成死信,用来处理死信的队列就是死信队列,当然死信队列还可以当作延迟队列用。

    设置死信队列的方法可以参考下方代码:

    @Configuration
    public class RabbitConfig {
    
        // 交换机
        public static final String EXCHANGE_TEST= "exchangeTest";
    
        // 路由键
        public static final String ROUTING_KEY_TEST = "routingKeyTest";
    
         // 队列
        public static final String DIRECT_QUEUE_TEST = "direct.queuetest";
    
    
    
        /**
         * 交换机
         **/
        @Bean
        public DirectExchange directExchange() {
    
            return new DirectExchange(EXCHANGE_TEST);
        }
    
        /**
         * 队列
         **/
        @Bean
        public Queue directQueue() {
    
            return QueueBuilder.durable(DIRECT_QUEUE_TEST)
                    //死信交换机声明
                    .withArgument("x-dead-letter-exchange", DeadMQConfig.DIRECT_DEAD_EXCHANGE_NAME)
                    //死信消息的路由key
                    .withArgument("x-dead-letter-routing-key", DeadMQConfig.DIRECT_DEAD_ROUTING_KEY_NAME)
                    .build();
    
        }
    
        /**
         * Binding,将该routing key的消息通过交换机转发到该队列
         */
        @Bean
        public Binding directBinding() {
    
            return BindingBuilder.bind(directQueue()).to(directExchange()).with(ROUTING_KEY_TEST );
    
        }
    
    
    }
    

    参考:
    [1] 尚硅谷-《消息中间件RabbitMQ》
    [2] https://blog.csdn.net/qq_32662795/article/details/88742397
    [3] https://www.cnblogs.com/he-erduo/p/13558308.html

    相关文章

      网友评论

          本文标题:RabbitMQ 如何保证消息的可靠性

          本文链接:https://www.haomeiwen.com/subject/kwllpltx.html