美文网首页程序猿阵线联盟-汇总各类技术干货程序员
rocketmq定时清理commitlog文件源码分析

rocketmq定时清理commitlog文件源码分析

作者: 运维开发笔记 | 来源:发表于2017-12-13 20:39 被阅读0次

    rocketmq的配置参数

    // 何时触发删除文件, 默认凌晨4点删除文件
    @ImportantField
    private String deleteWhen = "04";
    

    猜想rocketmq会起一个一天执行一次的定时任务。
    但看了代码发现并不是这样。

    在存储服务启动时,启动如下的定时任务:

    private void addScheduleTask() {
        // 定时删除过期文件
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                DefaultMessageStore.this.cleanFilesPeriodically();
            }
        }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
    

    getCleanResourceInterval 默认是10s。

    没10s执行CleanCommitLogService,清理过期文件。

    删除时,有三个判断条件:
    if (timeup || spacefull || manualDelete)
    timeup就是到了定时时间点的判断。
    spacefull是磁盘满的判断。
    manualDelete是手动触发删除的判断。

    判断定时时间的逻辑简单。
    判断磁盘的,需要注意。
    得到磁盘利用率的方法:
    拿到commitlog的文件路径

    long totalSpace = file.getTotalSpace();
    long freeSpace = file.getFreeSpace();
    

    这样就可以得到磁盘空间的信息。

    需要注意的是,有三个磁盘利用率的配置。

    1. diskMaxUsedSpaceRatio。我们设置的磁盘最大利用率。默认是75
    2. DiskSpaceWarningLevelRatio。磁盘空间警戒水位,超过,则停止接收新消息(出于保护自身目的)默认是90
    3. DiskSpaceCleanForciblyRatio。磁盘空间强制删除文件水位。默认是85

    逻辑是:
    当磁盘水位高于DiskSpaceWarningLevelRatio。会触发一个getAndMakeDiskFull操作,标记mq运行状态为磁盘满。并触发一次System.gc()。
    标记cleanImmediately为true。

    当磁盘水位高于DiskSpaceCleanForciblyRatio。标记cleanImmediately为true。

    当上面两个条件不达到,执行getAndMakeDiskOK,将运行状态标记为ok。因为之前可能执行过getAndMakeDiskFull操作,所以这里要再执行下标记OK的操作,这就可以再次写了。也就是说如果之前达到过full状态,至少10s钟不能写,要等下一次执行周期改回ok的状态。

    最后判断下,如果大于diskMaxUsedSpaceRatio。返回true。
    所以我们设置的值不要大于85。

    下面还有一次对storePathRootDir路径下磁盘空间的判断,逻辑一样。

    下面是源码:

            /**
             * 是否可以删除文件,空间是否满足
             */
            private boolean isSpaceToDelete() {
                double ratio =
                        DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
    
                cleanImmediately = false;
    
                // 检测物理文件磁盘空间
                {
                    String storePathPhysic =
                            DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
                    double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
                    if (physicRatio > DiskSpaceWarningLevelRatio) {
                        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
                        if (diskok) {
                            DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio
                                    + ", so mark disk full");
                            System.gc();
                        }
    
                        cleanImmediately = true;
                    }
                    else if (physicRatio > DiskSpaceCleanForciblyRatio) {
                        cleanImmediately = true;
                    }
                    else {
                        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
                        if (!diskok) {
                            DefaultMessageStore.log.info("physic disk space OK " + physicRatio
                                    + ", so mark disk ok");
                        }
                    }
    
                    if (physicRatio < 0 || physicRatio > ratio) {
                        DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, "
                                + physicRatio);
                        return true;
                    }
                }
    
                // 检测逻辑文件磁盘空间
                {
                    String storePathLogics =
                            StorePathConfigHelper.getStorePathConsumeQueue(DefaultMessageStore.this
                                .getMessageStoreConfig().getStorePathRootDir());
                    double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
                    if (logicsRatio > DiskSpaceWarningLevelRatio) {
                        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
                        if (diskok) {
                            DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio
                                    + ", so mark disk full");
                            System.gc();
                        }
    
                        cleanImmediately = true;
                    }
                    else if (logicsRatio > DiskSpaceCleanForciblyRatio) {
                        cleanImmediately = true;
                    }
                    else {
                        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
                        if (!diskok) {
                            DefaultMessageStore.log.info("logics disk space OK " + logicsRatio
                                    + ", so mark disk ok");
                        }
                    }
    
                    if (logicsRatio < 0 || logicsRatio > ratio) {
                        DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, "
                                + logicsRatio);
                        return true;
                    }
                }
    
                return false;
            }
            
    
    1. 继续说CleanCommitLogService
      达到匹配的条件,开始删除。

      // 是否立刻强制删除文件
      boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
      

    isCleanFileForciblyEnable默认是true。所以cleanImmediately为true时,即磁盘>85,将开始强制删除文件。

    // 删除多个CommitLog文件的间隔时间(单位毫秒)
    private int deleteCommitLogFilesInterval = 100;
    // 强制删除文件间隔时间(单位毫秒)
    private int destroyMapedFileIntervalForcibly = 1000 * 120;
    

    执行下面的源码:

    /**
     * 根据文件过期时间来删除物理队列文件
     */
    public int deleteExpiredFileByTime(//
            final long expiredTime, //
            final int deleteFilesInterval, //
            final long intervalForcibly,//
            final boolean cleanImmediately//
    ) {
        Object[] mfs = this.copyMapedFiles(0);
    
        if (null == mfs)
            return 0;
    
        // 最后一个文件处于写状态,不能删除
        int mfsLength = mfs.length - 1;
        int deleteCount = 0;
        List<MapedFile> files = new ArrayList<MapedFile>();
        if (null != mfs) {
            for (int i = 0; i < mfsLength; i++) {
                MapedFile mapedFile = (MapedFile) mfs[i];
                long liveMaxTimestamp = mapedFile.getLastModifiedTimestamp() + expiredTime;
                if (System.currentTimeMillis() >= liveMaxTimestamp//
                        || cleanImmediately) {
                    if (mapedFile.destroy(intervalForcibly)) {
                        files.add(mapedFile);
                        deleteCount++;
    
                        if (files.size() >= DeleteFilesBatchMax) {
                            break;
                        }
    
                        if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                            try {
                                Thread.sleep(deleteFilesInterval);
                            }
                            catch (InterruptedException e) {
                            }
                        }
                    }
                    else {
                        break;
                    }
                }
            }
        }
    
        deleteExpiredFile(files);
    
        return deleteCount;
    }
    

    可以看到是在for循环中用时间判断,或者是强制删除。但是删除的个数有限制,默认最多一次删除10个。
    然后等下次执行周期再删。

    copyMapedFiles时,用的读写锁,加一下读锁。

    相关文章

      网友评论

        本文标题:rocketmq定时清理commitlog文件源码分析

        本文链接:https://www.haomeiwen.com/subject/epgzixtx.html