美文网首页
(十)ack确认

(十)ack确认

作者: guessguess | 来源:发表于2021-07-22 16:22 被阅读0次

    在前面看了消费者并发消费的源码。得知消费者在消费消息失败之后,会尝试将消息sendback, 当sendback失败之后再进行重试消费。但是对于sendback这个机制还是不太清楚。所以看了一下sendback的源码。

    首先还是定位一下入口。

    public class MQClientAPIImpl {
        public void consumerSendMessageBack(
            final String addr,
            final MessageExt msg,
            final String consumerGroup,
            final int delayLevel,
            final long timeoutMillis,
            final int maxConsumeRetryTimes
        ) throws RemotingException, MQBrokerException, InterruptedException {
            ConsumerSendMsgBackRequestHeader requestHeader = new ConsumerSendMsgBackRequestHeader();
            RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CONSUMER_SEND_MSG_BACK, requestHeader);
    
            requestHeader.setGroup(consumerGroup);
            requestHeader.setOriginTopic(msg.getTopic());
            requestHeader.setOffset(msg.getCommitLogOffset());
            requestHeader.setDelayLevel(delayLevel);
            requestHeader.setOriginMsgId(msg.getMsgId());
            requestHeader.setMaxReconsumeTimes(maxConsumeRetryTimes);
    
            RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
                request, timeoutMillis);
            assert response != null;
            switch (response.getCode()) {
                case ResponseCode.SUCCESS: {
                    return;
                }
                default:
                    break;
            }
    
            throw new MQBrokerException(response.getCode(), response.getRemark(), addr);
        }
    }
    

    从请求头上来看,设置了消费者对应的消费组,以及原有的topic,以及消息的偏移量,以及延迟级别,以及原有的消息id,还有设置最大重试次数。
    RequestCode.CONSUMER_SEND_MSG_BACK = 36.

    那么Broker是如何处理sendback的消息
    还是先来看看源码,首先找到入口。在rocketmq里面,NettyRequestProcessor为专门用于处理请求的一个channelHandler.有不同的请求处理器针对不同的请求。

    public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
        public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
                                                                      RemotingCommand request) throws RemotingCommandException {
            final SendMessageContext mqtraceContext;
            switch (request.getCode()) {
                case RequestCode.CONSUMER_SEND_MSG_BACK:
                    return this.asyncConsumerSendMsgBack(ctx, request);
                default:
                省略无关代码。
            }
        }
    }
    

    broker是如何处理返回的消息

    从上面代码看,无疑是asyncConsumerSendMsgBack方法。
    因此下面来看看实现的细节

    public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
        private CompletableFuture<RemotingCommand> asyncConsumerSendMsgBack(ChannelHandlerContext ctx,
                                                                            RemotingCommand request) throws RemotingCommandException {
            final RemotingCommand response = RemotingCommand.createResponseCommand(null);
            final ConsumerSendMsgBackRequestHeader requestHeader =
                    (ConsumerSendMsgBackRequestHeader)request.decodeCommandCustomHeader(ConsumerSendMsgBackRequestHeader.class);
            省略部分代码。
            根据消费组的group,找到对应的SubscriptionGroupConfig。
            SubscriptionGroupConfig subscriptionGroupConfig =
                this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
            如果为Null,直接返回响应
            if (null == subscriptionGroupConfig) {
                response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
                response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
                    + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
                return CompletableFuture.completedFuture(response);
            }
            如果没有写权限,直接返回响应
            if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1() + "] sending message is forbidden");
                return CompletableFuture.completedFuture(response);
            }
            如果可重试的队列<=0,直接返回,说明不可以进行重试。
            if (subscriptionGroupConfig.getRetryQueueNums() <= 0) {
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
                return CompletableFuture.completedFuture(response);
            }
            生成新的topic,%RETRY% + 消费组的group
            String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
            随机找一个重试队列
            int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();
            int topicSysFlag = 0;
            if (requestHeader.isUnitMode()) {
                topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
            }
            创建topic,并且将新的topic配置同步到注册中心,用于生成新的路由数据,这样子消费者才找到的消息。
            TopicConfig topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
                newTopic,
                subscriptionGroupConfig.getRetryQueueNums(),
                PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
            如果top创建失败,或者不存在,则返回响应
            if (null == topicConfig) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("topic[" + newTopic + "] not exist");
                return CompletableFuture.completedFuture(response);
            }
            如果没有写的权限,则返回响应
            if (!PermName.isWriteable(topicConfig.getPerm())) {
                response.setCode(ResponseCode.NO_PERMISSION);
                response.setRemark(String.format("the topic[%s] sending message is forbidden", newTopic));
                return CompletableFuture.completedFuture(response);
            }
            根据偏移量找回以前的消息
            MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
           如果消息不存在的话,则直接返回响应
            if (null == msgExt) {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("look message by offset failed, " + requestHeader.getOffset());
                return CompletableFuture.completedFuture(response);
            }
            如果该消息没有被重试过,不会有对应的重试主题属性,需要设置
            final String retryTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
            if (null == retryTopic) {
                MessageAccessor.putProperty(msgExt, MessageConst.PROPERTY_RETRY_TOPIC, msgExt.getTopic());
            }
            msgExt.setWaitStoreMsgOK(false);
            延迟级别
            int delayLevel = requestHeader.getDelayLevel();
          
            int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
            if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
                maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
            }
            如果消息的重试次数以及大于最大的重试次数,或者delayLevel<0.
            则将消息放入死信队列
            if (msgExt.getReconsumeTimes() >= maxReconsumeTimes 
                || delayLevel < 0) {
                生成新的主题,%DLQ%+消费组的group
                newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
                随机生成队列id
                queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
                在broker中创建新的topic,并且同步到注册中心
                topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
                        DLQ_NUMS_PER_GROUP,
                        PermName.PERM_WRITE, 0);
                主题不存在,或者创建失败,则直接返回响应
                if (null == topicConfig) {
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark("topic[" + newTopic + "] not exist");
                    return CompletableFuture.completedFuture(response);
                }
            } else {
                if (0 == delayLevel) {
                    设置延迟级别,在保存之前会调整topic改为SCHEDULE_TOPIC_XXXX。暂时先不管这块是如何实现的
                    delayLevel = 3 + msgExt.getReconsumeTimes();
                }
                msgExt.setDelayTimeLevel(delayLevel);
            }
    -----------------------------------------------------------------------------------------------------------------------------------------------
            生成新消息,绝大多数属性都是原有消息的。
            MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
            //设置新主题
            msgInner.setTopic(newTopic);
            msgInner.setBody(msgExt.getBody());
            msgInner.setFlag(msgExt.getFlag());
            MessageAccessor.setProperties(msgInner, msgExt.getProperties());
            msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
            msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));
    
            msgInner.setQueueId(queueIdInt);
            msgInner.setSysFlag(msgExt.getSysFlag());
            msgInner.setBornTimestamp(msgExt.getBornTimestamp());
            msgInner.setBornHost(msgExt.getBornHost());
            msgInner.setStoreHost(msgExt.getStoreHost());
            msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
    
            String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
            MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
    --------------------------------------------------------------------------------------------------------------------------------------------
            对新生成的消息进行存储。存储后返回响应
            CompletableFuture<PutMessageResult> putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
            return putMessageResult.thenApply((r) -> {
                if (r != null) {
                    switch (r.getPutMessageStatus()) {
                        case PUT_OK:
                            String backTopic = msgExt.getTopic();
                            String correctTopic = msgExt.getProperty(MessageConst.PROPERTY_RETRY_TOPIC);
                            if (correctTopic != null) {
                                backTopic = correctTopic;
                            }
                            this.brokerController.getBrokerStatsManager().incSendBackNums(requestHeader.getGroup(), backTopic);
                            response.setCode(ResponseCode.SUCCESS);
                            response.setRemark(null);
                            return response;
                        default:
                            break;
                    }
                    response.setCode(ResponseCode.SYSTEM_ERROR);
                    response.setRemark(r.getPutMessageStatus().name());
                    return response;
                }
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("putMessageResult is null");
                return response;
            });
        }
    }
    

    从上面的代码来看,消息如果消费失败,那么在mq中,是不会将一个消息进行重复消费的(除非broker挂了,消费者无法Sendback,则只能对一个消息多消费几次),其实是采取不断生成新消息的方式来(大多数内容采取复制,新消息有自己的msgId,以及不同的topic,但是会将一些原有的信息保留,比如原来的topic),来进行重复消费。

    topic最后分为几类。

    1.自定义topic

    这里没太特殊,其实就是我们自定义的topic

    2.重试主题

    %RETRT%前缀 + GROUP 比如一个消费者的GROUP为group1。那么重试主题就为%RETRY%group1
    如果消息消费失败,且允许重试消费的话,就会复制一个新消息,保存到%RETRY%GROUP中。
    消费者一开始会默认创建重试的主题的订阅信息,用来保证对重试消息的消费。
    源码如下

    public class DefaultMQPushConsumerImpl implements MQConsumerInner {
        private void copySubscription() throws MQClientException {
            try {
                Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
                if (sub != null) {
                    for (final Map.Entry<String, String> entry : sub.entrySet()) {
                        final String topic = entry.getKey();
                        final String subString = entry.getValue();
                        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                            topic, subString);
                        this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                    }
                }
    
                if (null == this.messageListenerInner) {
                    this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
                }
    
                switch (this.defaultMQPushConsumer.getMessageModel()) {
                    case BROADCASTING:
                        break;
                    case CLUSTERING:
                        final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                        集群模式下,需要设置重试主题
                        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                            retryTopic, SubscriptionData.SUB_ALL);
                        this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                        break;
                    default:
                        break;
                }
            } catch (Exception e) {
                throw new MQClientException("subscription exception", e);
            }
        }
    }
    
    3.调度主题

    在重复次数没超的情况下,会根据重复次数设置延时级别,如果延时级别越高的话,那么延时的时间则越长,放入SCHEDULE_TOPIC_XXXX队列。在保存之前,根据消息的延时级别,判断是否要修改主题。

    public class CommitLog {
        public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
            msg.setStoreTimestamp(System.currentTimeMillis());
            msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
            AppendMessageResult result = null;
    
            StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    
            String topic = msg.getTopic();
            int queueId = msg.getQueueId();
    
            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    
                if (msg.getDelayTimeLevel() > 0) {
                    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                    }
                    修改主题为"SCHEDULE_TOPIC_XXXX"
                    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
                    备份原有的topic以及队列id,用于后续转发
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                    更新主题
                    msg.setTopic(topic);
                    msg.setQueueId(queueId);
                return putMessageResult;
            });
        }
    }
    

    SCHEDULE_TOPIC_XXXX是broker内部的一个主题,对于消费者生产者而言都是不可见的,仅仅用于消息的延时生成,会有定时器去将延时消息消费,将延时消息转发到原来对应的队列。
    举个例子,某消息被消费失败了,随后进行重试消费,topic改为%RETRY%group1,随后发现需要延时,在保存消息时候,则将原来的主题(%RETRY%group1)以及队列id等相关信息保存,将消息的主题改为SCHEDULE_TOPIC_XXXX。最后进行保存。
    而这个内部的主题SCHEDULE_TOPIC_XXXX是如何被消费的,其实就是有定时器去把主题里面的消息读出来,然后根据那些消息原有的消息以及队列,将消息放到对应的队列中去。
    说白了这个叫SCHEDULE_TOPIC_XXXX的主题,其实就是用于将重试的消息延迟放入到对应的队列中。而它的作用相当于,先保存,后转发。

    4.死信主题

    如果重复次数超过配置的最大重试次数,则进入死信队列。
    %DLQ%+消费组的group,死信队列的主题为%DLQ%+消费者的group。
    这个是需要人工去干预的。

    相关文章

      网友评论

          本文标题:(十)ack确认

          本文链接:https://www.haomeiwen.com/subject/mhlrmltx.html