美文网首页程序员RocketMQ源码解读
ConsumeQueue索引文件及构建

ConsumeQueue索引文件及构建

作者: 93张先生 | 来源:发表于2020-12-06 17:54 被阅读0次

    ConsumeQueue概览

    RocketMQ是基于主题订阅模式实现消息消费,消费者关心的是主题Topic下的所有消息,同一主题的消息不连续地存储在commitlog文件中,如果直接从commitlog文件中去遍历查找订阅主题下的消息,效率极其低下,为了适应消息消费的检索需求,设计了消息消费队列文件ConsumeQueue,该文件可以看成是Commitlog关于消息消费的索引文件,consumequeue的一级目录为主题Topic,二级目录为Topic的消息队列。主要是针对每一个Topic建立的索引,方便消费者消费某个主题下的消息。


    image.png
    ConsumeQueue条目

    ConsumeQueue的每一条都是一条消息的索引,一共20字节。


    image.png

    单个ConsumeQueue文件默认包含30万个条目,每个条目20byte,单个文件的长度为30W20byte,约5.7M。与ConsumeQueue对等的是CommitLog对象。他们都有自己MappedFileQueue及MappedFile对象,他们都是使用MappedFileQueue和MappedFile对象实现消息字节数组和消息索引字节数组的落盘。ConsumeQueue没有使用AllocateMappedFileService服务来创建MappedFile文件,而是使用了MappedFile的构造方法来创建MappedFile文件。ConsumeQueue每一个文件的名称是以第一个消息条数20byte字节的大小为命名的。

    ConsumeQueue异步构造

    构建consumequeue、indexFile索引文件,通过一个ReputMessageService异步线程进行处理,构建consumequeue、indexFile索引文件的数据从commitLog的MappedFile中的ByteBuffer中获取,一条消息消息构造一个构建索引服务的DispatchRequest请求,再由ConsumeQueue服务处理DispatchRequest请求构建consumequeue的mappedFile文件。由IndexService处理请求构建indexFile索引文件,然后将各自的文件进行刷盘。

    消息消费队列ConsumeQueue索引文件是基于CommitLog文件构建的,当消息生产者提交消息存储在CommitLog的MappedFile文件中,ConsumeQueue需要及时更新,否则消息无非被及时消费,根据消息属性查找消息也会出现较大的延迟。构建ConsumeQueue的数据来源为CommitLog的MappedFile中的ByteBuffer,此时消息未必被Commit、Flush等。获取一定数量的消息后,RocketMQ根据每条消息构造一个DispatchRequest请求,开启一个新的线程处理请求,并构造ConsumeQueue的MappedFile文件,将消息写入MappedFile的FileChannel中,等待异步刷盘操作。

    构建过程

    DefaultMessageStore是消息存储服务的入口和关键API,包含消息分发构建ConsumeQueue和Index索引文件的ReputMessageService的服务。它会开启一个线程进行实时消息分发和ConsumeQueue和Index索引文件构建。

    // CommitLog  消息分发,根据 CommitLog 文件,异步构建 ConsumeQueue、IndexFile 文件
    private final ReputMessageService reputMessageService;
    
    // 开启异步构建服务
    this.reputMessageService.start();
    
    @Override
    public void run() {
        DefaultMessageStore.log.info(this.getServiceName() + " service started");
        // 异步构建ConsumeQueue、Index服务线程是否停止,一直调用doReput()方法,推送一次构建服务,线程休息1毫秒
        while (!this.isStopped()) {
            try {
                Thread.sleep(1);
                // 进行消息ConsumeQueue、Index文件异步构建
                this.doReput();
            } catch (Exception e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
    
        DefaultMessageStore.log.info(this.getServiceName() + " service end");
    }
    

    doReput()实时从CommitLog的MappedFile文件中获取需要构建的消息,然后每条消息包装成一个DispatchRequest,进行消息分发。

    /**
     * 异步构建ConsumeQueue、Index文件
     * doReput()方法在没有需要构建的offset时会停止,但调用它的地方会一直不停的调用doReput()方法,进行再次构建ConsumeQueue
     */
    private void doReput() {
        // reputFromOffset小于commitlog中mappedFile文件开始的offset,进行reputFromOffset值调整为mappedFile文件的开始offset
        if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) {
            log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
                this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
            this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
        }
        //无限循环构建,commitlog文件剩余offset需要构建
        for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {
            // 开始构建的值
            if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
                && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
                break;
            }
            //根据需要构建的offset从MappedFile
            SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
            if (result != null) {
                try {
                    // 开始构建的offset
                    this.reputFromOffset = result.getStartOffset();
                    // 一次读取ByteBuffer中一条消息,根据每条消息的大小获取一条消息,然后取下一条消息,构建一个DispatchRequest
                    for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                        // 创造异步构建ConsumeQueue的分发请求
                        DispatchRequest dispatchRequest =
                            DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);
    
                        int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();
                        // 构建dispatchRequest成功
                        if (dispatchRequest.isSuccess()) {
                            if (size > 0) {
                                DefaultMessageStore.this.doDispatch(dispatchRequest);
    
                                if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                    && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                    DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                        dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                        dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                        dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                                }
    
                                this.reputFromOffset += size;
                                readSize += size;
                                if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                    DefaultMessageStore.this.storeStatsService
                                        .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                        .addAndGet(dispatchRequest.getMsgSize());
                                }
                            } else if (size == 0) {
                                // 重新获取构建的offset偏移量
                                this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                                readSize = result.getSize();
                            }
                        // 构建失败
                        } else if (!dispatchRequest.isSuccess()) {
                            // 构建失败,这条数据略过,进行构建位置更新,进行下一条ConsumeQueue条目的构建
                            if (size > 0) {
                                log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                                this.reputFromOffset += size;
                            } else {
                                doNext = false;
                                // If user open the dledger pattern or the broker is master node,
                                // it will not ignore the exception and fix the reputFromOffset variable
                                if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                    DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                    log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                        this.reputFromOffset);
                                    this.reputFromOffset += result.getSize() - readSize;
                                }
                            }
                        }
                    }
                } finally {
                    // 获得需要构建的数据的释放
                    result.release();
                }
            // result为null不需要构建
            } else {
                doNext = false;
            }
        }
    }
    

    CommitLogDispatcherBuildConsumeQueue是构建ConsumeQueue请求的处理类。

    /**
     * 构建ConsumeQueue文件分发服务
     */
    class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {
    
        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                // 没有事务、事务提交
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    //处理从commit log 异步构建ConsumeQueue请求
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }
    

    putMessagePositionInfo处理具体构建请求,并创建或选择一个ConsumeQueue对象。

    // 处理从commit log 异步构建ConsumeQueue请求
    public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
        ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
        // ConsumeQueue 处理从commit log 异步构建ConsumeQueue请求
        cq.putMessagePositionInfoWrapper(dispatchRequest);
    }
    
    // 根据topic和queueId获取ConsumeQueue
    public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }
    
        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            // 新建ConsumeQueue
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }
    
        return logic;
    }
    

    putMessagePositionInfo()将消息索引信息存放到consumequeue的byteBufferIndex中,并追加到consumequeue的内存映射文件中(本操作只追加并不刷盘),ConsumeQueue的刷盘方式固定为异步刷盘模式。

    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
        final long cqOffset) {
    
        if (offset + size <= this.maxPhysicOffset) {
            log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset);
            return true;
        }
        // 将commitlog的偏移量、消息长度、tag hash code存入byteBufferIndex
        this.byteBufferIndex.flip();
        //一条消息消费索引大小20byte
        this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
        //commitlog的偏移量
        this.byteBufferIndex.putLong(offset);
        //消息长度
        this.byteBufferIndex.putInt(size);
        // tag hash code
        this.byteBufferIndex.putLong(tagsCode);
        //开始存储consumequeue条目的物理偏移量
        final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
        // 通过构造函数获取ConsumeQueue的MappedFile对象,不是预分配的
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
        if (mappedFile != null) {
            // 如果是第一次创建,赋值一些变量
            if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
                this.minLogicOffset = expectLogicOffset;
                this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
                this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
                this.fillPreBlank(mappedFile, expectLogicOffset);
                log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
            }
            // 并根据consumeQueueOffset计算ConsumeQueue中物理地址,将内容追加到ConsumeQueue的内存映射文件中(本操作只追加并不刷盘),ConsumeQueue的刷盘方式固定为异步刷盘
            if (cqOffset != 0) {
                long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
    
                if (expectLogicOffset < currentLogicOffset) {
                    log.warn("Build  consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                    return true;
                }
    
                if (expectLogicOffset != currentLogicOffset) {
                    LOG_ERROR.warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                        expectLogicOffset,
                        currentLogicOffset,
                        this.topic,
                        this.queueId,
                        expectLogicOffset - currentLogicOffset
                    );
                }
            }
            this.maxPhysicOffset = offset + size;
            // 将内容追加到ConsumeQueue的内存映射文件中
            return mappedFile.appendMessage(this.byteBufferIndex.array());
        }
        return false;
    }
    

    消息消费查询

    AdminBrokerProcessor#getIndexBuffer()根据consumequeue的消息下标,进行消息索引条目的返回。

    // 根据consumequeue进行消息消费
    SelectMappedBufferResult result = consumeQueue.getIndexBuffer(requestHeader.getIndex());
    

    ConsumeQueue#getIndexBuffer()确定consumequeue的MappedFile,然后从MappedFile中查找索引条目。

    /**
     * 根据offset通过consumequeue查找消息
     * @param startIndex 为查找的offset值
     * @return
     */
    public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
        int mappedFileSize = this.mappedFileSize;
        // consumequeue物理offset,消息条数*20字节(消息大小)
        long offset = startIndex * CQ_STORE_UNIT_SIZE;
        if (offset >= this.getMinLogicOffset()) {
            // 确定mappedFile
            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
            if (mappedFile != null) {
                // 根据消息余数偏移量,进行ByteBuffer消息查找
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
                return result;
            }
        }
        return null;
    }
    

    MappedFile#findMappedFileByOffset()方法根据 offset 定位 MappedFile 的算法为 (int)((offset/this.mappedFileSize) - (mappedFile.getFileFromOffset()/this.MappedFileSize)),获取这个 MappedFile 在 mappedFiles 的下标,然后获取 MappedFile 文件。

    RocketMQ commitlog 日志文件有定时删除功能,所以 commitlog 文件夹下的文件个数是会发生改变的,所以下标的起始位置也会发生改变,动态确定 offset 所在文件的下标为:总文件的个数 - 现有文件个数 = 这个 offset 所在 MappedFile 文件集合中的下标值。

    /**
     *
     * Finds a mapped file by offset.
     *
     * @param offset Offset.
     * @param returnFirstOnNotFound If the mapped file is not found, then return the first one.
     * @return Mapped file or null (when not found and returnFirstOnNotFound is <code>false</code>).
     */
    public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
            MappedFile firstMappedFile = this.getFirstMappedFile();
            MappedFile lastMappedFile = this.getLastMappedFile();
            if (firstMappedFile != null && lastMappedFile != null) {
                if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                    LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                        offset,
                        firstMappedFile.getFileFromOffset(),
                        lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                        this.mappedFileSize,
                        this.mappedFiles.size());
                } else {
                    // mappedFile 文件下标
                    // (offset / this.mappedFileSize) 为这个 offset 所在 mappedFile 文件中的第几个个数,定义为:sum
                    //  (firstMappedFile.getFileFromOffset() / this.mappedFileSize)) 为第一个文件所在的文件个数, 定义为:first
                    // sum - first 为这个 offset,在现有的 mappedFiles 集合文件的下标。
                    int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                    MappedFile targetFile = null;
                    try {
                        targetFile = this.mappedFiles.get(index);
                    } catch (Exception ignored) {
                    }
    
                    if (targetFile != null && offset >= targetFile.getFileFromOffset()
                        && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                        return targetFile;
                    }
    
                    for (MappedFile tmpMappedFile : this.mappedFiles) {
                        if (offset >= tmpMappedFile.getFileFromOffset()
                            && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                            return tmpMappedFile;
                        }
                    }
                }
    
                if (returnFirstOnNotFound) {
                    return firstMappedFile;
                }
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }
    
        return null;
    }
    

    MappedFile#selectMappedBuffer()方法根据数据所在的pos位置,从ByteBuffer中查询数据。

    /**
     * 获取 mappedBuffer 中的数据
     * @param pos mappedBuffer 中的一个位置,必须小于可读数据的位置
     * @return
     */
    public SelectMappedBufferResult selectMappedBuffer(int pos) {
        int readPosition = getReadPosition();
        if (pos < readPosition && pos >= 0) {
            if (this.hold()) {
                ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
                byteBuffer.position(pos);
                int size = readPosition - pos;
                ByteBuffer byteBufferNew = byteBuffer.slice();
                byteBufferNew.limit(size);
                return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this);
            }
        }
    
        return null;
    }
    

    相关文章

      网友评论

        本文标题:ConsumeQueue索引文件及构建

        本文链接:https://www.haomeiwen.com/subject/vgwowktx.html