🏆【Alibaba Middleware Technology Series】「RocketMQ Technical Topic」分


Author: 洛神灬殇 | Published: 2022-02-03 13:56

    Analysis of RocketMQ's Retry Mechanism for Failed Consumption

    Today we analyze RocketMQ's consumption retry mechanism, using the following RocketMQ consumer-service code as the example:

    
    // Excerpt (decompiled) from the consumer service's message-handling method;
    // the method signature and the outer try block are omitted
    try {
        if (messageExtWrappers.size() > 0) {
            try {
                // Iterates over the batch; the loop does nothing beyond this assignment in the source
                Iterator<?> iterator = messageExtWrappers.iterator();
                while (iterator.hasNext()) {
                    messageExt = (MessageExt) iterator.next();
                }
            } catch (Throwable ignored) {
                // Exceptions thrown while iterating are swallowed
            }
            // Business-level consumption of the batch
            this.consume(messageExtWrappers, context);
        }
        LOGGER.info("MQ_CON_SUCCESS {} BROKER {} QUEUE {}", topic, broker, queueId);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (MessageListenerConcurrentlyException e) {
        LOGGER.error("MQ_CON_EX {} BROKER {} QUEUE {}", topic, broker, queueId, e);
        throw e;
    } catch (Throwable t) {
        LOGGER.error("MQ_CON_EX {} BROKER {} QUEUE {}", topic, broker, queueId, t);
        LOGGER.info("MQ_CON_RECONSUME {} BROKER {} QUEUE {}", topic, broker, queueId);
        // Once the delay level reaches 2 + retryTimes, stop retrying: -1 routes the message to the DLQ
        if (CollectionUtils.isNotEmpty(msgs) && ((MessageExt) msgs.get(0)).getDelayTimeLevel() >= 2 + this.retryTimes) {
            context.setDelayLevelWhenNextConsume(-1);
        }
    }
    
    

    After consuming messages with this code, the resulting logs are as follows:

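    First consumption (note: this entry's msgId 0A2A00F400002A9F000000000B77CE84 does not match the ORIGIN_MESSAGE_ID of the retry entries below, and its bodyCRC differs, so it is a different message; it still shows what a first delivery looks like: reconsumeTimes=0 and no DELAY property)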

    MQ_CON_MSG topic MSG MessageExt [queueId=1, storeSize=453, queueOffset=25, sysFlag=0, bornTimestamp=1566785215908, bornHost=/10.42.0.77:54608, storeTimestamp=1566785215908, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B77CE84, commitLogOffset=192401028, bodyCRC=53737244, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={MIN_OFFSET=0, _catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15071, HASH_CODE=690132963, MAX_OFFSET=26, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785215911, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15072, UNIQ_KEY=0A2A004D000938AF386882EAA5A40112, WAIT=true}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 48, 52, 54, 56, 57, 52, 48, 52, 48, 56, 48], transactionId='null'}]

    First retry (reconsumeTimes=1, DELAY=3)

    MQ_CON_MSG %RETRY%consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1187, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785226241, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B785900, commitLogOffset=192436480, bodyCRC=893293938, reconsumeTimes=1, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785226242, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1188, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=3, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]

    Second retry (reconsumeTimes=2, DELAY=4)

    MQ_CON_MSG %RETRY%consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1209, sysFlag=0, bornTimestamp=1566785215923, bornHost=/10.42.0.77:54608, storeTimestamp=1566785256680, storeHost=/10.42.0.244:10911, msgId=0A2A00F400002A9F000000000B791399, commitLogOffset=192484249, bodyCRC=893293938, reconsumeTimes=2, preparedTransactionOffset=0, toString()=Message{topic='gmcf-lsc-zhongbang-repu-calc-from-topic', flag=0, properties={_catChildMessageId1=gmcf-lsc-job-0a2a004d-435218-15075, _catRootMessageId=lts-0a2a006b-434483-0, CONSUME_START_TIME=1566785256728, SERIALIZE_CLASS=java.lang.String, _catParentMessageId=lts-0a2a006b-435218-524, _catChildMessageId=gmcf-lsc-job-0a2a004d-435218-15076, MIN_OFFSET=0, REAL_TOPIC=%RETRY%gmcf-lsc-consumerGroup, ORIGIN_MESSAGE_ID=0A2A00F400002A9F000000000B77D049, RETRY_TOPIC=gmcf-lsc-zhongbang-repu-calc-from-topic, HASH_CODE=1506102461, MAX_OFFSET=1210, UNIQ_KEY=0A2A004D000938AF386882EAA5B30113, WAIT=false, DELAY=4, REAL_QID=0}, body=[71, 77, 66, 69, 67, 49, 57, 48, 56, 50, 52, 49, 48, 52, 49, 49, 49, 50, 48, 55, 48, 52, 48, 56, 49], transactionId='null'}]

    Third retry (reconsumeTimes=3, DELAY=5)

    MQ_CON_MSG %RETRY%consumerGroup MSG MessageExt [queueId=0, storeSize=608, queueOffset=1228, sysFlag=0, bornTimestamp=1566785215923, b
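    The two complete retry entries let us check the delays numerically. Both carry the same bornTimestamp, while storeTimestamp records when each retry copy was stored, so the differences should be roughly 10s (level 3) and 30s (level 4) plus the time spent consuming and sending the message back. RetryGaps is a hypothetical name; the numbers are copied from the log entries above.

```java
// Worked check of the retry delays, using timestamps copied from the retry log entries (epoch milliseconds).
final class RetryGaps {
    static final long BORN = 1566785215923L;         // bornTimestamp shared by both retry entries
    static final long RETRY1_STORE = 1566785226241L; // storeTimestamp of the first retry (DELAY=3)
    static final long RETRY2_STORE = 1566785256680L; // storeTimestamp of the second retry (DELAY=4)

    public static void main(String[] args) {
        // 10318 ms: about 10s, matching delay level 3
        System.out.println("born -> first retry: " + (RETRY1_STORE - BORN) + " ms");
        // 30439 ms: about 30s, matching delay level 4
        System.out.println("first retry -> second retry: " + (RETRY2_STORE - RETRY1_STORE) + " ms");
    }
}
```

    The extra few hundred milliseconds in each gap are processing overhead, not part of the configured delay.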

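    RocketMQ's default configuration is messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h, which corresponds to delay levels 1 through 18. So why does the first retry in the logs use DELAY=3 instead of starting from level 1?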
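    Before answering, it helps to make the level table concrete. The sketch below turns the default messageDelayLevel string into a 1-based level-to-duration lookup. DelayLevels is a hypothetical class, and accepting the s/m/h/d suffixes reflects my understanding of the format, not the broker's actual parser.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Sketch: parse a messageDelayLevel string into durations; list index 0 holds delay level 1.
final class DelayLevels {
    static final String DEFAULT_LEVELS = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

    static List<Duration> parse(String spec) {
        List<Duration> levels = new ArrayList<>();
        for (String token : spec.trim().split("\\s+")) {
            long value = Long.parseLong(token.substring(0, token.length() - 1));
            char unit = token.charAt(token.length() - 1);
            switch (unit) {
                case 's': levels.add(Duration.ofSeconds(value)); break;
                case 'm': levels.add(Duration.ofMinutes(value)); break;
                case 'h': levels.add(Duration.ofHours(value)); break;
                case 'd': levels.add(Duration.ofDays(value)); break;
                default: throw new IllegalArgumentException("unknown unit in " + token);
            }
        }
        return levels;
    }

    // Delay levels are 1-based: level 1 is the first entry, level 18 the last one in the default table.
    static Duration delayFor(int level, List<Duration> levels) {
        if (level < 1 || level > levels.size()) {
            throw new IllegalArgumentException("delay level out of range: " + level);
        }
        return levels.get(level - 1);
    }

    public static void main(String[] args) {
        List<Duration> levels = parse(DEFAULT_LEVELS);
        for (int level = 1; level <= levels.size(); level++) {
            System.out.println("level " + level + " -> " + delayFor(level, levels));
        }
    }
}
```

    With this table, level 3 is 10s, level 5 is 1m and level 9 is 5m, which are the values used in the rest of the article.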

    
    if (CollectionUtils.isNotEmpty(msgs) && ((MessageExt) msgs.get(0)).getDelayTimeLevel() >= 2 + this.retryTimes) {
        // -1 tells the client to send the message straight to the dead-letter queue
        context.setDelayLevelWhenNextConsume(-1);
    }
    
    

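    Once this condition is met, the consumer stops retrying and the message goes straight to the DLQ (dead-letter queue). Without this check, the message keeps being retried until it reaches the highest delay level, 18.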
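    In the retry logs, each redelivered message carries DELAY = reconsumeTimes + 2 (DELAY=3 at reconsumeTimes=1, DELAY=4 at reconsumeTimes=2, DELAY=5 at reconsumeTimes=3), so the check delayTimeLevel >= 2 + retryTimes effectively means "reconsumeTimes has reached retryTimes". The sketch isolates that check in plain Java. RetryGate is a hypothetical name, retryTimes = 3 is an assumed value (the article does not show it), and reading the "DELAY" property with a default of 0 is my understanding of how Message.getDelayTimeLevel() behaves.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper mirroring the listener's stop condition: delayTimeLevel >= 2 + retryTimes.
final class RetryGate {
    // Assumption: like Message.getDelayTimeLevel(), read the "DELAY" property; a missing value means 0.
    static int delayTimeLevel(Map<String, String> properties) {
        String value = properties.get("DELAY");
        return value == null ? 0 : Integer.parseInt(value);
    }

    // true means the listener would call context.setDelayLevelWhenNextConsume(-1), i.e. go to the DLQ
    static boolean shouldSendToDlq(Map<String, String> properties, int retryTimes) {
        return delayTimeLevel(properties) >= 2 + retryTimes;
    }

    public static void main(String[] args) {
        int retryTimes = 3; // assumed value for illustration
        Map<String, String> firstDelivery = new HashMap<>(); // the original message has no DELAY property
        System.out.println("first delivery -> DLQ? " + shouldSendToDlq(firstDelivery, retryTimes));
        for (int delay = 3; delay <= 5; delay++) { // DELAY values seen in the retry logs
            Map<String, String> retry = new HashMap<>();
            retry.put("DELAY", String.valueOf(delay));
            System.out.println("DELAY=" + delay + " -> DLQ? " + shouldSendToDlq(retry, retryTimes));
        }
    }
}
```

    Under that assumption, a failure on the third retry (DELAY=5) is the one that routes the message to the DLQ.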

    The following code comes from the DefaultMQPullConsumerImpl class:

    
    public void sendMessageBack(MessageExt msg, int delayLevel, String brokerName, String consumerGroup)
            throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
        try {
            // Normal path: ask the broker to take the message back via the consumerSendMessageBack RPC
            String brokerAddr = null != brokerName
                    ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
                    : RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
            if (UtilAll.isBlank(consumerGroup)) {
                consumerGroup = this.defaultMQPullConsumer.getConsumerGroup();
            }
            this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg, consumerGroup,
                    delayLevel, 3000L, this.defaultMQPullConsumer.getMaxReconsumeTimes());
        } catch (Exception e) {
            this.log.error("sendMessageBack Exception, " + this.defaultMQPullConsumer.getConsumerGroup(), e);
            // Fallback: the RPC failed, so republish a copy to the retry topic of the consumer group
            Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPullConsumer.getConsumerGroup()), msg.getBody());
            // Keep the ID of the very first message across all retries
            String originMsgId = MessageAccessor.getOriginMessageId(msg);
            MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);
            newMsg.setFlag(msg.getFlag());
            MessageAccessor.setProperties(newMsg, msg.getProperties());
            MessageAccessor.putProperty(newMsg, "RETRY_TOPIC", msg.getTopic());
            MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
            MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(this.defaultMQPullConsumer.getMaxReconsumeTimes()));
            // The delay level grows with the retry count: 3 + reconsumeTimes
            newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
            this.mQClientFactory.getDefaultMQProducer().send(newMsg);
        }
    }
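    The catch branch above only runs when the send-back RPC itself fails. For the normal path, here is a simplified model of the decision the broker makes for a sent-back message, written from my reading of RocketMQ's broker-side send-back handling rather than copied from it. SendBackModel and nextDelayLevel are hypothetical names, and clamping levels to the highest configured level is an assumption about broker behavior.

```java
// Simplified model (assumption, not broker source) of where a sent-back message goes next.
final class SendBackModel {
    static final int MAX_DELAY_LEVEL = 18; // number of entries in the default messageDelayLevel table

    // reconsumeTimes: count on the message that just failed; clientDelayLevel: context.getDelayLevelWhenNextConsume()
    // Returns -1 for the dead-letter queue, otherwise the delay level for the retry topic.
    static int nextDelayLevel(int reconsumeTimes, int maxReconsumeTimes, int clientDelayLevel) {
        if (reconsumeTimes >= maxReconsumeTimes || clientDelayLevel < 0) {
            return -1; // retries exhausted, or the client asked for the DLQ with -1
        }
        // 0 means "let the broker decide": 3 + reconsumeTimes
        int level = clientDelayLevel == 0 ? 3 + reconsumeTimes : clientDelayLevel;
        return Math.min(level, MAX_DELAY_LEVEL); // assumed clamp to the highest level
    }

    public static void main(String[] args) {
        for (int reconsumeTimes = 0; reconsumeTimes <= 16; reconsumeTimes++) {
            int level = nextDelayLevel(reconsumeTimes, 16, 0);
            System.out.println("reconsumeTimes=" + reconsumeTimes + " -> " + (level < 0 ? "DLQ" : "level " + level));
        }
    }
}
```

    With the defaults (16 retries, client delay level 0), the first failure is retried at level 3 and the sixteenth retry at level 18; a failure after that goes to the DLQ.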
    
    

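    The code shows that DelayTimeLevel = 3 + reconsumeTimes:

    newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

    Strictly speaking, this line sits in the catch branch, which only runs when the consumerSendMessageBack RPC fails; on the normal path, the broker applies the same rule (3 + reconsumeTimes) when the client leaves the delay level at its default of 0. So by default retries start at level 3, and the timing confirms it: in the logs above the message is consumed four times (the original delivery plus three retries), with intervals of 10s, 30s and 1m (levels 3, 4 and 5).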
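    Putting the pieces together, the retry delays can be computed directly. This sketch assumes that every attempt fails, that the level is 3 + reconsumeTimes, and that the default 18-level table is in use; RetrySchedule is a hypothetical name. With three retries it reproduces the 10s/30s/1m pattern, and with the default 16 retries it adds up the total waiting time before the message reaches the DLQ.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Sketch: delays between attempts when every attempt fails, using level = 3 + reconsumeTimes.
final class RetrySchedule {
    // Default messageDelayLevel table in seconds: 1s 5s 10s 30s 1m 2m ... 10m 20m 30m 1h 2h
    static final long[] LEVEL_SECONDS = {1, 5, 10, 30, 60, 120, 180, 240, 300, 360, 420, 480, 540, 600, 1200, 1800, 3600, 7200};

    static List<Duration> delays(int retries) {
        List<Duration> result = new ArrayList<>();
        for (int reconsumeTimes = 0; reconsumeTimes < retries; reconsumeTimes++) {
            int level = Math.min(3 + reconsumeTimes, LEVEL_SECONDS.length); // clamp to the highest level
            result.add(Duration.ofSeconds(LEVEL_SECONDS[level - 1]));
        }
        return result;
    }

    static Duration total(List<Duration> delays) {
        Duration sum = Duration.ZERO;
        for (Duration d : delays) {
            sum = sum.plus(d);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("3 retries: " + delays(3));                // [PT10S, PT30S, PT1M]
        System.out.println("16 retries, total: " + total(delays(16))); // PT4H45M40S
    }
}
```

    The 16-retry total of 4h45m40s is the longest a message waits in retry under the defaults before it is moved to the dead-letter queue.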

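    • In addition, if the delay level is set to 9 when the message is sent, the consumer can fetch it from the broker only after 5m. When consumption fails or throws, the consumer returns ConsumeConcurrentlyStatus.RECONSUME_LATER to the broker, so the broker redelivers the message; because the consumer set the delay level for the next consumption to 5, the redelivered message becomes available only after 1m. Because the consumer did not configure a retry count, the broker redelivers the message 16 times by default and then moves it to the dead-letter queue.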

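    • DefaultRocketMQListenerContainer provides an implementation of the MessageListenerConcurrently interface: it calls rocketMQListener.onMessage for each message in a loop, and when an exception occurs it sets delayLevelWhenNextConsume and immediately returns ConsumeConcurrentlyStatus.RECONSUME_LATER.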
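    The container behavior described in the last bullet can be modeled without the RocketMQ dependency. SketchStatus and SketchContext are hypothetical stand-ins for ConsumeConcurrentlyStatus and ConsumeConcurrentlyContext, and ListenerLoop is not the real container code; it only mirrors the described control flow: one onMessage call per message, and on the first failure, set the next delay level and return RECONSUME_LATER. Because java.util.function.Consumer cannot throw checked exceptions, only RuntimeException is caught here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for ConsumeConcurrentlyStatus / ConsumeConcurrentlyContext.
enum SketchStatus { CONSUME_SUCCESS, RECONSUME_LATER }

final class SketchContext {
    // 0 = let the broker decide, -1 = dead-letter queue, > 0 = explicit delay level
    int delayLevelWhenNextConsume = 0;
}

final class ListenerLoop {
    // Calls onMessage for each message in order; on the first exception, records the
    // configured delay level and returns RECONSUME_LATER without processing the rest.
    static <T> SketchStatus consume(List<T> msgs, Consumer<T> onMessage, SketchContext context, int delayLevelOnError) {
        for (T msg : msgs) {
            try {
                onMessage.accept(msg);
            } catch (RuntimeException e) {
                context.delayLevelWhenNextConsume = delayLevelOnError;
                return SketchStatus.RECONSUME_LATER;
            }
        }
        return SketchStatus.CONSUME_SUCCESS;
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        SketchContext ctx = new SketchContext();
        SketchStatus status = consume(Arrays.asList("a", "bad", "c"), m -> {
            if (m.equals("bad")) throw new IllegalStateException("boom");
            seen.add(m);
        }, ctx, 5);
        System.out.println(status + " " + ctx.delayLevelWhenNextConsume + " " + seen); // RECONSUME_LATER 5 [a]
    }
}
```

    Messages after the failing one ("c" in the example) are not passed to onMessage in that call.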


        Article link: https://www.haomeiwen.com/subject/jlidkrtx.html