美文网首页
rocketmq_消费消息出现异常时

rocketmq_消费消息出现异常时

作者: kele2018 | 来源:发表于2022-05-03 16:41 被阅读0次

最近项目组的一个服务出现了消息丢失的问题,虽然当时通过手动重发的方式解决了,但是对于丢失的原因一直没有去深挖,这几天趁着疫情隔离在家看了一下,现在做个总结;下面是当时发生丢失问题的代码:

@Service
@Slf4j
@RocketMQMessageListener(topic = "****", consumerGroup = "****", ACK = ACKMode.AUTO)
public class MQFxQrCodeLogListener extends AbstractMQListener {

    @Override
    protected void messageListen(MQMessage message) throws Exception {
        try{
            // 消费业务
        } catch (Exception e) {
            Throwable throwable = new CommonException(BootConstants.SERVICE_ERR_CODE, e.getMessage(),
                    JSON.toJSONString(message), e);
            //钉钉告警
            notifyCollectorService.exception(throwable);
        }
    }
}

1、rocketmq消息消费流程
(1) 负载均衡,构造PullRequest,并存入队列;
(2) 从队列中获取请求,并向broker拉取消息;
(3) 向线程池中提交任务;

public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
           //构造任务
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
               //提交任务
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            // 一条一条提交到线程池
            for (int total = 0; total < msgs.size(); ) {
                List<MessageExt> msgThis = new ArrayList<MessageExt>(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }

                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }

                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

(4) 使用listener消费消息;

public void run() {
    if (this.processQueue.isDropped()) {
        log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
        return;
    }
    ......
    ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
    try {
        if (msgs != null && !msgs.isEmpty()) {
            for (MessageExt msg : msgs) {
                MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
            }
        }
        // 消费逻辑由用户自定义,所以可能抛出异常
        status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
    } catch (Throwable e) {
        log.warn("consumeMessage exception: {} Group: {} Msgs: {} MQ: {}",
            RemotingHelper.exceptionSimpleDesc(e),
            ConsumeMessageConcurrentlyService.this.consumerGroup,
            msgs,
            messageQueue);
        hasException = true;
    }
   if (null == status) {
           log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
           ConsumeMessageConcurrentlyService.this.consumerGroup, msgs,
 messageQueue);
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
     }
    ......
    if (!processQueue.isDropped()) {
      //处理消费结果
        ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
    } else {
        log.warn("processQueue is dropped without process consume result. messageQueue={}, msgs={}", messageQueue, msgs);
    }
}

(5) 处理消费结果;

public void processConsumeResult(
        final ConsumeConcurrentlyStatus status,
        final ConsumeConcurrentlyContext context,
        final ConsumeRequest consumeRequest
    ) {
        int ackIndex = context.getAckIndex();

        if (consumeRequest.getMsgs().isEmpty())
            return;

        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    /**
                      1、只有处理失败的消息才会上报broker      每次只发送一个消息的处理结果
                      2、context中可以设置具体的处理策略
                   
                          Message consume retry strategy<br>
                        -1,no retry,put into DLQ directly<br>
                         0,broker control retry frequency<br>
                        >0,client control retry frequency
     
                       private int delayLevelWhenNextConsume = 0;
                   **/
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);

                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

2、接下来我们看springboot与rokcetmq集成的时候,具体的异常处理策略

public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            Iterator var3 = msgs.iterator();

            while(var3.hasNext()) {
                MessageExt messageExt = (MessageExt)var3.next();
                DefaultRocketMQListenerContainer.log.debug("received msg: {}", messageExt);

                try {
                    long now = System.currentTimeMillis();
                    DefaultRocketMQListenerContainer.this.rocketMQListener.onMessage(DefaultRocketMQListenerContainer.this.doConvertMessage(messageExt));
                    long costTime = System.currentTimeMillis() - now;
                    DefaultRocketMQListenerContainer.log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception var9) {
                    DefaultRocketMQListenerContainer.log.warn("consume message failed. messageExt:{}", messageExt, var9);
                   // 由于我们在定义监听器的时候,ACK = ACKMode.AUTO,所以此处一定是false
                    if (DefaultRocketMQListenerContainer.this.canRetry(messageExt)) {
                        context.setDelayLevelWhenNextConsume(DefaultRocketMQListenerContainer.this.delayLevelWhenNextConsume);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                   // -1是进死信队列
                    context.setDelayLevelWhenNextConsume(-1);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

回头看我们的代码,我们处理消费业务抛出的异常时,发送了钉钉告警,并吞掉了异常,此时rocketmq会认为该消息成功消费了,既不会进死信队列,也不会重试;造成消息丢失。

相关文章

  • rocketmq_消费消息出现异常时

    最近项目组的一个服务出现了消息丢失的问题,虽然当时通过手动重发的方式解决了,但是对于丢失的原因一直没有去深挖,这几...

  • rocketmq_顺序消息

    Q:在rocketmq语境下,如何定义【顺序】这个词? Q:为了保证这种效果,生产端应该如何做? Q:为了保证这种...

  • RabbitMQ实际使用过程遇到问题

    消息重复被消费 今天日常开始统计系统信息(rabbitmq),被业务提醒数据出现异常,查找日志发现某条订单的消息被...

  • RabbitMQ的消息确认ACK机制

    1、什么是消息确认ACK如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息...

  • RabbitMQ的ack机制

    1、什么是消息确认ACK。 答:如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处...

  • RocketMQ消息重试

    RocketMQ为了保证消息被消费采用ACK确认机制,消费者消费消息时需要给Broker反馈消息消费的情况,成功或...

  • Kafka 消费者心跳线程源码解析

    kafka消费者在消费消息时,分为心跳线程和用户线程(处理消息的线程) 消费消息poll方法 我们在第一次启动消费...

  • 消息队列-3 如何保证消息可靠性(RabbitMQ)

    消息丢失的场景 消息发送时消息丢失 路由消息时消息丢失 消息未持久化消息丢失 消费消息时消息丢失 消息发送可靠性 ...

  • Apache Pulsar 消息传递模型(2)-消息确认

    当使用跨机器分布的消息传递系统时,可能会发生故障。在消费者从消息传递系统中的主题消费消息的情况下,消费消息的消费者...

  • rocketmq-consumer

    rocketmq 消费消息大致有以下几种场景类型 乱序消费,消息被乱序的发送的队列,消费者在消费各个队列时是并行消...

网友评论

      本文标题:rocketmq_消费消息出现异常时

      本文链接:https://www.haomeiwen.com/subject/njwbyrtx.html