美文网首页
RabbitMQ使用的一些问题

RabbitMQ使用的一些问题

作者: 黄金矿工00七 | 来源:发表于2019-08-23 00:37 被阅读0次

    先说点闲话,这个问题的发生是因为当年我的垃圾代码导致,嗯,垃圾到我自己不想看。刚毕业的年轻人总是想探索一下未知,于是一知半解之下就上了MQ,但是话说回来,这种自讨苦吃是要的,不然真的会一直垃圾下去。

    问题的发生

    项目中的消息推送使用MQ做了异步处理,有一天消息推送突然中断了,排查了好久,日志也没有,这个也是,嗯,知识太少。好久之后,我想不会是MQ出问题了吧,我一看MQ,果然,这里不是原图,Unacknowledged状态的有8,Ready的有好多,再打开消费端的配置一看,

    MQ管理
    消费端配置 ,qos设置正好为8,通过以上排查基本可以确定队列堵塞是由于消费者线程取走了消息,但是既没有ACK,也没有NACK,这样的消息个数到达Qos设置的值后,队列就会堵塞。
    • 问题代码
    @Component
    public class MsgQueueListener extends MessageListenerAdapter {
    
      private static Logger logger = LoggerFactory.getLogger(MsgQueueListener.class);
      @Autowired
      private RabbitTemplate rabbitTemplate;
      @Autowired
      private MessageSendFacadeService messageSendFacadeService;
      @Autowired
      private TaskExecutor taskExecutor;
      @Autowired
      private RedisUtil redisUtil;
    
      @Override
      public void onMessage(final Message message, final Channel channel) throws IOException {
        final WechatTemMessageDTO dto = (WechatTemMessageDTO) rabbitTemplate.getMessageConverter()
            .fromMessage(message);
        try {
         //业务代码处理
          channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (AnswerException e) {
          if (getRetryCount(message.getMessageProperties() < 3) {
            dto.setErrMsg(e.getErrorMsg());
            //重试次数小于3 ,投递到重试队列
            rabbitTemplate.convertAndSend("*", "*",
                dto, new MessagePostProcessor() {
                  @Override
                  public Message postProcessMessage(Message message) throws AmqpException {
                    MessageProperties properties = new MessageProperties();
                    properties.setHeader("x-orig-routing-key", "*");
                    return message;
                  }
                });
          } else {
            dto.setErrMsg(e.getErrorMsg());
            rabbitTemplate.convertAndSend("*failed", "*failed", dto,
                new MessagePostProcessor() {
                  @Override
                  public Message postProcessMessage(Message message) throws AmqpException {
                    MessageProperties properties = new MessageProperties();
                    properties.setHeader("x-orig-routing-key", "*");
                    return message;
                  }
                });
          }
          channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
      }
    
    

    问题出在catch块,

    1. catch只捕获了业务异常
      对于非业务异常,显然无法捕获,导致消费者没有ack,这段代码主要的原因再调用业务代码的时候,我尽可能的将业务代码捕获的异常转换为了业务异常,但是产生了遗漏,而且这里这样做也有很大的坏处,没有将异常分类,来决定失败消息如何处理,下面再细说。
    2. catch块中可能又会抛出异常
      catch块中抛出的异常显然导致消费者没有ack,也没有finally进行处理,所以消费者慢慢的阻塞。
      解决办法应该是在finally语句中来执行这些操作,消费者从队列中取出消息后,无非是三种处理结果:1、处理成功,这种时候应该用basicAck确认消息;2、可重试的处理失败,这时候应该用basicNack将消息重新入列或者丢入死信队列3、不可重试的处理失败,这时候应该使用basicNack将消息丢弃或者丢入失败队列进行相应的业务操作
    • 正确示例
    enum ProcessResult{
    //这里只举几个简单的例子
      SUCCESS,  // 处理成功
      RETRY,   // 可以重试的错误
      FAIL,  // 无法重试的错误
    }
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            WechatMessageDTO messageDTO = null;
            try {
                messageDTO = rabbitMQService.getMessageBody(message);
            }
            catch (Exception e) {
                logger.error("MQ 数据转换异常",e);
            } finally {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
            ProcessResult result=null;
            try {
                result = messageService.processMsg(messageDTO);
            } catch(UserDefineException e){
              logger.error("消费者处理失败:(消息ID:" + messageDTO.getId() + ")", e);
              //根据业务异常类型进行处理
              result = ;
            }catch (Exception e) {
                logger.error("消费者处理失败:(消息ID:" + messageDTO.getId() + ")", e);
                result = ;
            } finally {
                postProcessByResult(result);
            }
        }
    

    容易出问题的点

    • 自动ack机制导致消息丢失以及客户端崩溃
      MQ只要确认消息发送成功,无须等待应答就会丢弃消息,虽然自动ack机制可以防止队列阻塞的问题,但是无法得知消费者的处理情况。自动ack没有qos控制,只要客户端队列不为空,则不断推送消息,可能导致消费者假死或者崩溃。
      qos是rabbitMQ一种消费限流的手段,上面提到的prefetch属性指定每个消费者最大的unacked messages数目。消费者每次最多可以取prefetch条消息缓存在客户端,Java客户端内部维护了一个BlockingQueue用来缓存从queue获取的message,默认值会设为Integer.MAX_VALUE,如果不设置qos可能会导致队列不断膨胀,最终OOM;Spring amqp提供了类似的功能,队列的大小是prefetch的大小,默认是1,关于prefetch的设置可以参考Some queuing theory: throughput, latency and bandwidth
      假设有客户端两个消费者线程,prefetch都是10,意味着每个消费者线程每次会从queue中预抓取 10 条消息到本地缓存着等待消费。这里对于MQ来说,客户端只是一个消费者,他们之间建立的Connection(包含多个channel,通常每个消费者线程使用一个)的unacked数变为20,但是对于客户端来说,可能是多个消费者线程,每个channel的unack数量达到prefetch预设值,并且达到最大的最大消费者线程数。便会停止投递新的message到该消费者中直到它发出ack。关于这部分大家可以看我以前写的去了解RabbitMQ连接池,Rabbit将会停止投递新的message到该消费者中直到它发出ack。
    • nack机制导致的死循环
      消息处理失败时使用Nack,等下一次重新消费,导致队列中Ready状态的消息暴增,
    • 启用ack机制但是没有启动qos
      如我上面发生的问题,如果没有qos,消息处理发生异常后,无法ack,队列的Unacked消息数暴涨,导致MQ响应越来越慢,甚至崩溃。

    一些消息可靠性保证措施

    对于生产者:

    • 发送确认
      实现ReturnCallback接口来得到消息发送失败的原因
    • 发送失败返回
      实现ConfirmCallback接口来确认是否正确到达exchange,当使用confirm时候,如果channel或者connection失败,生产者应该重新发送所有没有来得及提交的数据。但是服务器broker可能已经发送确认数据到生产者了,因此消费者要具有幂等性。
      对于消费者:
    • 通过redelivedred来确认消息是否重复发送

    相关文章

      网友评论

          本文标题:RabbitMQ使用的一些问题

          本文链接:https://www.haomeiwen.com/subject/tlhasctx.html