美文网首页消息中间件
RocketMQ源码-Index索引介绍

RocketMQ源码-Index索引介绍

作者: persisting_ | 来源:发表于2019-07-22 23:19 被阅读3次


    1 概述
    2 入口方法介绍
    3 索引结构介绍
    4 索引操作
    5 索引查询

    1 概述

    RocketMQ中Broker在收到生产者发送的消息时,会将消息存储下来,写入CommitLog,但是此时消息是不可消费也不可查询的。需要等待专门的服务对刚写入的消息进行Reput操作,将消息信息记录到ConsumeQueueIndex中,消息记录到ConsumeQueue完成后该消息就可被消费,消息完成索引到Index中之后就可以根据时间戳和Key进行查询了。

    DefaultMessageStore中用于进行Reput操作的服务实现类为ReputMessageService,该类扩展自ServiceThread,自身就是一个线程,其run方法定义如下:

    //ReputMessageService
    //@Override
    public void run() {
        DefaultMessageStore.log.info(this.getServiceName() + " service started");
    
        while (!this.isStopped()) {
            try {
                //sleep(1)则保证基本上时刻在尝试数据构建,
                //如果速度够快,即消息刚完成写入就进行reput操作
                //则消息还在缓冲中,可避免从硬盘读取数据
                Thread.sleep(1);
                //进行reput操作,完成ConsumeQueue和Index
                //数据的构建
                this.doReput();
            } catch (Exception e) {
                DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
    
        DefaultMessageStore.log.info(this.getServiceName() + " service end");
    }
    

    ReputMessageService.doReput方法具体实现不展开介绍,就是将还没有进行ConsumeQueueIndex构建的消息提取出来,进行ConsumeQueueIndex构建。

    构建ConsumeQueue则入口类为CommitLogDispatcherBuildConsumeQueue,而Index的构建入口类为CommitLogDispatcherBuildIndex

    本文我们主要介绍IndexService的实现。

    2 入口方法介绍

    第一节概述已经提到,Index构建的口类为CommitLogDispatcherBuildIndex,其源码如下:

    class CommitLogDispatcherBuildIndex implements CommitLogDispatcher {
    
        @Override
        public void dispatch(DispatchRequest request) {
            //如果启用了索引,则调用indexService.buildIndex
            //进行索引构建
            if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {
                DefaultMessageStore.this.indexService.buildIndex(request);
            }
        }
    }
    

    3 索引结构介绍

    IndexFile是索引结构的具体实现,因为索引也会持久化到硬盘中,所以IndexFile也通过MappedFile进行文件写入操作,关于MappedFile的介绍可以参考笔者文章RocketMQ源码-MappedFile介绍

    RocketMQ中的索引文件分为三个部分,分别为头部、SlotTable和index,如下图所示:

    索引文件结构.jpg

    默认的Slot数量为5000000,默认的index数量为4*5000000个,可配置。

    因为可能消息数目较多,一个索引文件不能保存所有的消息索引信息,所以会使用多个索引文件,索引文件的头部保存了偏移等信息,其结构如下图所示:

    index header.jpg

    各字段含义如下:

    1. beginTimestamp : 该索引文件的第一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 0-7) 8bytes
    2. endTimestamp : 该索引文件的最后一个消息(Message)的存储时间(落盘时间) 物理位置(pos: 8-15) 8bytes
    3. beginPhyoffset : 该索引文件第一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量(可以通过该物理偏移直接获取到该消息) 物理位置(pos: 16-23) 8bytes
    4. beginPhyoffset : 该索引文件最后一个消息(Message)的在CommitLog(消息存储文件)的物理位置偏移量 (pos: 24-31) 8bytes
    5. hashSlotCount : 该索引文件目前的hash slot的个数 (pos: 32-35) 4bytes
    6. indexCount : 该索引文件目前的索引个数 (pos: 36-39) 4bytes

    每个消息在索引文件尾部的占用一个节点,保存key的hash,还保存了该消息在消息文件中的物理位置,插入时间,解决hash冲突用的上一个冲突索引的位置,具体结构如下图所示:

    index结构.jpg
    1. key hash value: message key的hash值
    2. phyOffset: message在CommitLog的物理文件地址, 可以直接查询到该消息(索引的核心机制)
    3. timeDiff: message的落盘时间与header里的beginTimestamp的差值(为了节省存储空间,如果直接存message的落盘时间就得8bytes)
    4. prevIndex: hash冲突处理的关键之处, 相同hash值上一个消息索引的index(如果当前消息索引是该hash值的第一个索引,则prevIndex=0, 也是消息索引查找时的停止条件),每个slot位置的第一个消息的prevIndex就是0的。

    上面截图和部分说明引用自文章rocketMq - index介绍

    根据加入索引的时间依次放置,第一个加入索引的放在索引的第一个位置,第二个则在索引的第二个位置,以此类推。

    每个消息根据key的hash值被映射到slotTable节点上,在对应的slot节点上保存的是其在索引的位置,如果发生冲突,即该slot上的值不为0,则表示已经有其他消息索引占用了该slot,那么使用链表方法处理冲突,该slot更新为最新索引的消息在索引中的位置,先前加入的冲突索引位置则记录在该索引的prevIndex字段中。

    使用的映射方法如下:

    //keyHash为key的hash,hashSlotNum默认500w
    //也就是hash算法为求余法
    int slotPos = keyHash % this.hashSlotNum;
    

    4 索引操作

    在介绍索引操作之前,我们先看下建立索引依赖的消息key字段到底是什么,其实根据源码发现就是消息的MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX属性,该属性可在生产者生产消息时自己指定,如果不指定则会在发送之前调用MessageClientIDSetter.setUniqID(msg);进行初始化,具体如何产生唯一的ID算法这里不做介绍。

    索引操作主要实现在IndexFile.putKey方法中:

    //IndexFile
    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        //如果该indexFile还没有达到最大的索引数目,则
        //可以继续写入,否则发挥失败
        if (this.indexHeader.getIndexCount() < this.indexNum) {
            //计算key的hash值
            int keyHash = indexKeyHashMethod(key);
            //根据hash值计算在slotTable中的位置
            int slotPos = keyHash % this.hashSlotNum;
            //根据在slotTable中的位置、索引头部大小(40b)、
            //每个slot的大小(4b)计算该slot在文件中的物理位置
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    
            FileLock fileLock = null;
    
            try {
    
                // fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
                // false);
                //获取slotTable该slot位置上的值
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                //如果值小于invalidIndex(该值为0)或者值大于
                //当天存在的索引个数,则置为0
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }
                //根据消息的存储时间和该索引文件头部记录的
                //第一个消息存储时间计算时间差
                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
    
                timeDiff = timeDiff / 1000;
    
                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }
                //计算该索引在索引链表中的位置
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
    
                //先存key的hash
                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                //记录该消息的在消息文件中的物理位置
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                //记录落地时间差
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                //记录上一个落在该位置上的冲突索引位置
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
    
                //在slotTable中记录该消息在索引中的位置
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
    
                //如果是第一次索引消息,则记录开始物理偏移和
                //开始时间
                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }
    
                //每次追加一个新的索引,递增slotTable中slot
                //数量、索引数量
                //同时每次新增的索引也就是最后一个索引,记录
                //最后一个索引物理偏移以及最后落地时间
                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);
    
                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }
    
        return false;
    }
    

    5 索引查询

    索引查询方法为IndexFile.selectPhyOffset:

    //IndexFile
    public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
        final long begin, final long end, boolean lock) {
        if (this.mappedFile.hold()) {
            //根据key计算hash
            int keyHash = indexKeyHashMethod(key);
            //根据hash值计算在slotTable中的位置
            int slotPos = keyHash % this.hashSlotNum;
            //根据在slotTable中的位置、索引头部大小(40b)、
            //每个slot的大小(4b)计算该slot在文件中的物理位置
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
    
            FileLock fileLock = null;
            try {
                if (lock) {
                    // fileLock = this.fileChannel.lock(absSlotPos,
                    // hashSlotSize, true);
                }
    
                //获取该slot位置上的值
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                // if (fileLock != null) {
                // fileLock.release();
                // fileLock = null;
                // }
                //如果slot上的值为无效值或者该值大于最大索引
                //数量,则表示没有符合条件的索引数据,不作任何
                //操作
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
                    || this.indexHeader.getIndexCount() <= 1) {
                } else {
                    //下面的实现也比较简单,因为使用链地址法
                    //解决hash冲突,所以这里读取链表上的每个
                    //数据,如果时间满足要求并且key的hash
                    //一致,则加入到返回列表中
                    for (int nextIndexToRead = slotValue; ; ) {
                        if (phyOffsets.size() >= maxNum) {
                            break;
                        }
    
                        int absIndexPos =
                            IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                                + nextIndexToRead * indexSize;
    
                        int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
                        long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
    
                        long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
                        int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
    
                        if (timeDiff < 0) {
                            break;
                        }
    
                        timeDiff *= 1000L;
    
                        long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
                        //消息落地时间符合要求
                        boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
    
                        //key的hash一致且消息落地时间符合要求
                        if (keyHash == keyHashRead && timeMatched) {
                            //加入到返回列表中
                            phyOffsets.add(phyOffsetRead);
                        }
    
                        if (prevIndexRead <= invalidIndex
                            || prevIndexRead > this.indexHeader.getIndexCount()
                            || prevIndexRead == nextIndexToRead || timeRead < begin) {
                            break;
                        }
                        //读取链表中的下一个冲突索引
                        nextIndexToRead = prevIndexRead;
                    }
                }
            } catch (Exception e) {
                log.error("selectPhyOffset exception ", e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
    
                this.mappedFile.release();
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:RocketMQ源码-Index索引介绍

        本文链接:https://www.haomeiwen.com/subject/gvpglctx.html