系列
- RocketMq broker 配置文件
- RocketMq broker 启动流程
- RocketMq broker CommitLog介绍
- RocketMq broker consumeQueue介绍
- RocketMq broker 重试和死信队列
- RocketMq broker 延迟消息
- RocketMq IndexService介绍
- RocketMq 读写分离机制
- RocketMq Client管理
- RocketMq broker过期文件删除
开篇
-
RocketMQ操作CommitLog、ConsumeQueue文件是基于文件内存映射机制,并且在启动的时候会将所有的文件加载,为了避免内存与磁盘的浪费、能够让磁盘能够循环利用、避免因为磁盘不足导致消息无法写入等引入了文件过期删除机制。
-
这篇文章的主要目的是分析RocketMqbroker过期文件删除的逻辑。
-
commitLog的文件删除逻辑根据commitLog的MappedFile的最新写入时间和文件的过期时间进行比较,如果超过就会删除该文件。
-
consumeQueue的文件删除逻辑基于commitLog,基于commitLog的最早文件的最小物理偏移和consumeQueue的单个文件的最后偏移的物理偏移进行比较,如果consumeQueue最新的物理偏移小鱼comitLog的最早物理偏移那么该cosumeQueue文件就可以删除。
CommitLog删除
public class DefaultMessageStore implements MessageStore {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private final MessageStoreConfig messageStoreConfig;
// CommitLog
private final CommitLog commitLog;
private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;
private final FlushConsumeQueueService flushConsumeQueueService;
// 清除commitLog的服务CleanCommitLogService
private final CleanCommitLogService cleanCommitLogService;
// 清除consumeQueue的服务CleanConsumeQueueService
private final CleanConsumeQueueService cleanConsumeQueueService;
private void addScheduleTask() {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
// 默认是每隔10s时间执行cleanFiles操作
DefaultMessageStore.this.cleanFilesPeriodically();
}
// getCleanResourceInterval返回的是10s的时间
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
}
private void cleanFilesPeriodically() {
// 执行commitLog的clean操作
this.cleanCommitLogService.run();
// 执行consumeQueue的clean操作
this.cleanConsumeQueueService.run();
}
}
- CommitLog通过周期性执行cleanFilesPeriodically方法,默认以10s的频率去执行。
- cleanFilesPeriodically负责执行cleanCommitLogService和cleanConsumeQueueService。
- cleanCommitLogService负责清除CommitLog文件。
- cleanConsumeQueueService负责清除ConsumeQueue文件。
CleanCommitLogService
public class DefaultMessageStore implements MessageStore {
class CleanCommitLogService {
private final static int MAX_MANUAL_DELETE_FILE_TIMES = 20;
private final double diskSpaceWarningLevelRatio =
Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceWarningLevelRatio", "0.90"));
private final double diskSpaceCleanForciblyRatio =
Double.parseDouble(System.getProperty("rocketmq.broker.diskSpaceCleanForciblyRatio", "0.85"));
private long lastRedeleteTimestamp = 0;
private volatile int manualDeleteFileSeveralTimes = 0;
private volatile boolean cleanImmediately = false;
public void excuteDeleteFilesManualy() {
this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
DefaultMessageStore.log.info("executeDeleteFilesManually was invoked");
}
public void run() {
try {
// 1、尝试删除过期文件
this.deleteExpiredFiles();
// 2、重新尝试删除被线程hanged的文件
this.redeleteHangedFile();
} catch (Throwable e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
private void deleteExpiredFiles() {
int deleteCount = 0;
// fileReservedTime:文件过期时间,也就是从文件最后一次的更新时间到现在为止,如果超过该时间,则是过期文件可被删除
// fileReservedTime默认为72 hour
long fileReservedTime = DefaultMessageStore.this.getMessageStoreConfig().getFileReservedTime();
// deletePhysicFilesInterval:删除物理文件的时间间隔,在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除,
// 因此删除一个文件后需要间隔deletePhysicFilesInterval这个时间再删除另外一个文件,
// 我猜测可能是由于删除文件是一个非常耗费IO的操作,会引起消息插入消费的延迟(相比于正常情况下),所以不建议直接删除所有过期文件
// deletePhysicFilesInterval默认值为100
int deletePhysicFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteCommitLogFilesInterval();
// destroyMapedFileIntervalForcibly:在删除文件时,如果该文件还被线程引用,此时会阻止此次删除操作,
// 同时将该文件标记不可用并且纪录当前时间戳destroyMapedFileIntervalForcibly这个表示文件在第一次删除拒绝后,文件保存的最大时间,
// 在此时间内一直会被拒绝删除,当超过这个时间时,会将引用每次减少1000,直到引用 小于等于 0为止,即可删除该文件
// destroyMapedFileIntervalForcibly 默认为 1000 * 120;
int destroyMapedFileIntervalForcibly = DefaultMessageStore.this.getMessageStoreConfig().getDestroyMapedFileIntervalForcibly();
// 是否到了时间该删除文件,默认是为凌晨4点
// private String deleteWhen = "04";
boolean timeup = this.isTimeToDelete();
// 是否因为磁盘空间占用该删除文件
boolean spacefull = this.isSpaceToDelete();
// 是否因为手动删除指令该删除文件
boolean manualDelete = this.manualDeleteFileSeveralTimes > 0;
if (timeup || spacefull || manualDelete) {
if (manualDelete)
this.manualDeleteFileSeveralTimes--;
// 判断是否需要立即清理
boolean cleanAtOnce = DefaultMessageStore.this.getMessageStoreConfig().isCleanFileForciblyEnable() && this.cleanImmediately;
// fileReservedTime切换到ms级别,原为72 hour
fileReservedTime *= 60 * 60 * 1000;
// 执行删除过期文件 deleteExpiredFile
deleteCount = DefaultMessageStore.this.commitLog.deleteExpiredFile(fileReservedTime, deletePhysicFilesInterval,
destroyMapedFileIntervalForcibly, cleanAtOnce);
if (deleteCount > 0) {
} else if (spacefull) {
log.warn("disk space will be full soon, but delete file failed.");
}
}
}
// 按照配置的when进行分割然后进行判断逻辑
public static boolean isItTimeToDo(final String when) {
String[] whiles = when.split(";");
if (whiles.length > 0) {
Calendar now = Calendar.getInstance();
for (String w : whiles) {
int nowHour = Integer.parseInt(w);
if (nowHour == now.get(Calendar.HOUR_OF_DAY)) {
return true;
}
}
}
return false;
}
// 判断时间是否到期进行删除
private boolean isTimeToDelete() {
String when = DefaultMessageStore.this.getMessageStoreConfig().getDeleteWhen();
if (UtilAll.isItTimeToDo(when)) {
DefaultMessageStore.log.info("it's time to reclaim disk space, " + when);
return true;
}
return false;
}
// 判断磁盘使用率占比进行删除
private boolean isSpaceToDelete() {
// 默认返回75%
double ratio = DefaultMessageStore.this.getMessageStoreConfig().getDiskMaxUsedSpaceRatio() / 100.0;
cleanImmediately = false;
{
// 1、获取commitLog所在的路径
String storePathPhysic = DefaultMessageStore.this.getMessageStoreConfig().getStorePathCommitLog();
// 2、计算commitLog所在分区的磁盘空间占用比率
double physicRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathPhysic);
// diskSpaceWarningLevelRatio为90%
if (physicRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("physic disk maybe full soon " + physicRatio + ", so mark disk full");
}
cleanImmediately = true;
} else if (physicRatio > diskSpaceCleanForciblyRatio) {
// diskSpaceCleanForciblyRatio为85%
cleanImmediately = true;
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("physic disk space OK " + physicRatio + ", so mark disk ok");
}
}
if (physicRatio < 0 || physicRatio > ratio) {
DefaultMessageStore.log.info("physic disk maybe full soon, so reclaim space, " + physicRatio);
return true;
}
}
{
// 获取consumeQueue
String storePathLogics = StorePathConfigHelper
.getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
// 计算consumeQueue的磁盘占用率
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
if (logicsRatio > diskSpaceWarningLevelRatio) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
}
cleanImmediately = true;
} else if (logicsRatio > diskSpaceCleanForciblyRatio) {
cleanImmediately = true;
} else {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskOK();
if (!diskok) {
DefaultMessageStore.log.info("logics disk space OK " + logicsRatio + ", so mark disk ok");
}
}
if (logicsRatio < 0 || logicsRatio > ratio) {
DefaultMessageStore.log.info("logics disk maybe full soon, so reclaim space, " + logicsRatio);
return true;
}
}
return false;
}
}
}
- CleanCommitLogService会根据定时维度、磁盘占比、人工删除等三方面触发删除。
- 定时维度是指按照配置的时间节点触发文件删除。
- 磁盘占比是指磁盘使用率超过85%就会触发文件删除。
- 由人工触发删除,通过命令方法进行删除。
CommitLog
public class CommitLog {
protected final MappedFileQueue mappedFileQueue;
// expiredTime表示文件保留的天数
// deleteFilesInterval删除文件的间隔时间
// intervalForcibly线程被hanged的情况下强制时间
// cleanImmediately为是否立即清理,默认为true
public int deleteExpiredFile(
final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately
) {
return this.mappedFileQueue.deleteExpiredFileByTime(expiredTime, deleteFilesInterval, intervalForcibly, cleanImmediately);
}
- 执行CommitLog的deleteExpiredFile方法。
- 进而执行mappedFileQueue的deleteExpiredFileByTime方法。
MappedFileQueue
public class MappedFileQueue {
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
private Object[] copyMappedFiles(final int reservedMappedFiles) {
Object[] mfs;
if (this.mappedFiles.size() <= reservedMappedFiles) {
return null;
}
mfs = this.mappedFiles.toArray();
return mfs;
}
public int deleteExpiredFileByTime(final long expiredTime,
final int deleteFilesInterval,
final long intervalForcibly,
final boolean cleanImmediately) {
// 获取所有的mappedFiles
Object[] mfs = this.copyMappedFiles(0);
if (null == mfs)
return 0;
int mfsLength = mfs.length - 1;
int deleteCount = 0;
List<MappedFile> files = new ArrayList<MappedFile>();
if (null != mfs) {
// 遍历所有的MappedFile文件
for (int i = 0; i < mfsLength; i++) {
MappedFile mappedFile = (MappedFile) mfs[i];
// MappedFile的过期时间为日志的最后更新时间+过期时间
// 也就是说文件最后更新时间 + 72H的过期时间
long liveMaxTimestamp = mappedFile.getLastModifiedTimestamp() + expiredTime;
// 如果当前时间大于文件的过期时间 或者 需要立即更新操作
if (System.currentTimeMillis() >= liveMaxTimestamp || cleanImmediately) {
if (mappedFile.destroy(intervalForcibly)) {
files.add(mappedFile);
deleteCount++;
if (files.size() >= DELETE_FILES_BATCH_MAX) {
break;
}
// 每删除一个文件后需要等待deleteFilesInterval时间间隔
if (deleteFilesInterval > 0 && (i + 1) < mfsLength) {
try {
Thread.sleep(deleteFilesInterval);
} catch (InterruptedException e) {
}
}
} else {
break;
}
} else {
//avoid deleting files in the middle
break;
}
}
}
deleteExpiredFile(files);
return deleteCount;
}
void deleteExpiredFile(List<MappedFile> files) {
if (!files.isEmpty()) {
Iterator<MappedFile> iterator = files.iterator();
while (iterator.hasNext()) {
MappedFile cur = iterator.next();
if (!this.mappedFiles.contains(cur)) {
iterator.remove();
log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", cur.getFileName());
}
}
try {
if (!this.mappedFiles.removeAll(files)) {
log.error("deleteExpiredFile remove failed.");
}
} catch (Exception e) {
log.error("deleteExpiredFile has exception.", e);
}
}
}
}
- MappedFileQueue的deleteExpiredFileByTime负责判断mappedFile的最后更新时间+文件最大存储时间 和 当前时间进行判断,如果当前时间 > mappedFile的最后更新时间+文件最大存储时间,文件就可以进行删除。
- 文件删除包含磁盘物理文件的删除+逻辑mappedFiles的文件删除。
MappedFile
public class MappedFile extends ReferenceResource {
public boolean destroy(final long intervalForcibly) {
// intervalForcibly这个表示文件在第一次删除拒绝后,文件保存的最大时间
this.shutdown(intervalForcibly);
if (this.isCleanupOver()) {
try {
this.fileChannel.close();
long beginTime = System.currentTimeMillis();
// 文件删除
boolean result = this.file.delete();
} catch (Exception e) {
}
return true;
} else {
}
return false;
}
}
public abstract class ReferenceResource {
protected final AtomicLong refCount = new AtomicLong(1);
protected volatile boolean available = true;
protected volatile boolean cleanupOver = false;
private volatile long firstShutdownTimestamp = 0;
public void shutdown(final long intervalForcibly) {
if (this.available) {
this.available = false;
this.firstShutdownTimestamp = System.currentTimeMillis();
this.release();
} else if (this.getRefCount() > 0) {
if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
this.refCount.set(-1000 - this.getRefCount());
this.release();
}
}
}
public void release() {
// 只有当refCount 小于0的情况才能执行release
long value = this.refCount.decrementAndGet();
if (value > 0)
return;
synchronized (this) {
this.cleanupOver = this.cleanup(value);
}
}
}
- MappedFile的文件删除包括fileChannel的close和file的delete删除。
CleanConsumeQueueService
public class DefaultMessageStore implements MessageStore {
class CleanConsumeQueueService {
private long lastPhysicalMinOffset = 0;
public void run() {
try {
this.deleteExpiredFiles();
} catch (Throwable e) {
DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);
}
}
private void deleteExpiredFiles() {
int deleteLogicsFilesInterval = DefaultMessageStore.this.getMessageStoreConfig().getDeleteConsumeQueueFilesInterval();
// 获取commitLog的最小的offset
long minOffset = DefaultMessageStore.this.commitLog.getMinOffset();
if (minOffset > this.lastPhysicalMinOffset) {
this.lastPhysicalMinOffset = minOffset;
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;
// 遍历所有的consumeQueueTable逐个进行删除
for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {
for (ConsumeQueue logic : maps.values()) {
// 针对单个ConsumeQueue依据minOffset进行删除
int deleteCount = logic.deleteExpiredFile(minOffset);
if (deleteCount > 0 && deleteLogicsFilesInterval > 0) {
try {
Thread.sleep(deleteLogicsFilesInterval);
} catch (InterruptedException ignored) {
}
}
}
}
DefaultMessageStore.this.indexService.deleteExpiredFile(minOffset);
}
}
}
}
- CleanConsumeQueueService通过deleteExpiredFiles执行consumeQueue进行删除。
- deleteExpiredFiles会获取最早的commitLog的物理偏移量lastPhysicalMinOffset,然后根据lastPhysicalMinOffset去遍历所有的consumeQueueTable挨个进行删除。
- 最终会根据lastPhysicalMinOffset去执行consumeQueue的deleteExpiredFile。
- consumeQueue底层实际上是mappedFileQueue的文件实现。
ConsumeQueue
public class ConsumeQueue {
public int deleteExpiredFile(long offset) {
int cnt = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
this.correctMinOffset(offset);
return cnt;
}
}
- 执行mappedFileQueue的deleteExpiredFileByOffset。
MappedFileQueue
public class MappedFileQueue {
public int deleteExpiredFileByOffset(long offset, int unitSize) {
Object[] mfs = this.copyMappedFiles(0);
List<MappedFile> files = new ArrayList<MappedFile>();
int deleteCount = 0;
if (null != mfs) {
int mfsLength = mfs.length - 1;
// 遍历所有的consumeQueue的MappedFile
for (int i = 0; i < mfsLength; i++) {
boolean destroy;
MappedFile mappedFile = (MappedFile) mfs[i];
// 获取consumeQueue的MappedFile的最新的20个字节
// 比较该consumeQueue的最新的逻辑位移和最小待删除的物理偏移
SelectMappedBufferResult result = mappedFile.selectMappedBuffer(this.mappedFileSize - unitSize);
if (result != null) {
long maxOffsetInLogicQueue = result.getByteBuffer().getLong();
result.release();
// 如果最大的maxOffsetInLogicQueue还是小于最小偏移量就可以进行删除
destroy = maxOffsetInLogicQueue < offset;
} else if (!mappedFile.isAvailable()) { // Handle hanged file.
destroy = true;
} else {
break;
}
// 针对需要删除的文件,执行mappedFile.destroy
if (destroy && mappedFile.destroy(1000 * 60)) {
files.add(mappedFile);
deleteCount++;
} else {
break;
}
}
}
// 逻辑删除mappedFile
deleteExpiredFile(files);
return deleteCount;
}
}
- MappedFileQueue的deleteExpiredFileByOffset的核心逻辑在于比较MappedFile的最新的偏移量和待删除的commitLog最小的物理偏移量进行比较。
- 如果consumeQueue的MappedFileQueue的最大逻辑位移小于commitLog的最小位移,那么就可以删除该consumeQueue文件。
网友评论