美文网首页
store模块阅读16:ConsumeQueue

store模块阅读16:ConsumeQueue

作者: 赤子心_d709 | 来源:发表于2017-11-02 17:13 被阅读191次

    说明

    consume queue是消息的逻辑队列
    相当于字典的目录,用来指定消息在物理文件commit log上的位置。
    默认存储路径为{user.home}/store/consumequeue/{topic}/{queueId}
    管理mappedFile,每个file默认记录30w条数据,每条数据20个字节(long offset, int size, long tagsCode),即600w个字节大小
    其中offset是commitLog中的物理偏移,size是消息在commitLog中的大小,tagsCode应该就是tag
    有效数据满足 offset >= 0 && size > 0(无效数据相反,一个文件没写到的地方,byteBuffer读取,都是0)

    属性

        private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    
        public static final int CQ_STORE_UNIT_SIZE = 20;//每个存储单元大小
        private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
    
        private final DefaultMessageStore defaultMessageStore;
    
        private final MappedFileQueue mappedFileQueue;
        private final String topic;
        private final int queueId;
        private final ByteBuffer byteBufferIndex;
    
        private final String storePath;//存储路径,默认{user.home}/store/consumequeue/{topic}/{queueId}
        private final int mappedFileSize;//文件大小,默认30w * 20字节 = 600w字节
        private long maxPhysicOffset = -1;//对应commitLog的最大物理偏移
        private volatile long minLogicOffset = 0;//最小的逻辑偏移
        private ConsumeQueueExt consumeQueueExt = null;
    

    看注释就行

    方法

    先直接贴所有的代码好了,都有注释

    //注意mappedFileQueue的目录,以及ext是否开启即可
        public ConsumeQueue(
            final String topic,
            final int queueId,
            final String storePath,
            final int mappedFileSize,
            final DefaultMessageStore defaultMessageStore) {
            this.storePath = storePath;
            this.mappedFileSize = mappedFileSize;
            this.defaultMessageStore = defaultMessageStore;
    
            this.topic = topic;
            this.queueId = queueId;
    
            String queueDir = this.storePath
                + File.separator + topic
                + File.separator + queueId;
    
            this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
    
            this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
    
            if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {//是否开启ConsumeQueueExt,默认不开启
                this.consumeQueueExt = new ConsumeQueueExt(
                    topic,
                    queueId,
                    StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
                    defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
                    defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
                );
            }
        }
    
        /**
         * 加载mappedFileQueue文件到内存
         * 如果拓展文件可读,接着加载ConsumeQueueExt
         */
        public boolean load() {
            boolean result = this.mappedFileQueue.load();
            log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));
            if (isExtReadEnable()) {//如果consumeQueueExt文件存在,那么就可读
                result &= this.consumeQueueExt.load();
            }
            return result;
        }
    
        /**
         * 用于设定maxPhysicOffset以及更新最后三个mappedFile的flush,commitPosition
         * 1.从前往后读取最后三个mappedFile,依次读取记录,如果有效,更新maxPhysicOffset以及maxExtAddr
         * 2.出现第一个无效记录的位置记为processOffset,设置flush,commitPosition,清除mappedFileQueue之后的记录
         * 3.如果开启ext,清除maxExtAddr之后的记录
         */
        public void recover() {
            final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
            if (!mappedFiles.isEmpty()) {
    
                int index = mappedFiles.size() - 3;//从倒数第三个文件开始恢复
                if (index < 0)
                    index = 0;
    
                int mappedFileSizeLogics = this.mappedFileSize;
                MappedFile mappedFile = mappedFiles.get(index);
                ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
                long processOffset = mappedFile.getFileFromOffset();//文件名,即绝对偏移
                long mappedFileOffset = 0;
                long maxExtAddr = 1;
                while (true) {
                    for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                        long offset = byteBuffer.getLong();
                        int size = byteBuffer.getInt();
                        long tagsCode = byteBuffer.getLong();
    
                        if (offset >= 0 && size > 0) {//有效数据
                            mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                            this.maxPhysicOffset = offset;
                            if (isExtAddr(tagsCode)) {
                                maxExtAddr = tagsCode;
                            }
                        } else {//无效数据
                            log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "
                                + offset + " " + size + " " + tagsCode);
                            break;
                        }
                    }
    
                    if (mappedFileOffset == mappedFileSizeLogics) {//读到末尾
                        index++;
                        if (index >= mappedFiles.size()) {//恢复完了
    
                            log.info("recover last consume queue file over, last maped file "
                                + mappedFile.getFileName());
                            break;
                        } else {//获取下一个mappedFile
                            mappedFile = mappedFiles.get(index);
                            byteBuffer = mappedFile.sliceByteBuffer();
                            processOffset = mappedFile.getFileFromOffset();
                            mappedFileOffset = 0;
                            log.info("recover next consume queue file, " + mappedFile.getFileName());
                        }
                    } else {//最后一个文件可能没到mappedFileSizeLogics
                        log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                            + (processOffset + mappedFileOffset));
                        break;
                    }
                }
    
                processOffset += mappedFileOffset;//有效绝对偏移
                this.mappedFileQueue.setFlushedWhere(processOffset);
                this.mappedFileQueue.setCommittedWhere(processOffset);
                this.mappedFileQueue.truncateDirtyFiles(processOffset);//删除后续记录
    
                if (isExtReadEnable()) {
                    this.consumeQueueExt.recover();
                    log.info("Truncate consume queue extend file by max {}", maxExtAddr);
                    this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);//ext恢复到maxExtAddr
                }
            }
        }
    
        /**
         * 找到消息发送时间最接近timestamp逻辑队列的offset
         * @param timestamp
         * @return
         */
        public long getOffsetInQueueByTime(final long timestamp) {
            MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);//找到timestamp对应的mappedFile
            if (mappedFile != null) {
                long offset = 0;
                int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
                int high = 0;
                int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
                long leftIndexValue = -1L, rightIndexValue = -1L;
                long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();//从commitLog找到最小物理偏移
                SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
                if (null != sbr) {
                    ByteBuffer byteBuffer = sbr.getByteBuffer();
                    high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;//最后一个有效记录
                    try {
                        while (high >= low) {
                            midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
                            byteBuffer.position(midOffset);
                            long phyOffset = byteBuffer.getLong();
                            int size = byteBuffer.getInt();
                            if (phyOffset < minPhysicOffset) {//中间记录在commitLog中已经失效了
                                low = midOffset + CQ_STORE_UNIT_SIZE;//low变化
                                leftOffset = midOffset;
                                continue;
                            }
    
                            long storeTime =
                                this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);//利用commitLog,用参数物理偏移,大小找到该消息的存储时间
                            if (storeTime < 0) {
                                return 0;
                            } else if (storeTime == timestamp) {
                                targetOffset = midOffset;
                                break;
                            } else if (storeTime > timestamp) {//目标值旧了,更新high
                                high = midOffset - CQ_STORE_UNIT_SIZE;
                                rightOffset = midOffset;
                                rightIndexValue = storeTime;
                            } else {
                                low = midOffset + CQ_STORE_UNIT_SIZE;//目标值太新了,更新low
                                leftOffset = midOffset;
                                leftIndexValue = storeTime;
                            }
                        }
    
                        if (targetOffset != -1) {
    
                            offset = targetOffset;
                        } else {
                            if (leftIndexValue == -1) {//二分中值一直在右边
    
                                offset = rightOffset;
                            } else if (rightIndexValue == -1) {//一直在左边
    
                                offset = leftOffset;
                            } else {//左右两个最近的看谁更近
                                offset =
                                    Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
                                        - rightIndexValue) ? rightOffset : leftOffset;
                            }
                        }
    
                        return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
                    } finally {
                        sbr.release();
                    }
                }
            }
            return 0;
        }
    
        /**
         * 删除phyOffet之后的脏文件(包括同mappedFile之后记录,通过修改wrotePosition保证)
         * 如果开启ext,找到maxExtAddr,把ext的脏文件也删除掉
         *
         * 1:依次处理最后一个,倒数第二个MappedFile。。。
         * 2.每个文件从第一个数据块开始解析,如果超过phyOffet就,就删掉该文件
         * 3.否则该文件继续遍历后续记录,不断修改wrote,commit和flushPosition,直到遍历结束或者数据块大小为空则返回
         * 4.如果所有mappedFile都删完了,再truncateByMaxAddress
         *
         * 这里其实return的地方都要truncateByMaxAddress的?要不然更新maxExtAddr干啥
         */
        public void truncateDirtyLogicFiles(long phyOffet) {
    
            int logicFileSize = this.mappedFileSize;
    
            this.maxPhysicOffset = phyOffet - 1;
            long maxExtAddr = 1;
            while (true) {
                MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();//反复获取当时最后一个文件(最后一个删掉了,就依次倒数第二,第三个)
                if (mappedFile != null) {
                    ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
    
                    mappedFile.setWrotePosition(0);
                    mappedFile.setCommittedPosition(0);
                    mappedFile.setFlushedPosition(0);
    
                    for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {
                        long offset = byteBuffer.getLong();
                        int size = byteBuffer.getInt();
                        long tagsCode = byteBuffer.getLong();
    
                        if (0 == i) {//mappedFile开头
                            if (offset >= phyOffet) {//开头offset比要求要大
                                this.mappedFileQueue.deleteLastMappedFile();//删除当前最后一个mappedFile
                                break;
                            } else {//开头< 指定phyOffet
                                int pos = i + CQ_STORE_UNIT_SIZE;
                                mappedFile.setWrotePosition(pos);
                                mappedFile.setCommittedPosition(pos);
                                mappedFile.setFlushedPosition(pos);
                                this.maxPhysicOffset = offset;//更新maxPhysicOffset
                                // This maybe not take effect, when not every consume queue has extend file.
                                if (isExtAddr(tagsCode)) {
                                    maxExtAddr = tagsCode;//有ext记录就更新ext记录
                                }
                            }
                        } else {
    
                            if (offset >= 0 && size > 0) {//有效记录(如fillBlank时候,offset会写0)
    
                                if (offset >= phyOffet) {//开头offset比要求要大,返回
                                    return;
                                }
    
                                int pos = i + CQ_STORE_UNIT_SIZE;
                                mappedFile.setWrotePosition(pos);
                                mappedFile.setCommittedPosition(pos);
                                mappedFile.setFlushedPosition(pos);
                                this.maxPhysicOffset = offset;//更新maxPhysicOffset
                                if (isExtAddr(tagsCode)) {
                                    maxExtAddr = tagsCode;//有ext记录就更新ext记录
                                }
    
                                if (pos == logicFileSize) {//文件遍历结束,返回(再上一个结束,偏移肯定更小)
                                    return;
                                }
                            } else {
                                return;
                            }
                        }
                    }
                } else {//删完了,没有最后的mappedFile了
                    break;
                }
            }
    
            if (isExtReadEnable()) {
                this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
            }
        }
    
        /**
         * 获得最后偏移:
         * 1.获得最后一个mappedFile
         * 2.找到写的最后一条记录(wrotePosition - 20)
         * 3.依次向后读每一条记录,如果有效,读出offset和size,记录lastOffset = offset + size
         * 4.返回lastOffset
         *
         * ???为什么已经是最后一条记录了,还要向后遍历来确认
         */
        public long getLastOffset() {
            long lastOffset = -1;
    
            int logicFileSize = this.mappedFileSize;
    
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            if (mappedFile != null) {
    
                int position = mappedFile.getWrotePosition() - CQ_STORE_UNIT_SIZE;//最后一个记录写的位置
                if (position < 0)
                    position = 0;
    
                ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
                byteBuffer.position(position);
                for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {//遍历最多 logicFileSize/CQ_STORE_UNIT_SIZE 次
                    long offset = byteBuffer.getLong();
                    int size = byteBuffer.getInt();
                    byteBuffer.getLong();
    
                    if (offset >= 0 && size > 0) {//有效数据
                        lastOffset = offset + size;
                    } else {//无效数据
                        break;
                    }
                }
            }
    
            return lastOffset;
        }
    
        //自己mappedFileQueue flush,如果有ext,也一起flush
        public boolean flush(final int flushLeastPages) {
            boolean result = this.mappedFileQueue.flush(flushLeastPages);
            if (isExtReadEnable()) {
                result = result & this.consumeQueueExt.flush(flushLeastPages);
            }
    
            return result;
        }
    
        /**
         * 找到所有mappedFile中最后一条记录的offset即maxOffset < 参数offset的然后删掉
         * @param offset
         * @return
         */
        public int deleteExpiredFile(long offset) {
            int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
            this.correctMinOffset(offset);
            return cnt;
        }
    
        /**
         * 修正minLogicOffset,如果ext开启了,把旧的mappedFile都删掉
         * 找到第一个mappedFile,遍历每一条记录,得到记录的offsetPy 以及 tagsCode
         * 如果offsetPy >= phyMinOffset, 更新minLogicOffset = 该条记录的offset
         * 此时tagsCode如果是ext的就记录minExtAddr,用于清除ext的mappedFile
         *
         * 为什么只清除第一个文件:
         * 因为minLogicOffset只会和第一个mappedFile有关
         */
        public void correctMinOffset(long phyMinOffset) {
            MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
            long minExtAddr = 1;
            if (mappedFile != null) {
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer(0);
                if (result != null) {
                    try {
                        for (int i = 0; i < result.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {//遍历每一条记录
                            long offsetPy = result.getByteBuffer().getLong();
                            result.getByteBuffer().getInt();
                            long tagsCode = result.getByteBuffer().getLong();
    
                            if (offsetPy >= phyMinOffset) {
                                this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;//该条记录开始的offset
                                log.info("Compute logical min offset: {}, topic: {}, queueId: {}",
                                    this.getMinOffsetInQueue(), this.topic, this.queueId);
                                // This maybe not take effect, when not every consume queue has extend file.
                                if (isExtAddr(tagsCode)) {
                                    minExtAddr = tagsCode;
                                }
                                break;
                            }
                        }
                    } catch (Exception e) {
                        log.error("Exception thrown when correctMinOffset", e);
                    } finally {
                        result.release();
                    }
                }
            }
    
            if (isExtReadEnable()) {
                this.consumeQueueExt.truncateByMinAddress(minExtAddr);
            }
        }
    
        //最小的偏移的下标
        public long getMinOffsetInQueue() {
            return this.minLogicOffset / CQ_STORE_UNIT_SIZE;
        }
    
        /**
         * putMessagePositionInfo的封装函数:
         * 1.重试30次,将DispatchRequest拆分
         * 2.如果开启了ext,那么先向consumeQueueExt放置CqExtUnit记录, 且更新tagsCode
         * 3.调用putMessagePositionInfo
         * @param request
         */
        public void putMessagePositionInfoWrapper(DispatchRequest request) {
            final int maxRetries = 30;
            boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();//此时是否可写
            for (int i = 0; i < maxRetries && canWrite; i++) {
                long tagsCode = request.getTagsCode();
                if (isExtWriteEnable()) {//可以写到consumeQueueExt文件
                    ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                    cqExtUnit.setFilterBitMap(request.getBitMap());
                    cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
                    cqExtUnit.setTagsCode(request.getTagsCode());
    
                    long extAddr = this.consumeQueueExt.put(cqExtUnit);//consumeQueueExt添加CqExtUnit记录
                    if (isExtAddr(extAddr)) {//返回的addr是编码过的
                        tagsCode = extAddr;//更新tagsCode
                    } else {
                        log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                            topic, queueId, request.getCommitLogOffset());
                    }
                }
                boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                    request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());//放置消息
                if (result) {
                    this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());//更新StoreCheckPoint记录
                    return;
                } else {
                    // XXX: warn and notify me
                    log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                        + " failed, retry " + i + " times");
    
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        log.warn("", e);
                    }
                }
            }
    
            // XXX: warn and notify me
            log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
            this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
        }
    
        /**
         * 存放消息位置信息
         * @param offset : CommitLogOffset
         * @param size
         * @param tagsCode
         * @param cqOffset : ConsumeQueueOffset,下标
         * @return
         *
         * 1.验证offset 与 maxPhysicOffset关系,如果<=代表已经存放过
         * 2.构建byteBufferIndex,放置20字节信息
         * 3.获取最后一个mappedFile(没有就创建)
         * 4.如果是第一个创建的mappedFile,设置minLogicOffset,FlushedWhere,CommittedWhere
         */
        private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
            final long cqOffset) {
    
            if (offset <= this.maxPhysicOffset) {//不得超过物理最大偏移
                return true;
            }
    
            this.byteBufferIndex.flip();
            this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);//mappedFile每一个单元为20字节
            //存放信息
            this.byteBufferIndex.putLong(offset);
            this.byteBufferIndex.putInt(size);
            this.byteBufferIndex.putLong(tagsCode);
    
            final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;//根据下标 获得 逻辑偏移(相对偏移)
    
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);//获取最后一个文件
            if (mappedFile != null) {
    
                if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                    this.minLogicOffset = expectLogicOffset;//mappedFileQueue创建的第一个file,记录最小逻辑偏移
                    this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                    this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                    this.fillPreBlank(mappedFile, expectLogicOffset);//第一个mappedFile写入前置的blank信息
                    log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                        + mappedFile.getWrotePosition());
                }
    
                if (cqOffset != 0) {
                    long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
                    if (expectLogicOffset != currentLogicOffset) {//实际该写的位置和理论不一样
                        LOG_ERROR.warn(
                            "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                            expectLogicOffset,
                            currentLogicOffset,
                            this.topic,
                            this.queueId,
                            expectLogicOffset - currentLogicOffset
                        );
                    }
                }
                this.maxPhysicOffset = offset;//对应commitLog的最大偏移
                return mappedFile.appendMessage(this.byteBufferIndex.array());//mappedFile添加消息
            }
            return false;
        }
    
        /**
         * 在mappedFile填充前置的blank数据, 一直到untilWhere这个位置
         * @param mappedFile 此时的最后一个mappedFile
         * @param untilWhere
         */
        private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
            ByteBuffer byteBuffer = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
            byteBuffer.putLong(0L);
            byteBuffer.putInt(Integer.MAX_VALUE);
            byteBuffer.putLong(0L);
            //空数据标识是 (0,Integer.MAX,0)
    
            int until = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
            for (int i = 0; i < until; i += CQ_STORE_UNIT_SIZE) {
                mappedFile.appendMessage(byteBuffer.array());
            }
        }
    
        /**
         * 根据consumerQueue的物理下标,找到对应的mappedFile以及逻辑地址
         * 获取该mappedFile逻辑地址之后的所有可读部分
         *
         * @param startIndex 消息单元的序号
         */
        public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
            int mappedFileSize = this.mappedFileSize;
            long offset = startIndex * CQ_STORE_UNIT_SIZE;//逻辑位置
            if (offset >= this.getMinLogicOffset()) {// >= minLogicOffset
                MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);//找到offset所在的mappedFile
                if (mappedFile != null) {
                    SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));//返回offset % mappedFileSize 到最大有效位置的ByteBuffer
                    return result;
                }
            }
            return null;
        }
    
        //根据offset从ext中获取对应的CqExtUnit
        public ConsumeQueueExt.CqExtUnit getExt(final long offset) {
            if (isExtReadEnable()) {
                return this.consumeQueueExt.get(offset);
            }
            return null;
        }
        //根据offset从ext中获取对应位置,获取相关信息赋值给
        public boolean getExt(final long offset, ConsumeQueueExt.CqExtUnit cqExtUnit) {
            if (isExtReadEnable()) {
                return this.consumeQueueExt.get(offset, cqExtUnit);
            }
            return false;
        }
    
        public long getMinLogicOffset() {
            return minLogicOffset;
        }
    
        //没用到
        public void setMinLogicOffset(long minLogicOffset) {
            this.minLogicOffset = minLogicOffset;
        }
    
        /**
         * 获取指定消息序号所在mappedFile的下一个文件的起始消息序号(起始偏移量/20)
         */
        public long rollNextFile(final long index) {
            int mappedFileSize = this.mappedFileSize;
            int totalUnitsInFile = mappedFileSize / CQ_STORE_UNIT_SIZE;
            return index + totalUnitsInFile - index % totalUnitsInFile;
        }
    
        public String getTopic() {
            return topic;
        }
    
        public int getQueueId() {
            return queueId;
        }
    
        public long getMaxPhysicOffset() {
            return maxPhysicOffset;
        }
    
        //没用到
        public void setMaxPhysicOffset(long maxPhysicOffset) {
            this.maxPhysicOffset = maxPhysicOffset;
        }
    
        public void destroy() {
            this.maxPhysicOffset = -1;
            this.minLogicOffset = 0;
            this.mappedFileQueue.destroy();
            if (isExtReadEnable()) {
                this.consumeQueueExt.destroy();
            }
        }
    
        public long getMessageTotalInQueue() {
            return this.getMaxOffsetInQueue() - this.getMinOffsetInQueue();
        }
    
        public long getMaxOffsetInQueue() {
            return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
        }
    
        //自检,如果consumeQueueExt可读,也要自检
        public void checkSelf() {
            mappedFileQueue.checkSelf();
            if (isExtReadEnable()) {
                this.consumeQueueExt.checkSelf();
            }
        }
    
        //ext是否可读
        protected boolean isExtReadEnable() {
            return this.consumeQueueExt != null;
        }
    
        //ext是否可写
        protected boolean isExtWriteEnable() {
            return this.consumeQueueExt != null
                && this.defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt();
        }
    
        /**
         * Check {@code tagsCode} is address of extend file or tags code.
         *
         */
        public boolean isExtAddr(long tagsCode) {
            return ConsumeQueueExt.isExtAddr(tagsCode);
        }
    

    1.部分方法就是调用mappedFileQueue方法,以及ConsumeQueueExt的方法,如isExtAddr,checkSelf,destroy,getExt,flush,load方法不重要
    2.putMessagePositionInfoWrapper, putMessagePositionInfo ,fillPreBlank用于往ConsumeQueue里写消息
    3.recover用于恢复
    4.getOffsetInQueueByTime,getLastOffset,getIndexBuffer,rollNextFile 用于ConsumeQueue信息的获取,查询等
    5.truncateDirtyLogicFiles,deleteExpiredFile,correctMinOffset用于清除脏数据以及关键字段的维护

    思考

    index CQ_STORE_UNIT_SIZE offset三者之间的关系

    index * CQ_STORE_UNIT_SIZE = offset

    correctMinOffset为什么只处理第一个文件

    因为correctMinOffset只会和第一个文件相关

    问题 :

    tagsCode有可能是Ext有可能不是???

    这个暂时不懂,要看上层调用

    getLastOffset函数中 为什么已经是最后一条记录了,还要向后遍历来确认

    为什么需要函数fillPreBlank,不写前置标志位不行吗

    吐槽

    truncateDirtyLogicFiles 漏掉了ext的处理

    里面好几个地方直接return了,漏掉了ConsumeQueueExt的处理,应该是bug.

    代码太晦涩了

    refer

    http://blog.csdn.net/meilong_whpu/article/details/76921208
    http://blog.csdn.net/chunlongyu/article/details/54576649
    http://www.jianshu.com/p/453c6e7ff81c ConsumeQueue部分
    http://blog.csdn.net/github_38592071/article/details/71438257?locationNum=15&fps=1#

    相关文章

      网友评论

          本文标题:store模块阅读16:ConsumeQueue

          本文链接:https://www.haomeiwen.com/subject/lhlopxtx.html