美文网首页
RocketMQ消息引擎关于CommitLog浅析

RocketMQ消息引擎关于CommitLog浅析

作者: 丑人林宗己 | 来源:发表于2019-06-23 22:53 被阅读0次

前文有简单的提到RocketMQ的底层文件存储模型,基于该存储模型之上再简单的探索一下 CommitLog的一个底层设计,思考RocketMQ如何做到高性能?

对于RoceketMQ而言,所有的消息最终都需要被持久化到CommitLog文件中。

image.png

如上图所示,可以很粗浅的理解为,CommitLog描述的是整个CommitLog目录,而MappedFileQueue描述的则是CommitLog File数组容器,而MappedFile描述一个CommitLog File

CommitLog

// commitlog构造器
public CommitLog(final DefaultMessageStore defaultMessageStore) {
    this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
        defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());// 创建mapperFileQueue
    this.defaultMessageStore = defaultMessageStore;
    // 刷盘对象线程
    if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        this.flushCommitLogService = new GroupCommitService();
    } else {
        this.flushCommitLogService = new FlushRealTimeService();
    }
    // 提交
    this.commitLogService = new CommitRealTimeService();
    // append消息回调(描述的是将消息在文件末尾不断的append上去)
    this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
    batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
        @Override
        protected MessageExtBatchEncoder initialValue() {
            return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
        }
    };
    // 消息写入锁
    this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
}

MappedFileQueue#load()

加载CommitLog目录下的文件


public boolean load() {
    File dir = new File(this.storePath);
    File[] files = dir.listFiles();
    if (files != null) {
        // ascending order
        Arrays.sort(files);
        for (File file : files) {

            if (file.length() != this.mappedFileSize) {// mappedFileSize默认1G
                log.warn(file + "\t" + file.length()
                    + " length not matched message store config value, please check it manually");
                return false;// 即只加载大小为1G的文件
            }

            try {
                MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);

                mappedFile.setWrotePosition(this.mappedFileSize);// 已写位置
                mappedFile.setFlushedPosition(this.mappedFileSize);// 设置已刷盘位置
                mappedFile.setCommittedPosition(this.mappedFileSize);//设置已提交位置
                this.mappedFiles.add(mappedFile);
                log.info("load " + file.getPath() + " OK");
            } catch (IOException e) {
                log.error("load file " + file + " error", e);
                return false;
            }
        }
    }

    return true;
}

初始化加载数据时,只加载了1G文件,低于1G的文件不做加载处理,那么当要写入时一定要找到一个最新的文件,或者新建一个文件-->MappedFileQueue#getLastMappedFile()

String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
    + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;

if (this.allocateMappedFileService != null) {
    mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
        nextNextFilePath, this.mappedFileSize);;
} else {
    try {
        mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
    } catch (IOException e) {
        log.error("create mappedFile exception", e);
    }
}

由代码可知,创建MappedFile时,传入的参数包括下一个文件路径,以及下下个文件路径,而创建的程序则如AllocateMappedFileService#putRequestAndReturnMappedFile()

创建的方式也特别有意思,通过将创建文件的参数封装为一个AllocateRequest对象并放入阻塞队列中,通过另外线程不断从队列中取出请求并完成创建。

不仅创建了当前的文件,还可以把下一个文件创建好,达到异步预创建的目的,减少了创建文件时的时间,进而可以提供系统的吞吐量。

不仅如此,除了预创建Commitlog File之外,从源码mmapOperation()方法中可以看到一个方法叫做MappedFile#warmMappedFile(),字面理解为预热。为什么要做文件预热呢?自然是为了提高读写性能,提升系统的吞吐量,个人认为消息队列最核心的问题应该是消息的堆积能力系统吞吐量,当然前提是抛开高可用等问题,毕竟本身消息队列的作为异步解耦,削峰填谷的核心诉求注定了业务上允许一定的时延。

为了提高IO读写的性能,RocketMQ都做了什么?

MappedFile#init()


private void init(final String fileName, final int fileSize) throws IOException {
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;

    ensureDirOK(this.file.getParent());

    try {
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);// 内存映射手段
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    } catch (FileNotFoundException e) {
        log.error("create file channel " + this.fileName + " Failed. ", e);
        throw e;
    } catch (IOException e) {
        log.error("map file " + this.fileName + " Failed. ", e);
        throw e;
    } finally {
        if (!ok && this.fileChannel != null) {
            this.fileChannel.close();
        }
    }
}

内存映射可以在一定条件下提高IO读写效率,但是不见得是必备良药。之前在网上看过一篇文章对Java的击中IO操作api进行可对比:传送门

内存映射本质上是通过将进程使用的虚拟内存地址映射到物理地址上,以此提高IO的读写。直接对磁盘IO的读写性能非常差,比起内存的读写简直是差之千里,而内存映射可以让IO的读写近乎对内存的读写。比肩内存的读写要求是数据必须命中pageCache,那么在pageCache层面,RocketMQ由做了什么优化呢?查阅源码可以确定的时,RocketMQ使用了两个较为底层的方法mlockmadvise。这两个方法的目的是要做什么?锁住内存,以及内存预热。

内存锁定

linux系统为了优化IO读写的效率与速度,引入了一种内存机制(物理内存),即数据从磁盘到内存的复制过程由内核实现,而实现的基础则是pageCache,pageCache的大小默认是4kb。关于pageCache的内容很多,笔者对此也了解较浅,故不做赘述,后续深入了解后再补充。

物理内存是有操作系统级别控制,当运行的Java 进程结束后,物理内存也不会理解释放,该问题进一步导致在Linux系统中程序频繁读写文件后,可用物理内存变少。当系统的物理内存不够用的时,就需要将物理内存中的一部分空间释放出来,以供当前运行的程序使用。那些被释放的空间可能来自一些很长时间没有什么操作的程序,这些被释放的空间被临时保存到Swap空间中,等到那些程序要运行时,再从Swap分区中恢复保存的数据到内存中。这样,系统总是在物理内存不够时,才进行Swap交换。为了减少系统级别的Swap交换,RocketMQ通过使用mlock来锁定内存。

mlock的作用如下:

  • 被锁定的物理内存在被解锁或进程退出前,不会被页回收流程处理。
  • 被锁定的物理内存,不会被交换到swap分区设备。
  • 进程执行mlock操作时,内核会立刻分配物理内存(注意COW的情况)。

内存预热

日常中使用缓存来解决系统的性能问题,减少对底层数据库的直接读写,降低数据库的读压力,这个过程在操作系统IO读写亦是同样的道理。pageCache可以理解为系统缓存,而内存预热的目的则是建议操作系统预先将文件内容加载至pageCache,当读取数据时会优先判断是否命中pageCache,如果无法命中则会抛出一次缺页中断,直接从磁盘读取,一次降低了IO吞吐量。

madivse函数的意义是建议操作系统加载数据至pageCache中,方法参数:int madvise(void *addr 、长度 size_t , int 建议),如下提供两个常见的建议

  • madv_willneed
    预计在不久的将来访问(因此,可能最好已阅读一些页面 .)
  • madv_dontneed
    不要期待在不久的将来访问(用的时间.用给定的范围后,使内核可以释放与它关联的资源.)在此范围内的页的后续访问都将成功,但从基础会在重新装入存储器内容的映射文件(看到mmap(2))在没有基本映射的页面请求或零填充。

RocketMQ在创建文件时正是使用了madv_willneed,由于文件创建的方式由异步线程完成,故而内存预热对于当前的IO读写影响不大。

MappedFile#appendMessagesInner()

RocketMQ提供的刷盘方式有两种,一种是同步刷盘,一种是异步刷盘,同步刷盘号称数据不可能丢失,果真如此吗?

从源码上看,消息的写入首先是写入到进程内存中

ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();

然后再通过异步的线程实现刷盘,这种方式其实还是存在一定程度可能出现数据丢失的情况。

前文看过一段程序,在Commitlog初始化时


if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
    this.flushCommitLogService = new GroupCommitService();  // 同步刷盘
} else {
    this.flushCommitLogService = new FlushRealTimeService(); // 异步刷盘
}

GroupCommitService

这个类有个比较精妙的设计,即设计了一对读写GroupCommitRequest队列。

读是相对于刷盘实例,即this对象,而写则相对于系统的刷盘请求写入。这么设计有什么好处呢?实现了读写的分离,当系统发起刷盘请求时不会影响系统继续写入刷盘请求(刷盘是阻塞操作),并且在完成一次刷盘之后即可进行读写队列互换身份(加了同步锁),继续读写。

while (!this.isStopped()) {
    try {
        this.waitForRunning(10);
        this.doCommit();
    } catch (Exception e) {
        CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
    }
}

每10s换一次互换一次队列,并且每10ms进行一次刷盘,那么在这10ms内如果发现了宕机,无疑会丢失一部分数据。

FlushRealTimeService

该类译名是实时刷盘,但是果真如此吗?

// 是否实时刷盘?默认是false
boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
// 时间间隔,默认500,即500ms
int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
// 默认刷盘页数,默认4
int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
// 物理队列刷盘吞吐时间间隔,默认10s
int flushPhysicQueueThoroughInterval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();


// Print flush progress
long currentTimeMillis = System.currentTimeMillis();
// 如果当前系统时间大于上次刷盘时间+物理队列刷盘吞吐时间间隔
// 理论上来说,默认时间500ms,即currentTimeMillis+500ms不太可能大于上次currentTimeMillis+10s
// 刷盘是阻塞的,如果一次刷盘时间过程,则会将刷盘的页数改为0
if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
    this.lastFlushTimestamp = currentTimeMillis;
    flushPhysicQueueLeastPages = 0;
    printFlushProgress = (printTimes++ % 10) == 0;
}

// 刷盘页数这个值得意义何在呢?
private boolean isAbleToFlush(final int flushLeastPages) {
    int flush = this.flushedPosition.get();// 上次刷盘的位置,offset
    int write = getReadPosition();// 当前写入位置,offset

    if (this.isFull()) {
        return true;
    }

    if (flushLeastPages > 0) { 
// (write - flush)/ OS_PAGE_SIZE 表示上次刷盘到现在写入的字节数除以系统默认页面大小,即4kb
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
    }

    return write > flush;// 如果上次写入大于上次刷盘,则允许刷盘
}

MappedFile#flush()

// 文件的写入可以选择使用
//We only append data to fileChannel or mappedByteBuffer, never both.
if (writeBuffer != null || this.fileChannel.position() != 0) {
    this.fileChannel.force(false);
} else {
    this.mappedByteBuffer.force();
}

写入到底是用FileChannel还是MMap好呢?看前文提到的一篇博客,而RocketMQ提供了一种可选择性。

看``

this.transientStorePool = new TransientStorePool(messageStoreConfig);
if (messageStoreConfig.isTransientStorePoolEnable()) {// 开启了该值才可以进行初始化
            this.transientStorePool.init();
}

// 初始化
/**
 * It's a heavy init method.
 */
public void init() {
    for (int i = 0; i < poolSize; i++) {
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);

        final long address = ((DirectBuffer) byteBuffer).address();// 使用的是堆外内存
        Pointer pointer = new Pointer(address);
        LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));

        availableBuffers.offer(byteBuffer);
    }
}
// 过没有初始化,writeBuffer则为空
public ByteBuffer borrowBuffer() {
    ByteBuffer buffer = availableBuffers.pollFirst();
    if (availableBuffers.size() < poolSize * 0.4) {
        log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
    }
    return buffer;
}

这里用到一个DirectBuffer,可以称之为堆外内存,亦可以理解为不收JVM管控的内存区域。以正确的姿势使用堆外内存可以在提高IO的读写效率。

为何?从堆内堆外的角度来思考一个文件读取的过程,比如:要完成一个从文件中读数据到堆内内存的操作,完成这个操作通常有2种方法,一种即FileChannelImpl.read()。这里实际上File I/O会将数据读到堆外内存中,然后堆外内存再将数据拷贝到堆内内存。

但是堆外内存的创建很重,故而RocketMQ将堆外内存进行了池化,以此达到复用的效果,默认是读取5块堆外内存,即5G内容。

是否要使用该方案?

/**
 * Enable transient commitLog store pool only if transientStorePoolEnable is true and the FlushDiskType is
 * ASYNC_FLUSH
 *
 * @return <tt>true</tt> or <tt>false</tt>
 */
public boolean isTransientStorePoolEnable() {
    return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
        && BrokerRole.SLAVE != getBrokerRole();
}

使用FileChannel写操作其实是操作的堆外内存。

总结

在IO读写操作上,RocketMQ的一些优化方案的关键词包括:

  • 异步创建文件
  • 内存锁定
  • 内存预热
  • 堆外内存
    当然,具体怎么使用,怎么配置还是要业务,但是不可否认的是RocketMQ的设计确实很精妙

相关文章

网友评论

      本文标题:RocketMQ消息引擎关于CommitLog浅析

      本文链接:https://www.haomeiwen.com/subject/lwcbqctx.html