美文网首页rocketMq理论与实践
RocketMq IndexService介绍

RocketMq IndexService介绍

作者: 晴天哥_王志 | 来源:发表于2020-05-13 13:06 被阅读0次

系列

开篇

  • 这个系列的主要目的是介绍RocketMq broker的原理和用法,在这个系列当中会介绍 broker 配置文件、broker 启动流程、broker延迟消息、broker消息存储。

  • 这篇文章主要介绍broker IndexService,主要介绍IndexService的数据结构和对应的建索引过程。

IndexFile 介绍

  • IndexFile文件的存储位置是:\store\index${fileName},文件名fileName是以创建时的时间戳命名的,文件大小是固定的,等于40+500W4+2000W20= 420000040个字节大小。如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。

  • 其中的索引数据包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 这四个字段,一共20 Byte。NextIndex offset 即前面读出来的 slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图,40 Byte 的Header用于保存一些总的统计信息,4500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。202000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。

  • IndexFile在解决hash冲突的过程中会采用头插法,即所有的冲突数据都往链表的头部进行插入,然后每个新添加的元素都会包含后一个元素的位置,hash对应的slot Table会指向第一个索引元素。在实际元素存储的数据的顺序和查询的顺序是逆向映射的,这点需要理解

IndexFile创建

public class IndexService {

    private static final int MAX_TRY_IDX_CREATE = 3;
    private final DefaultMessageStore defaultMessageStore;
    private final int hashSlotNum;
    private final int indexNum;
    private final String storePath;
    private final ArrayList<IndexFile> indexFileList = new ArrayList<IndexFile>();
    private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    public IndexService(final DefaultMessageStore store) {
        this.defaultMessageStore = store;
        // private int maxHashSlotNum = 5000000;
        // private int maxIndexNum = 5000000 * 4;
        this.hashSlotNum = store.getMessageStoreConfig().getMaxHashSlotNum();
        this.indexNum = store.getMessageStoreConfig().getMaxIndexNum();
        this.storePath =
            StorePathConfigHelper.getStorePathIndex(store.getMessageStoreConfig().getStorePathRootDir());
    }

    public IndexFile getAndCreateLastIndexFile() {
        IndexFile indexFile = null;
        IndexFile prevIndexFile = null;
        long lastUpdateEndPhyOffset = 0;
        long lastUpdateIndexTimestamp = 0;

        {
            this.readWriteLock.readLock().lock();
            if (!this.indexFileList.isEmpty()) {
                IndexFile tmp = this.indexFileList.get(this.indexFileList.size() - 1);
                if (!tmp.isWriteFull()) {
                    indexFile = tmp;
                } else {
                    lastUpdateEndPhyOffset = tmp.getEndPhyOffset();
                    lastUpdateIndexTimestamp = tmp.getEndTimestamp();
                    prevIndexFile = tmp;
                }
            }

            this.readWriteLock.readLock().unlock();
        }

        if (indexFile == null) {
            try {
                // 创建的文件名以时间戳作为文件名
                String fileName =
                    this.storePath + File.separator
                        + UtilAll.timeMillisToHumanString(System.currentTimeMillis());
                indexFile =
                    // 文件的hashSlotNum=5000000,indexNum=5000000 * 4
                    new IndexFile(fileName, this.hashSlotNum, this.indexNum, lastUpdateEndPhyOffset,
                        lastUpdateIndexTimestamp);
                this.readWriteLock.writeLock().lock();
                this.indexFileList.add(indexFile);
            } catch (Exception e) {
                log.error("getLastIndexFile exception ", e);
            } finally {
                this.readWriteLock.writeLock().unlock();
            }

            if (indexFile != null) {
                // 前置文件刷盘
                final IndexFile flushThisFile = prevIndexFile;
                Thread flushThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        IndexService.this.flush(flushThisFile);
                    }
                }, "FlushIndexFileThread");

                flushThread.setDaemon(true);
                flushThread.start();
            }
        }

        return indexFile;
    }
}
  • IndexService在不存在或者当前文件已满的情况下会创建新的indexFile文件呢。
  • indexFile文件的名为当前时间戳、hashSlotNum=5000000,indexNum=5000000 * 4。

Index存储

public class IndexFile {

    private static int hashSlotSize = 4;
    private static int indexSize = 20;
    private static int invalidIndex = 0;
    private final int hashSlotNum;
    private final int indexNum;

    private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
        for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
            log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");

            indexFile = retryGetAndCreateIndexFile();
            if (null == indexFile) {
                return null;
            }

            ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
        }

        return indexFile;
    }


    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
        // 在Index的文件没有满的情况下放置索引数据
        if (this.indexHeader.getIndexCount() < this.indexNum) {

            // 1、针对key计算hash值
            int keyHash = indexKeyHashMethod(key);
            // 2、记录hash值应该保存的slot的位置
            int slotPos = keyHash % this.hashSlotNum;
            // 3、计算Index文件当中slotPos对应的实际物理偏移
            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;

            FileLock fileLock = null;

            try {
                // 获取absSlotPos位置记录当前存储的index的位移
                int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
                    slotValue = invalidIndex;
                }

                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();

                timeDiff = timeDiff / 1000;

                if (this.indexHeader.getBeginTimestamp() <= 0) {
                    timeDiff = 0;
                } else if (timeDiff > Integer.MAX_VALUE) {
                    timeDiff = Integer.MAX_VALUE;
                } else if (timeDiff < 0) {
                    timeDiff = 0;
                }
                // 4、计算index实际的存储的偏移
                // 实际位置等于headIndex + slot占用的位置(slotSize * slotNum) + 已有的Index的偏移(已有index的个数*indexSize)
                int absIndexPos =
                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
                        + this.indexHeader.getIndexCount() * indexSize;
                // 5、生成Index的对象放置 keyHash、phyOffset、timeDiff、slotValue(保存的该hash值下前一个index的逻辑位移,也就是第几个index对象)、

                this.mappedByteBuffer.putInt(absIndexPos, keyHash);
                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
                // 6、记录SlotPos的当前的index的个数,即逻辑位移。
                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());

                if (this.indexHeader.getIndexCount() <= 1) {
                    this.indexHeader.setBeginPhyOffset(phyOffset);
                    this.indexHeader.setBeginTimestamp(storeTimestamp);
                }
                // 设置Index文件整体的index个数和slot个数、commitLog的最后物理偏移和最新的存储时间戳。
                this.indexHeader.incHashSlotCount();
                this.indexHeader.incIndexCount();
                this.indexHeader.setEndPhyOffset(phyOffset);
                this.indexHeader.setEndTimestamp(storeTimestamp);

                return true;
            } catch (Exception e) {
                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
            } finally {
                if (fileLock != null) {
                    try {
                        fileLock.release();
                    } catch (IOException e) {
                        log.error("Failed to release the lock", e);
                    }
                }
            }
        } else {
            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
                + "; index max num = " + this.indexNum);
        }

        return false;
    }
}
  • IndexFile#putKey实现了整个index文件存储过程,由于IndexFile实现的是类似hash的结果,所以存储过程也跟hash的存储流程比较相似。

  • 1、针对key计算hash值,记录hash值应该保存的slot的位置,计算Index文件当中slotPos对应的实际物理偏移。

  • 2、根据slotPos对应的实际物理偏移获取该slot下最新的index文件的逻辑位移,即index linked list的第几个。

  • 3、计算index实际的存储的偏移,实际位置等于headIndex + slot占用的位置(slotSize * slotNum) + 已有的Index的物理偏移(已有index的个数*indexSize)。

  • 4、生成当前Index的对象放置 keyHash、phyOffset(commitLog的实际偏移量)、timeDiff、slotValue(保存的该hash值下前一个index的逻辑位移,也就是第几个index对象),slotValue起到了链表链接的作用。

  • 5、设置Index文件整体的index个数和slot个数、commitLog的最后物理偏移和最新的存储时间戳。

相关文章

  • RocketMq IndexService介绍

    系列 RocketMq broker 配置文件 RocketMq broker 启动流程 RocketMq bro...

  • rocketMq概念介绍

    rocketMq官网 http://rocketmq.apache.org/ rocketMq逻辑概念介绍 备注:...

  • RocketMQ学习

    RocketMQ深度解析RocketMQ之一:RocketMQ整体介绍RocketMQ之二:分布式开放消息系统Ro...

  • Apache Kafka 基础介绍

    Apache Kafka 基础介绍 介绍完RocketMQ,就不得不介绍一下kafka,RocketMQ就是照着k...

  • Apache RocketMQ 的基础特性介绍

    Apache RocketMQ 的基础特性介绍 Apache RocketMQ 系列: Apache Rocket...

  • RocketMQ介绍

    RocketMQ介绍 RocketMQ介绍什么是消息队列产品功能功能概览图多协议支持管理工具特色功能专有云部署消息...

  • RocketMQ介绍

     消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。Apac...

  • RocketMQ 介绍

    RocketMQ 介绍 RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可...

  • CentOS7 安装RocketMQ并测试使用

    RocketMQ 介绍 Apache RocketMQ™是一个开源的分布式消息传递和流数据平台。 RocketMQ...

  • Docker之安装RocketMQ

    Docker安装RocketMQ RocketMQ目录 简单介绍 安装Namesrv 安装broker服务器 安装...

网友评论

    本文标题:RocketMq IndexService介绍

    本文链接:https://www.haomeiwen.com/subject/okvinhtx.html