Expired File Deletion in the RocketMQ Broker

Author: 晴天哥_王志 | Published: 2020-06-14 21:26

    Introduction

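    • RocketMQ operates on CommitLog and ConsumeQueue files through memory-mapped files and loads all of them at startup. To avoid wasting memory and disk space, to let disk space be reused, and to prevent message writes from failing when the disk fills up, it provides an expired-file deletion mechanism.

    • This article analyzes the logic by which the RocketMQ broker deletes expired files.

    • CommitLog deletion compares the last-modified time of each CommitLog MappedFile plus the file retention time against the current time; once the current time reaches that sum, the file is deleted.

    • ConsumeQueue deletion is driven by the CommitLog: the minimum physical offset of the oldest CommitLog file is compared with the physical offset recorded in the last entry of each ConsumeQueue file. If that last physical offset is smaller than the CommitLog's minimum physical offset, the ConsumeQueue file can be deleted.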

    CommitLog Deletion

    public class DefaultMessageStore implements MessageStore {
        private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    
        private final MessageStoreConfig messageStoreConfig;
        // CommitLog
        private final CommitLog commitLog;
    
        private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
    
        private final FlushConsumeQueueService flushConsumeQueueService;
        // Service that cleans up commitLog files: CleanCommitLogService
        private final CleanCommitLogService cleanCommitLogService;
        // Service that cleans up consumeQueue files: CleanConsumeQueueService
        private final CleanConsumeQueueService cleanConsumeQueueService;
    
        private void addScheduleTask() {
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    // By default, cleanFilesPeriodically runs every 10s
                    DefaultMessageStore.this.cleanFilesPeriodically();
                }
                // getCleanResourceInterval returns 10s by default; the first run is delayed by 60s
            }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
    
        }
    
        private void cleanFilesPeriodically() {
            // Clean up commitLog files
            this.cleanCommitLogService.run();
            // Clean up consumeQueue files
            this.cleanConsumeQueueService.run();
        }
    }
    
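    • DefaultMessageStore runs cleanFilesPeriodically on a schedule: the first run happens 60s after the task is scheduled, then every 10s by default.
    • cleanFilesPeriodically runs cleanCommitLogService and then cleanConsumeQueueService.
    • cleanCommitLogService deletes expired CommitLog files.
    • cleanConsumeQueueService deletes expired ConsumeQueue files.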
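
    The scheduling pattern in addScheduleTask can be tried in isolation with the sketch below. It is not RocketMQ code: the class CleanScheduleSketch, the method runCleanCycles, and the string labels are hypothetical names for illustration, and the delays are parameters so that the example finishes quickly instead of waiting 60s and 10s.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the periodic clean task; not part of RocketMQ.
final class CleanScheduleSketch {

    // Runs `cycles` clean cycles on a fixed-rate schedule and returns the
    // order in which the two (simulated) clean services were invoked.
    static List<String> runCleanCycles(int cycles, long initialDelayMs, long periodMs) {
        List<String> calls = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch remaining = new CountDownLatch(cycles);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

        scheduler.scheduleAtFixedRate(() -> {
            if (remaining.getCount() == 0) {
                return; // the requested number of cycles has already run
            }
            calls.add("commitLog");     // stands in for cleanCommitLogService.run()
            calls.add("consumeQueue");  // stands in for cleanConsumeQueueService.run()
            remaining.countDown();
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);

        try {
            remaining.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for clean cycles", e);
        } finally {
            scheduler.shutdownNow();
        }
        return new ArrayList<>(calls);
    }
}
```

    Because a single-threaded scheduler is used, each cycle always cleans the commitLog before the consumeQueue, which matches the order in cleanFilesPeriodically.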

    CleanCommitLogService

    public class DefaultMessageStore implements MessageStore {
    
        class CleanCommitLogService {
    
            private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
            private final double diskSpaceWarningLevelRatio =
                Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
    
            private final double diskSpaceCleanForciblyRatio =
                Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
            private long lastRedeleteTimestamp = 0;
    
            private volatile int manualDeleteFileSeveralTimes = 0;
            private volatile boolean cleanImmediately = false;
    
            public void excuteDeleteFilesManualy() {
                this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
                DefaultMessageStore.log.info("executeDeleteFilesManually was invoked");
            }
    
            public void run() {
                try {
                    // 1. Try to delete expired files
                    this.deleteExpiredFiles();
                    // 2. Retry deleting files whose deletion was blocked because a thread still holds them
                    this.redeleteHangedFile();
                } catch (Throwable e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }
    
            private void deleteExpiredFiles() {
                int deleteCount = 0;
    
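                // fileReservedTime: the file retention time. A file whose last modification is older than
                // this is expired and may be deleted.
                // fileReservedTime defaults to 72 hours
                long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
    
                // deletePhysicFilesInterval: the pause between deleting two physical files. A single run of the
                // scheduled task may find several expired files, so after deleting one file the thread waits
                // this long before deleting the next.
                // Presumably this is because deleting a file is I/O-intensive and would delay message writes
                // and consumption, so deleting all expired files at once is avoided.
                // deletePhysicFilesInterval defaults to 100 (ms)
                int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
    
                // destroyMapedFileIntervalForcibly: if a file is still referenced by a thread when deletion is
                // attempted, the deletion is refused, the file is marked unavailable, and the current timestamp
                // is recorded. This value is the maximum time the file may survive after that first refusal.
                // Deletion keeps being refused within this window; once it has elapsed, the reference count is
                // forced to a negative value (-1000 - refCount) so that the file can be released and deleted.
                // destroyMapedFileIntervalForcibly defaults to 1000 * 120 (ms)
                int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();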
    
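                // Whether the scheduled deletion time has arrived; the default is 4 a.m.
                //  private String deleteWhen = "04";
                boolean timeup = this.isTimeToDelete();
    
                // Whether files should be deleted because of disk space usage
                boolean spacefull = this.isSpaceToDelete();
    
                // Whether files should be deleted because of a manual delete command
                boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;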
    
                if (timeup || spacefull || manualDelete) {
    
                    if (manualDelete)
                        this.manualDeleteFileSeveralTimes--;
    
                    // Decide whether to clean immediately
                    boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
    
                    // Convert fileReservedTime from hours (72 by default) to milliseconds
                    fileReservedTime *= 60 * 60 * 1000;
    
                    // Delete the expired files via deleteExpiredFile
                    deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
                        destroyMapedFileIntervalForcibly, cleanAtOnce);
    
                    if (deleteCount > 0) {
                    } else if (spacefull) {
                        log.warn("disk space will be full soon, but delete file failed.");
                    }
                }
            }
    
            // Defined in UtilAll: splits the configured `when` on ';' and returns true if the current hour matches any entry
            public static boolean isItTimeToDo(final String when) {
                String[] whiles = when.split(";");
                if (whiles.length > 0) {
                    Calendar now = Calendar.getInstance();
                    for (String w : whiles) {
                        int nowHour = Integer.parseInt(w);
                        if (nowHour == now.get(Calendar.HOUR_OF_DAY)) {
                            return true;
                        }
                    }
                }
    
                return false;
            }
    
    
            // Check whether the configured deletion hour has arrived
            private boolean isTimeToDelete() {
                String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
                if (UtilAll.isItTimeToDo(when)) {
                    DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
                    return true;
                }
    
                return false;
            }
    
            // Check whether disk usage requires deletion
            private boolean isSpaceToDelete() {
                // 75% by default
                double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
    
                cleanImmediately = false;
    
                {
                    // 1. Get the path where the commitLog is stored
                    String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
                    // 2. Compute the used-space ratio of the partition holding the commitLog
                    double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
                    // diskSpaceWarningLevelRatio is 90%
                    if (physicRatio > diskSpaceWarningLevelRatio) {
                        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
                        if (diskok) {
                            DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
                        }
    
                        cleanImmediately = true;
                    } else if (physicRatio > diskSpaceCleanForciblyRatio) {
                        // diskSpaceCleanForciblyRatio is 85%
                        cleanImmediately = true;
                    } else {
                        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
                        if (!diskok) {
                            DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
                        }
                    }
    
                    if (physicRatio < 0 || physicRatio > ratio) {
                        DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
                        return true;
                    }
                }
    
                {
                    // Get the path where the consumeQueue is stored
                    String storePathLogics = StorePathConfigHelper
                        .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
                    // Compute the used-space ratio of the partition holding the consumeQueue
                    double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
                    if (logicsRatio > diskSpaceWarningLevelRatio) {
                        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
                        if (diskok) {
                            DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
                        }
    
                        cleanImmediately = true;
                    } else if (logicsRatio > diskSpaceCleanForciblyRatio) {
                        cleanImmediately = true;
                    } else {
                        boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
                        if (!diskok) {
                            DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok");
                        }
                    }
    
                    if (logicsRatio < 0 || logicsRatio > ratio) {
                        DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio);
                        return true;
                    }
                }
    
                return false;
            }
        }
    }
    
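    • CleanCommitLogService triggers deletion in three ways: on a schedule, by disk usage, and manually.
    • Scheduled: deletion is triggered at the configured hour(s) of the day (deleteWhen, "04" by default).
    • Disk usage: deletion is triggered when disk usage exceeds diskMaxUsedSpaceRatio (75% by default). Above 85%, cleanImmediately is set so that files can be removed even if they have not expired; above 90%, the disk is additionally marked as full.
    • Manual: deletion is triggered through a command, which calls excuteDeleteFilesManualy.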
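
    The three disk-usage thresholds and the hour check are easy to confuse, so here is a minimal sketch of the decisions only. The class DiskCleanDecisionSketch and its method names are hypothetical; the constants mirror the defaults quoted in the code above (75%, 85%, 90%), and the current hour is passed in as a parameter rather than read from Calendar so that the logic can be exercised deterministically.

```java
// Hypothetical helper that mirrors the threshold checks in isSpaceToDelete
// and the hour matching in isItTimeToDo; not part of RocketMQ.
final class DiskCleanDecisionSketch {
    static final double MAX_USED_RATIO = 0.75;       // diskMaxUsedSpaceRatio default / 100.0
    static final double CLEAN_FORCIBLY_RATIO = 0.85; // diskSpaceCleanForciblyRatio default
    static final double WARNING_RATIO = 0.90;        // diskSpaceWarningLevelRatio default

    // Deletion is triggered once usage exceeds the max-used ratio
    // (a negative ratio means the usage could not be determined).
    static boolean shouldReclaim(double usedRatio) {
        return usedRatio < 0 || usedRatio > MAX_USED_RATIO;
    }

    // Above the forcible-clean ratio, files may be removed before they expire.
    static boolean cleanImmediately(double usedRatio) {
        return usedRatio > CLEAN_FORCIBLY_RATIO;
    }

    // Above the warning ratio, the disk is also flagged as full.
    static boolean markDiskFull(double usedRatio) {
        return usedRatio > WARNING_RATIO;
    }

    // `when` is a ';'-separated list of hours, e.g. "04" or "04;16".
    static boolean isHourListed(String when, int currentHour) {
        for (String w : when.split(";")) {
            if (Integer.parseInt(w) == currentHour) {
                return true;
            }
        }
        return false;
    }
}
```

    For example, at 80% usage deletion is triggered but only expired files are removed, whereas at 87% unexpired files become eligible as well, provided cleanFileForciblyEnable is on.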

    CommitLog

    public class CommitLog {
    
        protected final MappedFileQueue mappedFileQueue;
    
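        // expiredTime: the file retention time, in milliseconds
        // deleteFilesInterval: the pause between deleting two files
        // intervalForcibly: how long a file still held by a thread may survive before it is forcibly destroyed
        // cleanImmediately: whether to delete immediately even if files have not expired (cleanAtOnce from the caller)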
        public int deleteExpiredFile(
            final long expiredTime,
            final int deleteFilesInterval,
            final long intervalForcibly,
            final boolean cleanImmediately
        ) {
            return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
        }
    }
    
    • CommitLog's deleteExpiredFile method is invoked.
    • It in turn calls deleteExpiredFileByTime on mappedFileQueue.

    MappedFileQueue

    public class MappedFileQueue {
    
        private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
    
        private Object[] copyMappedFiles(final int reservedMappedFiles) {
            Object[] mfs;
    
            if (this.mappedFiles.size() <= reservedMappedFiles) {
                return null;
            }
    
            mfs = this.mappedFiles.toArray();
            return mfs;
        }
    
        public int deleteExpiredFileByTime(final long expiredTime,
            final int deleteFilesInterval,
            final long intervalForcibly,
            final boolean cleanImmediately) {
            // Take a snapshot of all mappedFiles
            Object[] mfs = this.copyMappedFiles(0);
    
            if (null == mfs)
                return 0;
    
            int mfsLength = mfs.length - 1;
            int deleteCount = 0;
            List<MappedFile> files = new ArrayList<MappedFile>();
            if (null != mfs) {
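                // Iterate over the MappedFiles, excluding the last one (mfsLength is mfs.length - 1)
                for (int i = 0; i < mfsLength; i++) {
                    MappedFile mappedFile = (MappedFile) mfs[i];
                    // The expiry timestamp of a MappedFile is its last-modified time plus the retention time,
                    // i.e. last-modified time + 72h by default
                    long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
                    // Delete if the current time has reached the expiry timestamp, or if immediate cleaning is requested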
                    if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
                        if (mappedFile.destroy(intervalForcibly)) {
                            files.add(mappedFile);
                            deleteCount++;
    
                            if (files.size() >= DELETE_FILES_BATCH_MAX) {
                                break;
                            }
                            // After each deleted file, wait deleteFilesInterval before deleting the next
                            if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
                                try {
                                    Thread.sleep(deleteFilesInterval);
                                } catch (InterruptedException e) {
                                }
                            }
                        } else {
                            break;
                        }
                    } else {
                        //avoid deleting files in the middle
                        break;
                    }
                }
            }
    
            deleteExpiredFile(files);
    
            return deleteCount;
        }
    
        void deleteExpiredFile(List<MappedFile> files) {
    
            if (!files.isEmpty()) {
    
                Iterator<MappedFile> iterator = files.iterator();
                while (iterator.hasNext()) {
                    MappedFile cur = iterator.next();
                    if (!this.mappedFiles.contains(cur)) {
                        iterator.remove();
                        log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", cur.getFileName());
                    }
                }
    
                try {
                    if (!this.mappedFiles.removeAll(files)) {
                        log.error("deleteExpiredFile remove failed.");
                    }
                } catch (Exception e) {
                    log.error("deleteExpiredFile has exception.", e);
                }
            }
        }
    }
    
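    • deleteExpiredFileByTime in MappedFileQueue compares each mappedFile's last-modified time plus the retention time against the current time. If the current time >= last-modified time + retention time, the file can be deleted. Scanning stops at the first file that cannot be deleted, and the last (newest) file is never considered.
    • Deletion consists of removing the physical file from disk and removing the MappedFile from the in-memory mappedFiles list.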
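
    The scan order matters here, so the selection rule is sketched below on plain timestamps. This is a simplified illustration under stated assumptions: ExpiryByTimeSketch, countDeletable, and hoursToMillis are hypothetical names, every destroy call is assumed to succeed, and the DELETE_FILES_BATCH_MAX cap and the sleep between deletions are left out.

```java
import java.util.List;

// Hypothetical model of the selection rule in deleteExpiredFileByTime;
// each list element is the last-modified timestamp (ms) of one file, oldest first.
final class ExpiryByTimeSketch {

    // Converts the retention time from hours to milliseconds (72h -> 259200000 ms).
    static long hoursToMillis(long hours) {
        return hours * 60 * 60 * 1000;
    }

    // Counts how many files at the head of the queue would be deleted.
    static int countDeletable(List<Long> lastModified, long now, long reservedMs,
                              boolean cleanImmediately) {
        int deletable = 0;
        // The last file is the one currently being written, so it is never scanned.
        for (int i = 0; i < lastModified.size() - 1; i++) {
            boolean expired = now >= lastModified.get(i) + reservedMs;
            if (expired || cleanImmediately) {
                deletable++;
            } else {
                break; // avoid deleting files in the middle of the queue
            }
        }
        return deletable;
    }
}
```

    Stopping at the first unexpired file keeps the queue contiguous: only a prefix of the oldest files is ever removed.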

    MappedFile

    public class MappedFile extends ReferenceResource {
    
        public boolean destroy(final long intervalForcibly) {
            // intervalForcibly: the maximum time the file may survive after the first refused deletion
            this.shutdown(intervalForcibly);
    
            if (this.isCleanupOver()) {
                try {
                    this.fileChannel.close();
                    long beginTime = System.currentTimeMillis();
                    // Delete the file from disk
                    boolean result = this.file.delete();
                } catch (Exception e) {
                }
    
                return true;
            } else {
            }
    
            return false;
        }
    }
    
    
    public abstract class ReferenceResource {
        protected final AtomicLong refCount = new AtomicLong(1);
        protected volatile boolean available = true;
        protected volatile boolean cleanupOver = false;
        private volatile long firstShutdownTimestamp = 0;
    
        public void shutdown(final long intervalForcibly) {
            if (this.available) {
                this.available = false;
                this.firstShutdownTimestamp = System.currentTimeMillis();
                this.release();
            } else if (this.getRefCount() > 0) {
                if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
                    this.refCount.set(-1000 - this.getRefCount());
                    this.release();
                }
            }
        }
    
        public void release() {
            // cleanup only runs once refCount has dropped to 0 or below
            long value = this.refCount.decrementAndGet();
            if (value > 0)
                return;
    
            synchronized (this) {
    
                this.cleanupOver = this.cleanup(value);
            }
        }
    }
    
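    • Destroying a MappedFile closes its fileChannel and deletes the underlying file, but only after shutdown has released all references (isCleanupOver returns true).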
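
    The interplay between shutdown, release, and the forced-destroy interval is easier to follow on a stripped-down model. The sketch below is an assumption-laden simplification, not the RocketMQ class: RefCountedFileSketch is a hypothetical name, the clock is passed in as a parameter instead of calling System.currentTimeMillis(), and cleanup is reduced to setting a flag.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of ReferenceResource; cleanup() is reduced to a flag.
final class RefCountedFileSketch {
    private final AtomicLong refCount = new AtomicLong(1); // 1 = the owner's own reference
    private volatile boolean available = true;
    private volatile boolean cleanupOver = false;
    private volatile long firstShutdownTimestamp = 0;

    // A reader takes a reference; this fails once shutdown has been requested.
    synchronized boolean hold() {
        if (available) {
            if (refCount.getAndIncrement() > 0) {
                return true;
            }
            refCount.getAndDecrement();
        }
        return false;
    }

    // `now` is injected so that the timing can be tested deterministically.
    void shutdown(long now, long intervalForcibly) {
        if (available) {
            available = false;             // first attempt: stop new readers
            firstShutdownTimestamp = now;
            release();                     // drop the owner's reference
        } else if (refCount.get() > 0 && now - firstShutdownTimestamp >= intervalForcibly) {
            refCount.set(-1000 - refCount.get()); // force the count negative
            release();
        }
    }

    void release() {
        if (refCount.decrementAndGet() > 0) {
            return;                        // still referenced, nothing to clean up
        }
        synchronized (this) {
            cleanupOver = true;            // stands in for cleanup(value)
        }
    }

    boolean isCleanupOver() {
        return refCount.get() <= 0 && cleanupOver;
    }
}
```

    With one outstanding reader, the first shutdown only marks the file unavailable; later calls are no-ops until intervalForcibly has elapsed, at which point the count is forced negative and cleanup proceeds regardless of the reader.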

    CleanConsumeQueueService

    public class DefaultMessageStore implements MessageStore {
    
        class CleanConsumeQueueService {
            private long lastPhysicalMinOffset = 0;
    
            public void run() {
                try {
                    this.deleteExpiredFiles();
                } catch (Throwable e) {
                    DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
                }
            }
    
            private void deleteExpiredFiles() {
                int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
    
                // Get the minimum offset of the commitLog
                long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    
                if (minOffset > this.lastPhysicalMinOffset) {
                    this.lastPhysicalMinOffset = minOffset;
    
                    ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
    
                    // Walk through consumeQueueTable and delete from each queue in turn
                    for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
                        for (ConsumeQueue logic : maps.values()) {
                            // Delete expired files of this ConsumeQueue based on minOffset
                            int deleteCount = logic.deleteExpiredFile(minOffset);
    
                            if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
                                try {
                                    Thread.sleep(deleteLogicsFilesInterval);
                                } catch (InterruptedException ignored) {
                                }
                            }
                        }
                    }
    
                    DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
                }
            }
        }
    }
    
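    • CleanConsumeQueueService deletes consumeQueue files through deleteExpiredFiles.
    • deleteExpiredFiles obtains the commitLog's minimum physical offset (minOffset, cached in lastPhysicalMinOffset) and, only if it has advanced since the last run, walks through every entry in consumeQueueTable and deletes from each queue in turn.
    • Each consumeQueue's deleteExpiredFile is invoked with that minimum offset; afterwards the index files are cleaned with the same offset.
    • Under the hood, a consumeQueue is also backed by a mappedFileQueue.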

    ConsumeQueue

    public class ConsumeQueue {
    
        public int deleteExpiredFile(long offset) {
            int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
            this.correctMinOffset(offset);
            return cnt;
        }
    }
    
    • This delegates to deleteExpiredFileByOffset in mappedFileQueue.

    MappedFileQueue

    public class MappedFileQueue {
    
        public int deleteExpiredFileByOffset(long offset, int unitSize) {
            Object[] mfs = this.copyMappedFiles(0);
    
            List<MappedFile> files = new ArrayList<MappedFile>();
            int deleteCount = 0;
            if (null != mfs) {
    
                int mfsLength = mfs.length - 1;
                // Iterate over the consumeQueue's MappedFiles, excluding the last one
                for (int i = 0; i < mfsLength; i++) {
                    boolean destroy;
                    MappedFile mappedFile = (MappedFile) mfs[i];
    
                    // Read the last 20 bytes (the last entry) of this consumeQueue MappedFile
                    // and compare the commitLog physical offset stored there with the commitLog's minimum offset
                    SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
                    if (result != null) {
                        long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
                        result.release();
                        // If even the largest offset in this file is below the minimum offset, the file can be deleted
                        destroy = maxOffsetInLogicQueue < offset;
    
                    } else if (!mappedFile.isAvailable()) { // Handle hanged file.
                        destroy = true;
                    } else {
                        break;
                    }
                    // Destroy the files that need to be deleted via mappedFile.destroy
                    if (destroy && mappedFile.destroy(1000 * 60)) {
                        files.add(mappedFile);
                        deleteCount++;
                    } else {
                        break;
                    }
                }
            }
            // Remove the deleted files from the in-memory mappedFiles list
            deleteExpiredFile(files);
    
            return deleteCount;
        }
    }
    
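    • The core of deleteExpiredFileByOffset in MappedFileQueue is comparing the commitLog physical offset stored in the last entry of each MappedFile with the commitLog's minimum physical offset.
    • If the largest physical offset recorded in a consumeQueue file is smaller than the commitLog's minimum offset, every message it indexes has already been deleted, so the consumeQueue file can be deleted.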
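
    The offset comparison can be illustrated on an in-memory buffer. The sketch below assumes the usual 20-byte consumeQueue entry layout (8-byte commitLog offset, 4-byte message size, 8-byte tag hash code); the class ConsumeQueueExpirySketch and its helper methods are hypothetical and use a heap ByteBuffer in place of a real mapped file.

```java
import java.nio.ByteBuffer;

// Hypothetical in-memory model of one consumeQueue file; not part of RocketMQ.
final class ConsumeQueueExpirySketch {
    static final int CQ_STORE_UNIT_SIZE = 20; // 8 (offset) + 4 (size) + 8 (tags code)

    // Builds a "file" that holds one entry per given commitLog offset.
    static ByteBuffer fileWithOffsets(long... commitLogOffsets) {
        ByteBuffer file = ByteBuffer.allocate(commitLogOffsets.length * CQ_STORE_UNIT_SIZE);
        for (long offset : commitLogOffsets) {
            file.putLong(offset); // commitLog physical offset
            file.putInt(100);     // message size (arbitrary for this sketch)
            file.putLong(0L);     // tags hash code (arbitrary for this sketch)
        }
        return file;
    }

    // A file is expired when the commitLog offset in its last entry
    // is below the commitLog's current minimum offset.
    static boolean isExpired(ByteBuffer file, long commitLogMinOffset) {
        int lastEntryPosition = file.capacity() - CQ_STORE_UNIT_SIZE;
        long lastPhysicalOffset = file.getLong(lastEntryPosition); // absolute read
        return lastPhysicalOffset < commitLogMinOffset;
    }
}
```

    Only the last entry needs to be read because entries are appended in commitLog order, so the last entry holds the largest physical offset in the file.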
