RocketMQ Storage Module: Initial Startup

Author: 飞哈飞 | Published: 2020-11-08 00:28

    DefaultMessageStore.load()->commitLog(MappedFileQueue).load
    ->loadConsumeQueue

    File management objects

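    DefaultMessageStore: the default store manager class
    CommitLog: the log file that messages are committed to
    ConsumeQueue: the consume index file
    MappedFileQueue: a group of mapped files
    MappedFile: one individual mapped file
    StoreCheckpoint: the checkpoint file; its timestamps are consulted during abnormal recovery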



    File loading

    CommitLog and ConsumeQueue each call their own load method, which delegates to the load method of their MappedFileQueue field.

    File self-check

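    Checks whether the starting offsets of adjacent files differ by exactly the size of one file. Both CommitLog and ConsumeQueue go through MappedFileQueue.checkSelf for this.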

     public void checkSelf() {
            if (!this.mappedFiles.isEmpty()) {
                Iterator<MappedFile> iterator = mappedFiles.iterator();
                MappedFile pre = null;
                while (iterator.hasNext()) {
                    MappedFile cur = iterator.next();
    
                    if (pre != null) {
                        if (cur.getFileFromOffset() - pre.getFileFromOffset() != this.mappedFileSize) {
                            LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}",
                                pre.getFileName(), cur.getFileName());
                        }
                    }
                    pre = cur;
                }
            }
        }
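
    The same adjacency rule can be tried out in isolation. The sketch below is not RocketMQ code: the class and method names are hypothetical, and it assumes that each file name is the file's starting offset written as a zero-padded decimal number.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone check, not the RocketMQ class itself.
final class AdjacentOffsetCheckSketch {
    // Returns every index i whose file does not start exactly one segment
    // after file i-1. Assumes names are start offsets, sorted ascending.
    static List<Integer> findGaps(List<String> sortedFileNames, long segmentSize) {
        List<Integer> gaps = new ArrayList<>();
        for (int i = 1; i < sortedFileNames.size(); i++) {
            long prev = Long.parseLong(sortedFileNames.get(i - 1));
            long cur = Long.parseLong(sortedFileNames.get(i));
            if (cur - prev != segmentSize) {
                gaps.add(i);
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
            "00000000000000000000",
            "00000000000000001024",
            "00000000000000003072"); // the segment starting at 2048 is missing
        System.out.println(findGaps(names, 1024)); // prints [2]
    }
}
```

    Like checkSelf, this sketch only reports the mismatch; it does not attempt a repair.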
    

    Whether the system exited normally

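    This is determined by whether the abort file exists: if it is still present at startup, the previous shutdown was not clean.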

    private boolean isTempFileExist() {
            String fileName = StorePathConfigHelper.getAbortFile(this.messageStoreConfig.getStorePathRootDir());
            File file = new File(fileName);
            return file.exists();
        }
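
    The marker-file idea behind this check can be sketched as follows. This is a minimal illustration, not the RocketMQ implementation: the class and its method names are hypothetical, and it assumes the marker is created once the store has started and removed on an orderly shutdown.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical marker-file helper illustrating the abort-file idea.
final class AbortMarkerSketch {
    private final File marker;

    AbortMarkerSketch(File storeRoot) {
        this.marker = new File(storeRoot, "abort");
    }

    // Called once the store is up: leave a marker behind.
    void onStart() {
        try {
            marker.createNewFile();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Called on an orderly shutdown: remove the marker again.
    void onCleanShutdown() {
        marker.delete();
    }

    // Evaluated at the next startup, before onStart() runs:
    // a leftover marker means the last run did not shut down cleanly.
    boolean lastExitOk() {
        return !marker.exists();
    }
}
```

    A crash skips onCleanShutdown, so the marker survives and the next startup takes the abnormal recovery path.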
    

    File recovery on restart

    The top-level control logic for file recovery is DefaultMessageStore.recover.

     private void recover(final boolean lastExitOK) {
            long maxPhyOffsetOfConsumeQueue = this.recoverConsumeQueue();
    
            if (lastExitOK) {
                this.commitLog.recoverNormally(maxPhyOffsetOfConsumeQueue);
            } else {
                this.commitLog.recoverAbnormally(maxPhyOffsetOfConsumeQueue);
            }
    
            this.recoverTopicQueueTable();
        }
    

    Recovery after a normal restart

    ConsumeQueue file recovery

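    Recovery starts from the third-to-last file. The logic is to find the maximum valid offset, set the committed and flushed positions to it, and truncate the dirty data beyond it.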

     public void recover() {
            final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
            if (!mappedFiles.isEmpty()) {
    
                int index = mappedFiles.size() - 3;
                if (index < 0)
                    index = 0;
    
                int mappedFileSizeLogics = this.mappedFileSize;
                MappedFile mappedFile = mappedFiles.get(index);
                ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
                long processOffset = mappedFile.getFileFromOffset();
                long mappedFileOffset = 0;
                long maxExtAddr = 1;
                while (true) {
                    for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                        long offset = byteBuffer.getLong();
                        int size = byteBuffer.getInt();
                        long tagsCode = byteBuffer.getLong();
    
                        if (offset >= 0 && size > 0) {
                            mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                            this.maxPhysicOffset = offset + size;
                            if (isExtAddr(tagsCode)) {
                                maxExtAddr = tagsCode;
                            }
                        } else {
                            log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                                + offset + " " + size + " " + tagsCode);
                            break;
                        }
                    }
    
                    if (mappedFileOffset == mappedFileSizeLogics) {
                        index++;
                        if (index >= mappedFiles.size()) {
    
                            log.info("recover last consume queue file over, last mapped file "
                                + mappedFile.getFileName());
                            break;
                        } else {
                            mappedFile = mappedFiles.get(index);
                            byteBuffer = mappedFile.sliceByteBuffer();
                            processOffset = mappedFile.getFileFromOffset();
                            mappedFileOffset = 0;
                            log.info("recover next consume queue file, " + mappedFile.getFileName());
                        }
                    } else {
                        log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                            + (processOffset + mappedFileOffset));
                        break;
                    }
                }
    
                processOffset += mappedFileOffset;
                this.mappedFileQueue.setFlushedWhere(processOffset);
                this.mappedFileQueue.setCommittedWhere(processOffset);
                this.mappedFileQueue.truncateDirtyFiles(processOffset);
    
                if (isExtReadEnable()) {
                    this.consumeQueueExt.recover();
                    log.info("Truncate consume queue extend file by max {}", maxExtAddr);
                    this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
                }
            }
        }
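
    The inner loop above reads fixed-size entries (a long, an int, and a long, so 20 bytes each) and stops at the first entry that fails the offset >= 0 and size > 0 test. A stripped-down sketch of that scan over a plain ByteBuffer, with hypothetical class and method names:

```java
import java.nio.ByteBuffer;

// Hypothetical scan over fixed-size index entries, not the RocketMQ class.
final class ConsumeQueueScanSketch {
    // 8-byte offset + 4-byte size + 8-byte tags code
    static final int UNIT_SIZE = 20;

    // Returns how many bytes at the start of the buffer hold valid entries.
    static int validLength(ByteBuffer buf) {
        ByteBuffer view = buf.duplicate(); // leave the caller's position alone
        int valid = 0;
        while (view.remaining() >= UNIT_SIZE) {
            long offset = view.getLong();
            int size = view.getInt();
            view.getLong(); // tags code, not needed for the validity test
            if (offset >= 0 && size > 0) {
                valid += UNIT_SIZE;
            } else {
                break; // first invalid entry ends the valid region
            }
        }
        return valid;
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(5 * UNIT_SIZE); // zero-filled
        buf.putLong(0L).putInt(100).putLong(7L);
        buf.putLong(100L).putInt(50).putLong(7L);
        buf.rewind();
        System.out.println(validLength(buf)); // prints 40
    }
}
```

    An unwritten region of a preallocated file reads as zeros, so its size field is 0 and the scan stops there.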
    

    CommitLog normal recovery

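    The logic is similar to ConsumeQueue recovery: the valid offset is derived from the file contents. It is then compared with the maximum physical offset obtained from the ConsumeQueue, and ConsumeQueue entries that refer to messages not recorded in the CommitLog are truncated.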

     if (!mappedFiles.isEmpty()) {
                // Began to recover from the last third file
                int index = mappedFiles.size() - 3;
                if (index < 0)
                    index = 0;
    
                MappedFile mappedFile = mappedFiles.get(index);
                ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
                long processOffset = mappedFile.getFileFromOffset();
                long mappedFileOffset = 0;
                while (true) {
                    DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
                    int size = dispatchRequest.getMsgSize();
                    // Normal data
                    if (dispatchRequest.isSuccess() && size > 0) {
                        mappedFileOffset += size;
                    }
                    // ... (a large amount of code omitted)
                }
    
                processOffset += mappedFileOffset;
                this.mappedFileQueue.setFlushedWhere(processOffset);
                this.mappedFileQueue.setCommittedWhere(processOffset);
                this.mappedFileQueue.truncateDirtyFiles(processOffset);
    
                // Clear ConsumeQueue redundant data
                if (maxPhyOffsetOfConsumeQueue >= processOffset) {
                    log.warn("maxPhyOffsetOfConsumeQueue({}) >= processOffset({}), truncate dirty logic files", maxPhyOffsetOfConsumeQueue, processOffset);
                    this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);
                }
            } else {
                // Commitlog case files are deleted
                log.warn("The commitlog files are deleted, and delete the consume queue files");
                this.mappedFileQueue.setFlushedWhere(0);
                this.mappedFileQueue.setCommittedWhere(0);
                this.defaultMessageStore.destroyLogics();
            }
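
    To make the two steps concrete (scan for the end of valid data, then compare with the ConsumeQueue's maximum physical offset), here is a heavily simplified sketch. The record layout is an assumption made only for this example: a 4-byte total size followed by the payload. The real CommitLog message format carries many more fields, and all names below are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical simplified log scan, not the RocketMQ CommitLog format.
final class LogScanSketch {
    // Assumed layout per record: [int totalSize][totalSize - 4 payload bytes].
    // A size below 4, or one that overruns the buffer, ends the valid data.
    static long validEnd(ByteBuffer segment, long segmentStartOffset) {
        ByteBuffer view = segment.duplicate();
        int start = view.position();
        while (view.remaining() >= 4) {
            int size = view.getInt(view.position()); // absolute read, no advance
            if (size < 4 || size > view.remaining()) {
                break;
            }
            view.position(view.position() + size);
        }
        return segmentStartOffset + (view.position() - start);
    }

    // Mirrors the >= comparison in the excerpt above: index entries at or
    // beyond the end of valid log data have nothing to point at.
    static boolean needsIndexTruncate(long maxPhyOffsetOfConsumeQueue, long logValidEnd) {
        return maxPhyOffsetOfConsumeQueue >= logValidEnd;
    }
}
```

    With two records of 12 and 8 bytes in a segment that starts at offset 1000, validEnd returns 1020; an index whose maximum physical offset is 1050 would then be cut back to 1020.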
    

    CommitLog abnormal recovery

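    Compared with normal recovery, this path uses the timestamp information in the checkpoint to locate the file from which recovery must start, then reads the messages out one by one and dispatches them into the ConsumeQueue. At the end it likewise truncates ConsumeQueue entries for messages that the CommitLog does not contain.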
    .........
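
    The "locate the starting file by checkpoint timestamp" step can be sketched like this. It is an illustration under assumptions, not the RocketMQ code: the Segment type is hypothetical, and the rule used is "walk from the newest file backwards and stop at the first file whose first message was stored at or before the checkpoint timestamp".

```java
import java.util.List;

// Hypothetical start-file selection for abnormal recovery.
final class CheckpointStartSketch {
    // Summary of one segment: its name and the store time of its first message.
    static final class Segment {
        final String name;
        final long firstStoreTimestamp;

        Segment(String name, long firstStoreTimestamp) {
            this.name = name;
            this.firstStoreTimestamp = firstStoreTimestamp;
        }
    }

    // Returns the index of the segment to start replaying from.
    // Falls back to the oldest segment when none is old enough.
    static int startIndex(List<Segment> segments, long checkpointTimestamp) {
        for (int i = segments.size() - 1; i >= 0; i--) {
            if (segments.get(i).firstStoreTimestamp <= checkpointTimestamp) {
                return i;
            }
        }
        return 0;
    }
}
```

    Everything from the chosen segment onwards is then replayed, which is why this path can take noticeably longer than normal recovery.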

    How write thread-safety is controlled

    Writes are serialized with a lock, which is initialized as follows

     public CommitLog(final DefaultMessageStore defaultMessageStore) {
            this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
                defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
            this.defaultMessageStore = defaultMessageStore;
    
            if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                this.flushCommitLogService = new GroupCommitService();
            } else {
                this.flushCommitLogService = new FlushRealTimeService();
            }
    
            this.commitLogService = new CommitRealTimeService();
    
            this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
            batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
                @Override
                protected MessageExtBatchEncoder initialValue() {
                    return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
                }
            };
            this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
    
        }
    

    The lock is used as follows when putting messages

     public CompletableFuture<PutMessageResult> asyncPutMessages(final MessageExtBatch messageExtBatch){
     putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
            try {
               // ... (a large amount of code omitted here)
            } finally {
                putMessageLock.unlock();
            }
    }
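
    The choice between a spin lock and a ReentrantLock behind one interface can be illustrated with a small self-contained sketch. All names here are hypothetical stand-ins, not the RocketMQ classes; the spin variant is assumed to be a compare-and-set loop on an AtomicBoolean.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical common interface for the two lock flavours.
interface PutLockSketch {
    void lock();
    void unlock();
}

// Busy-waits until the flag flips from "free" to "taken".
final class SpinPutLock implements PutLockSketch {
    private final AtomicBoolean free = new AtomicBoolean(true);

    public void lock() {
        while (!free.compareAndSet(true, false)) {
            // spin: no parking, so this suits very short critical sections
        }
    }

    public void unlock() {
        free.set(true);
    }
}

// Parks waiting threads instead of spinning.
final class ReentrantPutLock implements PutLockSketch {
    private final ReentrantLock lock = new ReentrantLock();

    public void lock() { lock.lock(); }

    public void unlock() { lock.unlock(); }
}

final class PutLockDemo {
    // Increments a shared counter from several threads under the given lock.
    static int countWith(PutLockSketch lock, int threads, int perThread) {
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    lock.lock();
                    try {
                        counter[0]++;
                    } finally {
                        lock.unlock();
                    }
                }
            });
            workers[t].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter[0];
    }
}
```

    Spinning avoids the cost of parking and waking threads when the lock is held only briefly, while a ReentrantLock tends to behave better under heavy contention. That trade-off is presumably why the store exposes the choice as a configuration switch.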
    
