美文网首页
(十一)重试消费---延时调度机制

(十一)重试消费---延时调度机制

作者: guessguess | 来源:发表于2021-07-27 17:03 被阅读0次

    在当消息消费失败,返回给broker的时候,有一个延时级别,然后在保存消息之前,会根据延时级别将主题修改。

    public class CommitLog {
        public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
            msg.setStoreTimestamp(System.currentTimeMillis());
            msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
            AppendMessageResult result = null;
    
            StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    
            String topic = msg.getTopic();
            int queueId = msg.getQueueId();
    
            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    
                if (msg.getDelayTimeLevel() > 0) {
                    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                    }
                    修改主题为"SCHEDULE_TOPIC_XXXX"
                    topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
                    备份原有的topic以及队列id,用于后续转发
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                    更新主题
                    msg.setTopic(topic);
                    msg.setQueueId(queueId);
                return putMessageResult;
            });
        }
    }
    

    那么这个调度主题SCHEDULE_TOPIC_XXXX里面的消息是如何被消费的。这个主题其实是broker内部的主题,那么生产跟消费,必然都是broker内部去实现的。

    下面先来看看一个接口MessageStore

    MessageStore

    这个接口定义了如何从commitLog中读取数据,写入数据,以及查询数据。

    public interface MessageStore {
        boolean load();
        void start() throws Exception;
        void shutdown();
        void destroy();
        default CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
            return CompletableFuture.completedFuture(putMessage(msg));
        }
        default CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch) {
            return CompletableFuture.completedFuture(putMessages(messageExtBatch));
        }
        PutMessageResult putMessage(final MessageExtBrokerInner msg);
        PutMessageResult putMessages(final MessageExtBatch messageExtBatch);
        GetMessageResult getMessage(final String group, final String topic, final int queueId,
            final long offset, final int maxMsgNums, final MessageFilter messageFilter);
        long getMaxOffsetInQueue(final String topic, final int queueId);
        long getMinOffsetInQueue(final String topic, final int queueId);
        long getCommitLogOffsetInQueue(final String topic, final int queueId, final long consumeQueueOffset);
        long getOffsetInQueueByTime(final String topic, final int queueId, final long timestamp);
        MessageExt lookMessageByOffset(final long commitLogOffset);
        SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset);
        SelectMappedBufferResult selectOneMessageByOffset(final long commitLogOffset, final int msgSize);
        String getRunningDataInfo();
        HashMap<String, String> getRuntimeInfo();
        long getMaxPhyOffset();
        long getMinPhyOffset();
        long getEarliestMessageTime(final String topic, final int queueId);
        long getEarliestMessageTime();
        long getMessageStoreTimeStamp(final String topic, final int queueId, final long consumeQueueOffset);
        long getMessageTotalInQueue(final String topic, final int queueId);
        SelectMappedBufferResult getCommitLogData(final long offset);
        boolean appendToCommitLog(final long startOffset, final byte[] data);
        void executeDeleteFilesManually();
        QueryMessageResult queryMessage(final String topic, final String key, final int maxNum, final long begin,
            final long end);
        void updateHaMasterAddress(final String newAddr);
        long slaveFallBehindMuch();
        long now();
        int cleanUnusedTopic(final Set<String> topics);
        void cleanExpiredConsumerQueue();
        boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset);
        long dispatchBehindBytes();
        long flush();
        boolean resetWriteOffset(long phyOffset);
        long getConfirmOffset();
        void setConfirmOffset(long phyOffset);
        boolean isOSPageCacheBusy();
        long lockTimeMills();
        boolean isTransientStorePoolDeficient();
        LinkedList<CommitLogDispatcher> getDispatcherList();
        ConsumeQueue getConsumeQueue(String topic, int queueId);
        BrokerStatsManager getBrokerStatsManager();
        这个方法是延时调度的核心
        void handleScheduleMessageService(BrokerRole brokerRole);
    }
    

    简单看看接口定义的方法,就大概知道这个接口是做什么用的。就是用于跟commitLog交互的。那么下面看看其对于的实现类DefaultMessageStore.

    DefaultMessageStore

    对于handleScheduleMessageService的实现

    public class DefaultMessageStore implements MessageStore {
        private final ScheduleMessageService scheduleMessageService;
        @Override
        public void handleScheduleMessageService(final BrokerRole brokerRole) {
            if (this.scheduleMessageService != null) {
                if (brokerRole == BrokerRole.SLAVE) {
                    this.scheduleMessageService.shutdown();
                } else {
                    为master才可以进行延时调度
                    this.scheduleMessageService.start();
                }
            }
        }
    }
    

    下面直接看看如何Start

    public class ScheduleMessageService extends ConfigManager {
        启动的标记
        private final AtomicBoolean started = new AtomicBoolean(false);
        定时器
        private Timer timer;
        延时级别,以及对应的延时时间
        private final ConcurrentMap<Integer /* level */, Long/* delay timeMillis */> delayLevelTable =
            new ConcurrentHashMap<Integer, Long>(32);
        延时级别对应的队列的偏移量
        private final ConcurrentMap<Integer /* level */, Long/* offset */> offsetTable =
            new ConcurrentHashMap<Integer, Long>(32);
    
    
        public void start() {
            避免重复启动
            if (started.compareAndSet(false, true)) {
                后台启动一个守护线程
                this.timer = new Timer("ScheduleMessageTimerThread", true);
                遍历延时级别的表,每一个延时级别对应不同的延时时间
                for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                    Integer level = entry.getKey();
                    Long timeDelay = entry.getValue();
                    Long offset = this.offsetTable.get(level);
                    if (null == offset) {
                        offset = 0L;
                    }
                    
                    if (timeDelay != null) {
                        根据延时级别以及偏移量创建定时任务,第一次延时执行
                        this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                    }
                }
    
                this.timer.scheduleAtFixedRate(new TimerTask() {
    
                    @Override
                    public void run() {
                        try {
                            定时进行持久化
                            if (started.get()) ScheduleMessageService.this.persist();
                        } catch (Throwable e) {
                            log.error("scheduleAtFixedRate flush exception", e);
                        }
                    }
                }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
            }
        }
    }
    

    从上面代码来看,就是每一个延时级别创建一个定时任务。执行内容为DeliverDelayedMessageTimerTask
    随后再创建一个持久化的定时任务。

    如何根据延时级别进行延时处理

    public class ScheduleMessageService extends ConfigManager {
        class DeliverDelayedMessageTimerTask extends TimerTask {
            private final int delayLevel;
            private final long offset;
    
            public DeliverDelayedMessageTimerTask(int delayLevel, long offset) {
                this.delayLevel = delayLevel;
                this.offset = offset;
            }
    
            @Override
            public void run() {
                try {
                    if (isStarted()) {
                        实现的核心
                        this.executeOnTimeup();
                    }
                } catch (Exception e) {
                    log.error("ScheduleMessageService, executeOnTimeup exception", e);
                    ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                        this.delayLevel, this.offset), DELAY_FOR_A_PERIOD);
                }
            }
        }
    }
    

    从上面代码看 实现的核心在于executeOnTimeup方法。
    接下来看看是如何实现的。

    public class ScheduleMessageService extends ConfigManager {
        class DeliverDelayedMessageTimerTask extends TimerTask {
            private final int delayLevel;
            private final long offset;
        class DeliverDelayedMessageTimerTask extends TimerTask {
            public void executeOnTimeup() {
                1.根据调度主题,以及延时级别找到对应的队列。(如果不存在就创建队列),如果创建后队列还为空,啥也不做。
                ConsumeQueue cq =
                    ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
                        delayLevel2QueueId(delayLevel));
    
                long failScheduleOffset = offset;
    
                if (cq != null) {
                    SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                    if (bufferCQ != null) {
                        try {
                            long nextOffset = offset;
                            int i = 0;
                            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                            for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                                long offsetPy = bufferCQ.getByteBuffer().getLong();
                                int sizePy = bufferCQ.getByteBuffer().getInt();
                                long tagsCode = bufferCQ.getByteBuffer().getLong();
    
                                if (cq.isExtAddr(tagsCode)) {
                                    if (cq.getExt(tagsCode, cqExtUnit)) {
                                        tagsCode = cqExtUnit.getTagsCode();
                                    } else {
                                        //can't find ext content.So re compute tags code.
                                        log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
                                            tagsCode, offsetPy, sizePy);
                                        long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
                                        tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
                                    }
                                }
    
                                long now = System.currentTimeMillis();
                                long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
    
                                nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
    
                                long countdown = deliverTimestamp - now;
    
                                if (countdown <= 0) {
                                    根据偏移量取出消息
                                    MessageExt msgExt =
                                        ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
                                            offsetPy, sizePy);
    
                                    if (msgExt != null) {
                                        try {
                                            2.将消息还原
                                            MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                            if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
                                                log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
                                                        msgInner.getTopic(), msgInner);
                                                continue;
                                            }
                                            3.将消息写入
                                            PutMessageResult putMessageResult =
                                                ScheduleMessageService.this.writeMessageStore
                                                    .putMessage(msgInner);
    
                                            if (putMessageResult != null
                                                && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
                                                continue;
                                            } else {
                                                出现异常,延时调度
                                                ScheduleMessageService.this.timer.schedule(
                                                    new DeliverDelayedMessageTimerTask(this.delayLevel,
                                                        nextOffset), DELAY_FOR_A_PERIOD);
                                                更新延时级别的偏移量
                                                ScheduleMessageService.this.updateOffset(this.delayLevel,
                                                    nextOffset);
                                                结束
                                                return;
                                            }
                                        } catch (Exception e) {
                                            log.error(
                                                "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                                                    + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                                                    + offsetPy + ",sizePy=" + sizePy, e);
                                        }
                                    }
                                } else {
                                    ScheduleMessageService.this.timer.schedule(
                                        new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),
                                        countdown);
                                    ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                    return;
                                }
                            } 
    
                            nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                            ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                                this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                            ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                            return;
                        } finally {
    
                            bufferCQ.release();
                        }
                    } // end of if (bufferCQ != null)
                    else {
    
                        long cqMinOffset = cq.getMinOffsetInQueue();
                        if (offset < cqMinOffset) {
                            failScheduleOffset = cqMinOffset;
                            log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                                + cqMinOffset + ", queueId=" + cq.getQueueId());
                        }
                    }
                } // end of if (cq != null)
    
                ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                    failScheduleOffset), DELAY_FOR_A_WHILE);
            }
        }
    }
    

    上面的代码比较复杂,但是核心还是在于从队列中取出消息。然后进行还原,最后写入。

    消息还原

    还原的方法如下

    public class ScheduleMessageService extends ConfigManager {
        class DeliverDelayedMessageTimerTask extends TimerTask {
            private final int delayLevel;
            private final long offset;
            private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
                MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
                msgInner.setBody(msgExt.getBody());
                msgInner.setFlag(msgExt.getFlag());
                MessageAccessor.setProperties(msgInner, msgExt.getProperties());
    
                TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
                long tagsCodeValue =
                    MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
                msgInner.setTagsCode(tagsCodeValue);
                msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
    
                msgInner.setSysFlag(msgExt.getSysFlag());
                msgInner.setBornTimestamp(msgExt.getBornTimestamp());
                msgInner.setBornHost(msgExt.getBornHost());
                msgInner.setStoreHost(msgExt.getStoreHost());
                msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
    
                msgInner.setWaitStoreMsgOK(false);
                MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
                将主题还原,真实主题以及队列id被保存在property中
    ----------------------------------------------------------------------------------------------------------------------------------------------
                msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
                String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
    ----------------------------------------------------------------------------------------------------------------------------------------------
                int queueId = Integer.parseInt(queueIdStr);
                msgInner.setQueueId(queueId);
    
                return msgInner;
            }
    }
    

    延时调度的机制就是,给每个延时级别分配一个定时任务,用于将每个延时级别对应的队列中的消息进行消费,这个消费其实就是将消息还原,最后写入到真实TOPIC对应的队列中去。

    相关文章

      网友评论

          本文标题:(十一)重试消费---延时调度机制

          本文链接:https://www.haomeiwen.com/subject/bgqgmltx.html