美文网首页
RabbitMQ丢消息的解决方案

RabbitMQ丢消息的解决方案

作者: h2coder | 来源:发表于2024-01-22 16:45 被阅读0次

    3种丢消息的场景

    • 发送消息到交换机或队列时,丢消息(设置2个回调)
    • 消息到MQ软件,MQ因宕机而要重启,丢消息(交换机、队列、消息的durable属性,要设置为持久化,Spring的RabbitTemplate默认就是将这3个都持久化的,一般不需要去改)
    • 消费者没有正常消费消息,丢消息(默认消费方是阅后即焚的,所以消息从队列出队给消费方后,队列中就没有这个消息了,消费方没有正常去消费,消息就丢失了)

    发送消息到交换机或队列时,丢消息

    • 开启发送消息后的回调配置
    spring:
      rabbitmq:
        publisher-confirm-type: correlated
        publisher-returns: true
        template:
          mandatory: true
    
    • 设置RabbitTemplate的2个回调

    • setConfirmCallback,消息发送给交换机后回调,成功时ack参数为true,失败则为false

    • setReturnCallback,消息从交换机投递给队列失败时,才会回调,所以要在这个回调时,记录日志

    @Slf4j
    @Configuration
    // 实现ApplicationContextAware接口,可以从已有的spring上下文取得已实例化的bean
    public class CommonConfig implements ApplicationContextAware {
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            // 获取RabbitTemplate实例
            RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
            
            // 设置confirm callback,投递消息到交换机成功或失败,都会回调此方法
            // 注:如果投递成功,方法的ack参数为true,失败则为false
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    log.info("消息投递完成,ack = {}, cause = {}", ack, cause);
                }
            });
            
            // 设置return callback,从交换机投递到队列失败时,才会回调该方法
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    // 记录日志
                    log.info("消息投递失败,replyCode = {},replyText = {},exchange = {},routingKey = {},message = {}", replyCode, replyText, exchange, routingKey, message.toString());
                }
            });
        }
    }
    

    MQ软件重启,丢消息

    • 在Java代码中,创建交换机、队列时,就会设置为持久化,属性名为durable,在RabbitMQ的后台中看,有一个大写的D,就是设置了持久化的了,一般我们都会设置为持久化,保证MQ重启不丢消息

    消费者没有正常消费消息,丢消息

    默认消费者收到消息后,MQ就会将消息从队列中删除,也就是阅后即焚,我们需要设置MQ的确认模式,一般我们可以设置为auto自动或manual手动,以下以手动为例

    手动模式

    • 在消费者的application.yml文件中,配置以下内容
    spring:
      rabbitmq:
        listener:
          simple:
            acknowledge-mode: manual # 确认模式有3种,manual、auto、none
    
    • 确认模式
      • manual:手动ack,需要在业务代码结束后,调用api发送ack。
      • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
      • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
      • 即,none是失败后什么都不处理,auto是类似事务机制,出现异常时返回nack,消息回滚到MQ,没有异常则返回ack,消息才从MQ中删除。manual是手动自己判断业务是否正常执行,成功则手动返回ack
    @Component
    @Slf4j
    public class SpringRabbitListener {
        @RabbitListener(queues = "simple.queue")
        public void listenSimpleQueue(Message msg, Channel channel) throws Exception {
            System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
            // 模拟异常
            System.out.println(1 / 0);
            
            // 业务执行正常,才回复ack
            // 参数一:deliveryTag,也就是消息的标识,从msg中获取
            // 参数二:multiple,如果MQ是集群,true则是需要通知集群中的所有MQ
            channel.basicAck(msg.getMessageProperties().getDeliveryTag(), true);
            
            log.debug("消息处理完成!");
        }
    }
    

    自动模式

    • 将消费者的确认模式,修改为auto
    spring:
      rabbitmq:
        listener:
          simple:
            acknowledge-mode: auto # 确定模式:auto,为自动ack
    
    • 重新运行消费者,会发现消费者消费出现异常,然后将消息归还给MQ,然后消费者的Listener监听队列又有消息,又从队列中拿出来消息,导致出现的无限死循环!!

    重试次数

    • 因此不能无限重试,我们应该限制重试的次数,以及重试完毕后的失败策略(例如重试了3、5次后,还是失败,则将消息投递到一个特定的错误消息交换机,然后再投递到错误消息队列)

    • 配置spring的retry机制,当消费者消费出现异常时,进行本地重试,而不是无限制的requeue重新入队到MQ队列中,其中enabled属性为开启失败重试,max-attempts为最大重试次数

    • 本地重试:也就是消息消费过程中,出现异常,不会将消息requeue到队列,而是在消费者本地进行重试,就不会出现频繁requeue,给MQ造成不必要的压力

    spring:
      rabbitmq:
        listener:
          simple:
            retry:
              enabled: true # 开启消费者失败重试
              initial-interval: 1000 # 初始的失败等待时长为1秒 2  4  8  16  32
              multiplier: 2 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
              max-attempts: 3 # 最大重试次数
              stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
    
    • 最后,重试达到最大次数后,Spring会返回ack,消息会被丢弃,所以我们还需要配置失败策略

    失败策略

    • 默认,重试达到最大重试次数后,消息会丢失,这个是Spring的内部机制决定的,默认的失败处理策略是丢弃消息,我们可以配置策略的实现,策略接口为MessageRecoverer,有3种策略实现,分别是:

      • RejectAndDontRequeueRecoverer,到达最大重试次数后,直接reject,丢弃消息,默认就是这种
      • ImmediateRequeueMessageRecoverer,到达最大重试次数后,返回nack,消息重新requeue入队
      • RepublishMessageRecoverer,到达最大重试次数后,将失败消息投递到指定的交换机
    • 比较优雅的方式是选用RepublishMessageRecoverer,例如使用这种策略方式,当到达到最大重试次数后,将消息投递到一个错误消息交换机,然后交换机再投递到一个专门存放错误消息的错误队列,后续人工再集中处理

    • 在消费方,定义处理错误的交换机和队列

    // 错误消息交换机
    @Bean
    public DirectExchange errorMessageExchange(){
        return new DirectExchange("error.direct");
    }
    
    // 错误消息队列
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue", true);
    }
    
    // 绑定错误交换机和错误消息队列
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
        return BindingBuilder
        // 队列
        .bind(errorQueue)
        // 交换机
        .to(errorMessageExchange)
        // 设置routingKey
        .with("error");
    }
    
    • 定义失败策略,指定错误消息交换机和routingKey
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
    
    • 此时,再次启动生产者,发送消息,消费者重试3次失败后,执行失败策略,将消息投递到了error错误队列,在RabiitMQ的控制台中,点击get message按钮,就能获取到消息内容,以及错误消息的堆栈

    完整代码

    @Configuration
    public class ErrorMessageConfig {
        @Bean
        public DirectExchange errorMessageExchange(){
            return new DirectExchange("error.direct");
        }
        @Bean
        public Queue errorQueue(){
            return new Queue("error.queue", true);
        }
        @Bean
        public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange){
            return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");
        }
    
        @Bean
        public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
            return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
        }
    }
    

    相关文章

      网友评论

          本文标题:RabbitMQ丢消息的解决方案

          本文链接:https://www.haomeiwen.com/subject/dngyodtx.html