Index索引文件及构建

作者: 93张先生 | 来源:发表于2020-12-06 20:38 被阅读0次

    Index索引文件概览

    消息消费队列是RocetMQ专门为消息订阅构建的索引服务,提高主题与消息队列检索消息的速度。IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:$HOME/store/index/{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

    Index索引文件结构:


    image.png

    IndexFile异步构造

    构建consumequeue、indexFile索引文件,通过一个ReputMessageService异步线程进行处理,构建consumequeue、indexFile索引文件的数据从commitLog的MappedFile中的ByteBuffer中获取,一条消息消息构造一个构建索引服务的DispatchRequest请求,再由ConsumeQueue服务处理DispatchRequest请求构建consumequeue的mappedFile文件。由IndexService处理请求构建indexFile索引文件,然后将各自的文件进行刷盘。

    IndexFile

    IndexFile对象的主要成员属性,其中包含MappedFile对象,这个和ConsumeQueue中利用的MappedFile是一样的作用,用来做磁盘IO,将内存映射消息写入磁盘。

    public class IndexFile {
        // hash槽的大小
        private static int hashSlotSize = 4;
        // 索引大小
        private static int indexSize = 20;
        private static int invalidIndex = 0;
        // 索引槽的数量 500万
        private final int hashSlotNum;
        // 索引的数量 2千万
        private final int indexNum;
        // 索引的MappedFile
        private final MappedFile mappedFile;
        private final FileChannel fileChannel;
        //MappedFile中的直接内存,用来存放hash索引用
        private final MappedByteBuffer mappedByteBuffer;
        // 索引头
        private final IndexHeader indexHeader;
    } 
    

    IndexHeader

    每一个IndexFile都包含IndexHeader,是这个IndexFile的汇总信息。

    // 存放IndexHeader的buffer
    private final ByteBuffer byteBuffer;
    // indexFile存放消息的存储时间的 开始时间
    private AtomicLong beginTimestamp = new AtomicLong(0);
    // indexFile存放消息的存储时间的 结束时间
    private AtomicLong endTimestamp = new AtomicLong(0);
    // indexFile 开始commitLog 文件的offset
    private AtomicLong beginPhyOffset = new AtomicLong(0);
    // IndexFile 结束commitLog 文件的offset
    private AtomicLong endPhyOffset = new AtomicLong(0);
    // hash槽的Count 500万
    private AtomicInteger hashSlotCount = new AtomicInteger(0);
    // hash索引的条目2千万条 500 * 40
    private AtomicInteger indexCount = new AtomicInteger(1);
    

    IndexService

    IndexService 是构建Index索引文件的服务。一条消息可能有多个key,这样这条消息就会有多个索引条目。

    根据一条消息的请求构建Index索引,

    /**
     * 根据一条消息的请求构建Index索引,并放入
     * @param req
     */
    public void buildIndex(DispatchRequest req) {
        //获取需要写入的IndexFile
        IndexFile indexFile = retryGetAndCreateIndexFile();
        if (indexFile != null) {
            // 文件结尾offset
            long endPhyOffset = indexFile.getEndPhyOffset();
            DispatchRequest msg = req;
            String topic = msg.getTopic();
            String keys = msg.getKeys();
            if (msg.getCommitLogOffset() < endPhyOffset) {
                return;
            }
            // 事务消息
            final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    return;
            }
            // 根据uniqKey来构建文件
            if (req.getUniqKey() != null) {
                indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
                if (indexFile == null) {
                    log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                    return;
                }
            }
            // 根据keys中的每个key来构建indexFile
            if (keys != null && keys.length() > 0) {
                String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
                for (int i = 0; i < keyset.length; i++) {
                    String key = keyset[i];
                    if (key.length() > 0) {
                        indexFile = putKey(indexFile, msg, buildKey(topic, key));
                        if (indexFile == null) {
                            log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
                            return;
                        }
                    }
                }
            }
        } else {
            log.error("build index error, stop building index");
        }
    }
    
    /**
     * 放置key到indexFile中
     * @param indexFile
     * @param msg 消息请求
     * @param idxKey topic#uniqKey
     * @return
     */
    private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
        // for循环一直方法详细,直到出现一个错误
        for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
            log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
            //再次获取或创建IndexFile
            indexFile = retryGetAndCreateIndexFile();
            if (null == indexFile) {
                return null;
            }
            //存放消息
            ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
        }
    
        return indexFile;
    }
    

    IndexFile存放这个消息的索引方法,计算key的hash值,确定hash槽,将索引信息存入MappedByteBuffer等待刷盘操作。

    /**
     * 存放索引信息到mappedByteBuffer等待刷盘
     * @param key 存放的key
     * @param phyOffset 存放的物理offset
     * @param storeTimestamp 存放消息存储的时间
     * @return
     */
    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            // key的hash值
            int keyHash = indexKeyHashMethod(key);
            // 所在hash槽的位置
            int slotPos = keyHash % this.hashSlotNum;
            // 40byte hashIndex + 槽的位置 * 槽的大小
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    
            FileLock fileLock = null;
    
            try {
    
                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                // 获取mappedByteBuffer写入的位置,上一个hash索引存储的位置
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }
                // 时间差值,根据时间进行消息查找
                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
                // 到秒
                timeDiff = timeDiff / 1000;
    
                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }
                // 存储的绝对位置
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
                // 存放hashKey,absIndexPos这个位置放入keyHash
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                // 存放物理commitlog消息的物理offset,absIndexPos + 4(上一个keyhash占用了4个位置)这个位置放入phyOffset
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                // 存放时间差值
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                // 存放上一个索引的位置
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                // 更新存放的索引数量,absSlotPos为索引槽的位置
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
                //更新文件索引的头信息,hash槽的总数、index条目的总数、最后消息的物理偏移量、最后消息的存储时间
                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }
    
                if (invalidIndex == slotValue) {
                    this.indexHeader.incHashSlotCount();
                }
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);
    
                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }
    
        return false;
    }
    

    消息查询

    QueryMessageProcessor接受客户端查询请求,进行处理。然后到DefaultMessageStore#queryMessage()方法,然后到IndexService#queryOffset(),最后到IndexFile#selectPhyOffset()方法。

    // 根据 topic、key 查找到 indexFile 索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。
    final QueryMessageResult queryMessageResult =
        this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),
            requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),
            requestHeader.getEndTimestamp());
    
    // 根据 key 查询消息
    @Override
    public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
        QueryMessageResult queryMessageResult = new QueryMessageResult();
    
        long lastQueryMsgTime = end;
    
        for (int i = 0; i < 3; i++) {
            QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
            if (queryOffsetResult.getPhyOffsets().isEmpty()) {
                break;
            }
    
            Collections.sort(queryOffsetResult.getPhyOffsets());
    
            queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
            queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());
    
            for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
                long offset = queryOffsetResult.getPhyOffsets().get(m);
    
                try {
    
                    boolean match = true;
                    MessageExt msg = this.lookMessageByOffset(offset);
                    if (0 == m) {
                        lastQueryMsgTime = msg.getStoreTimestamp();
                    }
    
    //                    String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR);
    //                    if (topic.equals(msg.getTopic())) {
    //                        for (String k : keyArray) {
    //                            if (k.equals(key)) {
    //                                match = true;
    //                                break;
    //                            }
    //                        }
    //                    }
    
                    if (match) {
                        SelectMappedBufferResult result = this.commitLog.getData(offset, false);
                        if (result != null) {
                            int size = result.getByteBuffer().getInt(0);
                            result.getByteBuffer().limit(size);
                            result.setSize(size);
                            queryMessageResult.addMessage(result);
                        }
                    } else {
                        log.warn("queryMessage hash duplicate, {} {}", topic, key);
                    }
                } catch (Exception e) {
                    log.error("queryMessage exception", e);
                }
            }
    
            if (queryMessageResult.getBufferTotalSize() > 0) {
                break;
            }
    
            if (lastQueryMsgTime < begin) {
                break;
            }
        }
    
        return queryMessageResult;
    }
    
    /**
     * 根据Index消息查询服务
     * @param topic
     * @param key
     * @param maxNum
     * @param begin 消息存储的开始时间,和IndexFile 的IndexHeader中的存储消息的开始时间进行对比
     * @param end 消息存储的结束时间,和IndexFile 的IndexHeader中的存储消息的结束时间进行对比
     * @return
     */
    public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
        List<Long> phyOffsets = new ArrayList<Long>(maxNum);
    
        long indexLastUpdateTimestamp = 0;
        long indexLastUpdatePhyoffset = 0;
        maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
        try {
            this.readWriteLock.readLock().lock();
            if (!this.indexFileList.isEmpty()) {
                for (int i = this.indexFileList.size(); i > 0; i--) {
                    IndexFile f = this.indexFileList.get(i - 1);
                    boolean lastFile = i == this.indexFileList.size();
                    if (lastFile) {
                        indexLastUpdateTimestamp = f.getEndTimestamp();
                        indexLastUpdatePhyoffset = f.getEndPhyOffset();
                    }
    
                    if (f.isTimeMatched(begin, end)) {
    
                        f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
                    }
    
                    if (f.getBeginTimestamp() < begin) {
                        break;
                    }
    
                    if (phyOffsets.size() >= maxNum) {
                        break;
                    }
                }
            }
        } catch (Exception e) {
            log.error("queryMsg exception", e);
        } finally {
            this.readWriteLock.readLock().unlock();
        }
    
        return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
    }
    
    /**
     * 根据索引key查找消息
     * @param phyOffsets 查找到的消息物理偏移量
     * @param key 索引key
     * @param maxNum 本次查找最大消息条数
     * @param begin 开始时间戳
     * @param end 结束时间戳
     * @param lock
     */
    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
        final long begin, final long end, boolean lock) {
        if (this.mappedFile.hold()) {
            // 计算key的hashcode
            int keyHash = indexKeyHashMethod(key);
            // 定位到hash槽的位置
            int slotPos = keyHash % this.hashSlotNum;
            // 计算hash槽的绝对物理位置
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    
            FileLock fileLock = null;
            try {
                if (lock) {
                    // fileLock = this.fileChannel.lock(absSlotPos,
                    // hashSlotSize, true);
                }
                // 得到槽的值
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                // if (fileLock != null) {
                // fileLock.release();
                // fileLock = null;
                // }
                // 如果对应的Hash槽中存储的数据小于1或者大于当前索引条目个数则表示给HashCode没有对应的条条目,直接返回
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    // 由于会存在hash冲突,根据slotValue定位该hash槽最新的一个Item条目,将存储的物理偏移加入到phyOffsets中,
                    // 然后继续验证Item条目中存储的上一个Index下标,如果大于等于1并且小于最大条目数,则继续查找,否则结束查找。
                    for (int nextIndexToRead = slotValue; ; ) {
                        // 如果大于查找数量,中断
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }
                        // 根据Index下标定位到条目的起始物理偏移量,然后依次读取hashCode、物理偏移量、时间差、上一个条目的Index下标
                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;
    
                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
    
                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
                        // 如果存储的时间差小于0,则直接结束;
                        // 如果hashcode匹配并且消息存储时间介于待查找时间start、end之间则将消息物理偏移量加入到phyOffsets,并验证条目
                        // 前一个Index索引,如果大于等于1并且小于Index条目数,则继续查找,否则结束整个查找。
                        if (timeDiff < 0) {
                            break;
                        }
    
                        timeDiff *= 1000L;
    
                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
    
                        if (keyHash == keyHashRead && timeMatched) {
                            phyOffsets.add(phyOffsetRead);
                        }
    
                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }
    
                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
    
                this.mappedFile.release();
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:Index索引文件及构建

        本文链接:https://www.haomeiwen.com/subject/rsaowktx.html