美文网首页
RocketMQ源码解析——存储部分(2)对`MappedFil

RocketMQ源码解析——存储部分(2)对`MappedFil

作者: szhlcy | 来源:发表于2021-05-12 10:23 被阅读0次

MappedFileQueue

 前面已经介绍了RocketMQ跟存储交互的底层封装对象mappedFile。而跟CommitLog,ConsumeQueue进行交互的并不是mappedFile,而是对其进一步封装的MappedFileQueue类。

在这里插入图片描述

属性介绍

    //文件的存储路径
    private final String storePath;

    //映射文件大小,指的是单个文件的大小,比如CommitLog大小为1G
    private final int mappedFileSize;

    //并发线程安全队列存储映射文件
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

    private final AllocateMappedFileService allocateMappedFileService;

    //刷新完的位置
    private long flushedWhere = 0;

    //提交完成的位置
    private long committedWhere = 0;

    //存储时间
    private volatile long storeTimestamp = 0;

MappedFileQueue这个类的属性相对来说比较少,其中需要说的是,AllocateMappedFileService类型的字段,这个对象的作用是根据情况来决定是否需要提前创建好MappedFile对象供后续的直接使用。而这个参数是在构造MappedFileQueue对象的时候的一个参数。只有在CommitLog中构造时才会传入AllocateMappedFileService,在ConsumeQueue并没有传入。

方法介绍

构造方法

MappedFileQueue只有一个全参构造器,分别是传入文件的存储路径storePath,单个存储文件的大小mappedFileSize和提前创建MappedFile对象的allocateMappedFileService

public MappedFileQueue(final String storePath, int mappedFileSize,
        AllocateMappedFileService allocateMappedFileService) {
        //指定文件的存储路径
        this.storePath = storePath;
        //指定单个文件的大小
        this.mappedFileSize = mappedFileSize;
        this.allocateMappedFileService = allocateMappedFileService;
    }

检查文件是否完整checkSelf

   /**
     * 检查文件的是否完整,检查的方式。上一个文件的起始偏移量减去当前文件的起始偏移量,如果差值=mappedFileSize那么说明文件是完整的,否则有损坏
     */
    public void checkSelf() {
        //检查文件组是否为空
        if (!this.mappedFiles.isEmpty()) {
            //对文件进行迭代,一个一个进行检查
            Iterator<MappedFile> iterator = mappedFiles.iterator();
            MappedFile pre = null;
            while (iterator.hasNext()) {
                MappedFile cur = iterator.next();

                if (pre != null) {
                    //用当前文件的其实偏移量-上一个文件的其实偏移量 正常情况下应该等于一个文件的大小。如果不相等,说明文件存在问题
                    if (cur.getFileFromOffset() - pre.getFileFromOffset() != this.mappedFileSize) {
                        LOG_ERROR.error("[BUG]The mappedFile queue's data is damaged, the adjacent mappedFile's offset don't match. pre file {}, cur file {}",
                            pre.getFileName(), cur.getFileName());
                    }
                }
                pre = cur;
            }
        }
    }

 这里检查文件是否被破坏的原理,就是检查文件的大小是不是等于前一个文件的起始偏移量和后一个文件的起始偏移量是不是等于文件大小。而这里的起始偏移量又是在MappedFile进行获取的fileFromOffset,而这个值就是我们在构造MappedFile的时候传入的文件名转化得到的

private void init(final String fileName, final int fileSize) throws IOException {
        //根据文件的名称计算文件其实的偏移量
        this.fileFromOffset = Long.parseLong(this.file.getName());
}

加载文件load

   public boolean load() {
        /**
         *System.getProperty("user.home") + File.separator + "store" + File.separator + 文件名
         * 根据传入的文件保存路径storePath 来获取文件
         */
        File dir = new File(this.storePath);
        File[] files = dir.listFiles();
        //文件列表不为空则进行加载
        if (files != null) {
            // ascending order
            //对文件进行排序
            Arrays.sort(files);
            for (File file : files) {

                //队列映射文件的大小不等于设置的文件类型的大小,说明加载到了最后的一个文件  比如 如果是commitLog那么对于的大小应该为1G
                if (file.length() != this.mappedFileSize) {
                    log.warn(file + "\t" + file.length()
                        + " length not matched message store config value, ignore it");
                    return true;
                }
                try {
                    //根据文件的路径和文件大小,创建对应的文件映射,然后加入到映射列表中
                    MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
                    mappedFile.setWrotePosition(this.mappedFileSize);
                    mappedFile.setFlushedPosition(this.mappedFileSize);
                    mappedFile.setCommittedPosition(this.mappedFileSize);
                    this.mappedFiles.add(mappedFile);
                    log.info("load " + file.getPath() + " OK");
                } catch (IOException e) {
                    log.error("load file " + file + " error", e);
                    return false;
                }
            }
        }
        return true;
    }

 这里的逻辑比较简单,就是根据传入的文件路径,加载对应的文件夹下面的文件,并创建文件映射,并加入到文件映射列表中去。这个方法在RocketMQ启动的时候回调用,用来加载系统中已经存在的消息日志文件。

根据时间戳获取文件getMappedFileByTime

    public MappedFile getMappedFileByTime(final long timestamp) {
        //获取所有的文件映射对象MappedFile
        Object[] mfs = this.copyMappedFiles(0);
        //为null说明 mappedFiles 中没有MappedFile
        if (null == mfs)
            return null;

        for (int i = 0; i < mfs.length; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            //如果文件的最后修改时间大于等于参数时间,说文件在当前传入的时间之后进行修改了,就是需要寻找的文件
            if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
                return mappedFile;
            }
        }
        //如果没有找到合适的MappedFile 就用最后一个
        return (MappedFile) mfs[mfs.length - 1];
    }

 这个方法主要使用的位置在ConsumeQueue中,在通过时间戳来找文件中的消息的偏移量。

根据偏移量获取文件findMappedFileByOffset

    public MappedFile findMappedFileByOffset(final long offset) {
        //根据偏移量来找映射文件,如果没有找到文件的情况下不返回映射文件列表第一个映射文件
        return findMappedFileByOffset(offset, false);
    }
    
    
   public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
            //获取队列中第一个映射文件
            MappedFile firstMappedFile = this.getFirstMappedFile();
            //获取队列中最后一个映射文件
            MappedFile lastMappedFile = this.getLastMappedFile();
            //如果不存在文件则直接返回null
            if (firstMappedFile != null && lastMappedFile != null) {
                //如果要查找的偏移量offset不在所有的文件偏移量范围内,则打印错误日志
                if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                    LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                        offset,
                        firstMappedFile.getFileFromOffset(),
                        lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                        this.mappedFileSize,
                        this.mappedFiles.size());
                } else {
                   //(指定Offset-第一个文件的其实偏移量)/文件大小=第几个文件夹
                    int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                    MappedFile targetFile = null;
                    try {
                        //获取指定的映射文件
                        targetFile = this.mappedFiles.get(index);
                    } catch (Exception ignored) {
                    }

                    //offset在指定的映射文件中,则直接返回对应的映射文件
                    if (targetFile != null && offset >= targetFile.getFileFromOffset()
                        && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                        return targetFile;
                    }

                    //如果按索引在队列中找不到映射文件就遍历队列查找映射文件
                    for (MappedFile tmpMappedFile : this.mappedFiles) {
                        if (offset >= tmpMappedFile.getFileFromOffset()
                            && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                            return tmpMappedFile;
                        }
                    }
                }

                //如果指定了没有找到文件就返回第一个映射文件,则直接返回第一个映射文件
                if (returnFirstOnNotFound) {
                    return firstMappedFile;
                }
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }

        return null;
    }

 如上代码分析中有两个根据偏移量获取映射文件的方法,其中有两个参数的方法是在知道偏移量所指的信息在第一个映射文件中的时候才调用,而调用的这个方法的基本就是写入信息或者刷新信息的时候调用。关于文件的刷新和提交可以看上一篇对MappedFile分析的文章

根据偏移量截断文件truncateDirtyFiles

    public void truncateDirtyFiles(long offset) {
        List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();
        /**
         * 如果   文件的起始偏移量>指定截断偏移量offset  那么整个文件需要删除
         * 如果   文件的起始偏移量<指定截断偏移量offset<文件的最大偏移量  那么文件中的部分记录需要清除
         * 如果   文件的最大偏移量<指定截断偏移量offset  那么这个文件不需要进行处理
         */
        for (MappedFile file : this.mappedFiles) {
           //文件的开始偏移量+文件大小= 文件尾offset
            long fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;
            //当前文件的最大偏移量大于 指定截断位置的偏移量,说明需要截断的位置就是在这个文件中
            if (fileTailOffset > offset) {
                //如果文件初始偏移量小于指定的偏移量,说明只需要截断文件中的一部分
                if (offset >= file.getFileFromOffset()) {
                    //设置映射文件写的位置
                    file.setWrotePosition((int) (offset % this.mappedFileSize));
                    //设置文件commit的位置
                    file.setCommittedPosition((int) (offset % this.mappedFileSize));
                    //设置文件刷新的位置
                    file.setFlushedPosition((int) (offset % this.mappedFileSize));
                } else {
                    //如果文件的起始偏移量也比指定的偏移量大,则说明这个文件整个需要丢弃
                    file.destroy(1000);
                    //需要删除的文件加上这个文件
                    willRemoveFiles.add(file);
                }
            }
        }
        //删除映射的文件
        this.deleteExpiredFile(willRemoveFiles);
    }

 截断文件的方法,跟load方法一样,在RocketMQ启动的时候会使用到,用来删除那些无效的或者损坏的需要删除的消息。

获取最后一个文件getLastMappedFile

    public MappedFile getLastMappedFile() {
        MappedFile mappedFileLast = null;
        //如果文件队列不为空则获取最后一个文件
        while (!this.mappedFiles.isEmpty()) {
            try {
                //直接获取最后一个映射文件
                mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
                break;
            } catch (IndexOutOfBoundsException e) {
                //continue;
            } catch (Exception e) {
                log.error("getLastMappedFile has exception.", e);
                break;
            }
        }

        return mappedFileLast;
    }

 这个方法的作用基本就是获取最后一个映射文件,然后进行消息的插入,或者获取最大消息偏移量等信息。

根据时间删除过期文件deleteExpiredFileByTime

public int deleteExpiredFileByTime(final long expiredTime,
        final int deleteFilesInterval,
        final long intervalForcibly,
        final boolean cleanImmediately) {
        //获取映射文件列表
        Object[] mfs = this.copyMappedFiles(0);
        //如果映射文件列表为空直接返回
        if (null == mfs){
            return 0;
        }
        int mfsLength = mfs.length - 1;
        int deleteCount = 0;
        List<MappedFile> files = new ArrayList<MappedFile>();
        if (null != mfs) {
            //对映射文件进行遍历
            for (int i = 0; i < mfsLength; i++) {
                MappedFile mappedFile = (MappedFile) mfs[i];
                //文件最后的修改时间+过期时间= 文件最终能够存活的时间
                long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
                //如果当前时间大于文件能够存活的最大时间,比如 当前是2021-03-18 12:00:00 ,而文件最大存活时间2021-03-18 11:00:00 就需要删除。或者调用方法的时候指定了马上删除
                if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                    //删除文件,就是解除对文件的引用
                    if (mappedFile.destroy(intervalForcibly)) {
                        //要删除的的文件加入到要删除的集合中
                        files.add(mappedFile);
                        //增加计数
                        deleteCount++;
                        //一次性最多删除的人为10
                        if (files.size() >= DELETE_FILES_BATCH_MAX) {
                            break;
                        }
                        //如果删除时间间隔大于0,并且没有循环玩,则睡眠指定的删除间隔时长后在杀出
                        if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                            try {
                                Thread.sleep(deleteFilesInterval);
                            } catch (InterruptedException e) {
                            }
                        }
                    } else {
                        break;
                    }
                } else {
                    //avoid deleting files in the middle
                    break;
                }
            }
        }

        //从文件映射队列中删除对应的文件映射
        deleteExpiredFile(files);
        //返回删除的文件个数
        return deleteCount;
    }

 这个方法被用在定期删除过去的CommitLog文件,来保证内存空间。

根据偏移量删除文件deleteExpiredFileByOffset

   public int deleteExpiredFileByOffset(long offset, int unitSize) {
        Object[] mfs = this.copyMappedFiles(0);

        List<MappedFile> files = new ArrayList<MappedFile>();
        int deleteCount = 0;
        if (null != mfs) {

            int mfsLength = mfs.length - 1;

            for (int i = 0; i < mfsLength; i++) {
                boolean destroy;
                MappedFile mappedFile = (MappedFile) mfs[i];
                //unitSize是一个文件格式占用的长度 比如ConsumeQueue中一条记录长度为20byte  这里是获取一个文件中最后一条记录的起始偏移量,
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
                
                if (result != null) {
                    //获取文件中最后一条记录的偏移量
                    long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
                    result.release();
                    //如果最大偏移量 < 指定的偏移量,则需要删除
                    destroy = maxOffsetInLogicQueue < offset;
                    if (destroy) {
                        log.info("physic min offset " + offset + ", logics in current mappedFile max offset "
                            + maxOffsetInLogicQueue + ", delete it");
                    }
                } else if (!mappedFile.isAvailable()) { // Handle hanged file.
                    log.warn("Found a hanged consume queue file, attempting to delete it.");
                    destroy = true;
                } else {
                    log.warn("this being not executed forever.");
                    break;
                }
                //删除文件
                if (destroy && mappedFile.destroy(1000 * 60)) {
                    files.add(mappedFile);
                    deleteCount++;
                } else {
                    break;
                }
            }
        }
      //  删除映射文件队列中的映射文件=》
        deleteExpiredFile(files);

        return deleteCount;
    }

 按照偏移量删除文件用于删除过期的ConsumeQueue文件,因为ConsumeQueue文件中信息的记录是定长的20byte,如果偏移量小于指定的偏移量表示都是之前的消息,可以直接删除。

其他跟MappedFile有关联的方法

MappedFile MappedFileQueue
flush flush
commit commit
destroy destroy
getFileFromOffset获取文件的初始偏移量 getMinOffset获取文件的最小偏移量,就是获取映射文件队列的第一个文件,然后调用getFileFromOffset
getFileFromOffset+getReadPosition getMaxOffset获取文件最大偏移量,就是获取最后一个映射文件的起始偏移量+文件的写入的位置
getFileFromOffset+getWrotePosition getMaxWrotePosition获取文件最大偏移量,就是获取最后一个映射文件的起始偏移量+文件的写入的位置
remainHowManyDataToCommit获取文件尚未提交的长度
remainHowManyDataToFlush获取文件尚未刷新的长度

CommitLogConsumeQueue中的使用

方法 CommitLog ConsumeQueue
checkSelf 定时检查文件是否完整 定时检查文件是否完整
load MQ启动时加载CommitLog MQ启动时加载ConsumeQueue
getMappedFileByTime 根据时间戳查找特定的topic和queue中的消息
findMappedFileByOffset 根据index获取消息
truncateDirtyFiles MQ启动时截断无用日志 MQ启动时截断无用日志
getLastMappedFile 保存消息时获取文件 保存消息时获取文件
deleteExpiredFileByTime 定时删除过期的文件
deleteExpiredFileByOffset 定时删除过期的文件

 可以看到,MappedFileQueue类中的方法基本是操作MappedFile组成的集合,间接的操作MappedFile达到对日志文件组的增删改的操作,都是一些提供给CommitLogConsumeQueue用来对日志文件进行查找,删除的基础方法。

下一篇存储部分(3)CommitLog文件存储加载刷新的CommitLog

相关文章

网友评论

      本文标题:RocketMQ源码解析——存储部分(2)对`MappedFil

      本文链接:https://www.haomeiwen.com/subject/ectndltx.html