RocketMQ存储文件与内存映射

作者: 93张先生 | 来源:发表于2020-12-03 20:29 被阅读0次

    概览

    RocketMQ的消息存储主要是在${ROCKETMQ_HOME}/store文件夹下,message消息主要存储在commitlog文件夹下,RocketMQ消息存储和索引是分开隔离的,已Topic为主题的消息索引存储在consumequeue文件夹下,通过MessageQueue映射为ConsumeQueue的文件就存储在这个文件夹下,然后index主要是以消息key和offset的对应关系,以类似HashMap的方式存储,方便消息查询。

    image.png

    本片文章主要介绍消息存储组织结构、Message是如何快速存储都MappedFile文件中的。MappedFile文件就是一个个以首条消息的offset为名称的存储文件,如上图commitlog文件夹下展示的00000000000000000000、00000000001073741824等,每一个mappedFile文件的大小约为102410241024=1G。

    image.png

    DefaultMessageStore

    DefaultMessageStore是消息相关操作的主要服务,包括消息的存储、查询、定时清除等等。这里主要介绍其中消息存储相关的事物,包括是否开启TransientStorePool临时消息存储池,一次创建2个MappedFile文件的AllocateMappedFileService消息存储预创建服务,还有历史存储文件mappedFile加载加载到直接内存MappedByteBuffer和对应的mmap文件映射等。

    # org.apache.rocketmq.store.DefaultMessageStore
    
    // MappedFile 分配服务
    private final AllocateMappedFileService allocateMappedFileService;
    
    // 是否开启
    // 消息临时存储
    private final TransientStorePool transientStorePool;
    
    this.transientStorePool = new TransientStorePool(messageStoreConfig);
    // 根据是否开启 transientStorePoolEnable,存在两种初始化情况。
    // transientStorePoolEnable 为 true 表示内容先存储在堆外内存(直接内存),然后通过 Commit 线程将数据提交到FileChannel中,再通过 Flush 线程将内存映射 Buffer 中的数据持久化到磁盘中。
    if (messageStoreConfig.isTransientStorePoolEnable()) {
        this.transientStorePool.init();
    }
    //加载历史mappedFile文件,进行便于文件查询和消费
    // load Commit Log
    result = result && this.commitLog.load();
    

    TransientStorePool

    TransientStorePool是短暂的消息存储池。这里先进行简单介绍,具体作用到应用的时候详细介绍。这里直接开辟默认5个1G的直接内存ByteBuffer,用来临时存储消息。它还引入了内存锁的机制,避免直接内存的数据被替换到系统中的Swap分区中,提高系统存储性能,使RocketMQ消息低延迟、高吞吐量。

    public class TransientStorePool {
        // availableBuffers 个数,可通过在broker中配置文件中设置 transientStorePool,默认值为 5
        private final int poolSize;
        // 每个 ByteBuffer 大小,默认为 mappedFileSizeCommitLog,表明 TransientStorePool 为 commitlog 文件服务
        private final int fileSize;
        // 直接内存,ByteBuffer 容器,双端队列
        private final Deque<ByteBuffer> availableBuffers;
    
        /**
         * 创建默认的堆外内存
         * It's a heavy init method.
         */
        public void init() {
            for (int i = 0; i < poolSize; i++) {
                // 利用 NIO 直接直接分配,堆外内存(直接内存),在系统中的内存,非 JVM 内存
                ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
                // 内存地址
                final long address = ((DirectBuffer) byteBuffer).address();
                Pointer pointer = new Pointer(address);
                // 内存锁定
                LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
    
                availableBuffers.offer(byteBuffer);
            }
        }
    }   
    

    CommitLog

    CommitLog主要有消息的刷盘的存储服务、消息的刷盘服务,存储消息的回调等等,这里主要介绍MappedFileQueue,它是对${ROCKET_HOME}/store/commitlog目录的封装,主要用来管理多个MappedFile。

    public class CommitLog {
        // 映射文件队列,ROCKETMQ_HOME/commitlog 文件夹下的文件对应
        protected final MappedFileQueue mappedFileQueue;
        // 默认消息存储服务
        protected final DefaultMessageStore defaultMessageStore;
        // commitLog 刷盘操作
        private final FlushCommitLogService flushCommitLogService;
    
        //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
        // 定时将 transientStorePoll 中的直接内存 ByteBuffer,提交条内存映射 MappedByteBuffer 中
        private final FlushCommitLogService commitLogService;
        // 存储消息到 mappedFile 的回调映射
        private final AppendMessageCallback appendMessageCallback;
        // 消息解码服务线程
        private final ThreadLocal<MessageExtBatchEncoder> batchEncoderThreadLocal;
    
        // topic-queue-id,offset;消息的key,和在 commitlog 中的 offset,方便消息存储时的索引
        protected HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable = new HashMap<String, Long>(1024);
    
        public CommitLog(final DefaultMessageStore defaultMessageStore) {
            // 在这里组织 commitlog 的对应的 MappedFile 文件,然后进行相应的文件操作,文件映射,刷线到磁盘文件
            this.mappedFileQueue = new MappedFileQueue(defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog(),
                defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(), defaultMessageStore.getAllocateMappedFileService());
    
            this.defaultMessageStore = defaultMessageStore;
            // 异步、同步刷盘服务初始化
            if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                // 同步刷盘服务为 GroupCommitService
                this.flushCommitLogService = new GroupCommitService();
            } else {
                // 异步刷盘服务为 FlushRealTimeService
                this.flushCommitLogService = new FlushRealTimeService();
            }
            // 定时将 transientStorePoll 中的直接内存 ByteBuffer,提交条内存映射 MappedByteBuffer 中
            this.commitLogService = new CommitRealTimeService();
            // 存储消息到 mappedFile 的回调映射
            this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
            batchEncoderThreadLocal = new ThreadLocal<MessageExtBatchEncoder>() {
                @Override
                protected MessageExtBatchEncoder initialValue() {
                    return new MessageExtBatchEncoder(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
                }
            };
            // putMessage 到 mappedFile 时是否使用可重入锁,默认使用自旋锁
            this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
    
        }
    }
    

    MappedFileQueue

    MappedFile 的存储集合和管理器,是对 ${ROCKET_HOME}/store/commitlog 文件夹的封装。主要用来管理MappedFile文件,包括消息的查询、提交、落盘的刷新,历史MappedFile文件的预热加载和直接内存映射mmap操作,过期文件的删除、追加消息的最后一个MappedFile文件的获取和创建等。

    public class MappedFileQueue {
        // 存储路径${ROCKET_HOME}/store/commitlog,该目录下会存在多个内存映射文件
        private final String storePath;
        // 单个文件的存储大小
        private final int mappedFileSize;
        // mappedFile 文件集合
        // 一个线程安全的 ArrayList 的变种,通过可 reentrantLock 可重入锁实现数组的新建和数组旧有内容的 copy 到新建的数组,然后返回新建的数组
        private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
        // 创建 MappedFile 服务类
        private final AllocateMappedFileService allocateMappedFileService;
        // 当前刷盘指针,表示该指针之前的所有数据全部持久化到磁盘
        // MappedFile 中的 MappedByteBuffer 中数据写入磁盘的指针,该指针之前的所有数据全部持久化到磁盘
        private long flushedWhere = 0;
    
        // Java 应用程序态数据要写入nio内存映射的ByteBuffer的提交了位置的指针
        // commitWhere 只有开启 transientStorePool 的前提下才有作用;
        // commitWhere 代表着 transientStorePool 中直接内存 ByteBuffer 需要提交数据到 MappedByteBuffer 直接内存的,位置为已经提交了数据的位置。下次要提交的开始位置,上次提交的结尾位置。
        private long committedWhere = 0;
    
    
        /**
         * 项目启动,加载 commitlog 文件夹下对应的文件
         * @return
         */
        public boolean load() {
            File dir = new File(this.storePath);
            File[] files = dir.listFiles();
            if (files != null) {
                // ascending order
                // 根据文件名(offset)排序
                Arrays.sort(files);
                for (File file : files) {
                    // 如果物理文件大小 != mappedFileSize,说明文件被破坏了,返回false
                    if (file.length() != this.mappedFileSize) {
                        log.warn(file + "\t" + file.length()
                            + " length not matched message store config value, please check it manually");
                        return false;
                    }
    
                    try {
                        MappedFile mappedFile = new MappedFile(file.getPath(), mappedFileSize);
                        // 更新 mappedFile 文件指针
                        mappedFile.setWrotePosition(this.mappedFileSize);
                        mappedFile.setFlushedPosition(this.mappedFileSize);
                        mappedFile.setCommittedPosition(this.mappedFileSize);
                        // 加入映射文件集合
                        this.mappedFiles.add(mappedFile);
                        log.info("load " + file.getPath() + " OK");
                    } catch (IOException e) {
                        log.error("load file " + file + " error", e);
                        return false;
                    }
                }
            }
    
            return true;
        }
    
        /**
         * 获取最后存储消息的映射mappedFile
         *
         * @param startOffset mappedFile 开始写文件的offset
         * @param needCreate 是否需要创建新的 mappedFile 文件
         * @return
         */
        //
        public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
            // 创建映射文件的起始偏移量,也就是即将的mappedfile文件名称
            long createOffset = -1;
            // 获取最后一个映射文件,如果为null或者写满则会执行创建逻辑
            MappedFile mappedFileLast = getLastMappedFile();
            // mappedFileLast == null,表示需要创建新的 mappedFile 文件,创建新文件的offset值;
            if (mappedFileLast == null) {
                // 计算将要创建的映射文件的起始偏移量
                // 如果startOffset<=mappedFileSize则起始偏移量为0
                // 如果startOffset>mappedFileSize则起始偏移量为是mappedFileSize的倍数
                createOffset = startOffset - (startOffset % this.mappedFileSize);
            }
            // 映射文件满了,创建新的映射文件
            if (mappedFileLast != null && mappedFileLast.isFull()) {
                // 创建的映射文件的偏移量等于最后一个映射文件的起始偏移量  + 映射文件的大小(commitlog文件大小)
                createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
            }
            // 创建新的映射文件
            if (createOffset != -1 && needCreate) {
                // 构造commitlog 文件名称
                String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
                // nextNextFilePath,预先创建下一个 mappedFile 文件,通过 allocateMappedFileService 服务,一起创建两个文件,预先创建下一个文件
                String nextNextFilePath = this.storePath + File.separator
                    + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
                MappedFile mappedFile = null;
                // 优先通过allocateMappedFileService中方式构建映射文件,预分配方式,性能高
                // 如果上述方式失败则通过new创建映射文件
                if (this.allocateMappedFileService != null) {
                    mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                        nextNextFilePath, this.mappedFileSize);
                } else {
                    try {
                        mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                    } catch (IOException e) {
                        log.error("create mappedFile exception", e);
                    }
                }
    
                if (mappedFile != null) {
                    // 是否是 MappedFileQueue 队列中第一个文件
                    if (this.mappedFiles.isEmpty()) {
                        mappedFile.setFirstCreateInQueue(true);
                    }
                    this.mappedFiles.add(mappedFile);
                }
    
                return mappedFile;
            }
    
            return mappedFileLast;
        }
    }
    

    MappedFile

    CommitLog、MappedFileQueue、MappedFile的关系如图:


    image.png

    MappedFile是RocketMQ消息存储的终极Boss,重中之重。涉及MapedFile的预创建和映射、历史数据MappedFile的磁盘文件预热。MappedByteBuffer是通过NIO方式创建的内存映射对象。ByteBuffer writeBuffer是直接内存从TransientStorePool中借来的,他们两个是在内存中用来存放消息的,其中区别下面详细介绍。这里先从CommitLog文件存放消息说起。

    public class MappedFile extends ReferenceResource {
        // 当前JVM实例中 MappedFile 虚拟内存
        private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
        // 当前JVM实例中MappedFile对象个数
        private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
        // 即将写入消息的mappedFile 的位置
        // 当前 MappedFile 文件的写指针,从 0 开始(内存映射文件的写指针)
        protected final AtomicInteger wrotePosition = new AtomicInteger(0);
        // 当前文件的提交到 MappedBuffer的指针,如果开启 transientStorePoolEnable,则数据会存储在 TransientStorePool 中,然后提交到内存映射 ByteBuffer 中,再刷写到磁盘
        protected final AtomicInteger committedPosition = new AtomicInteger(0);
        // 刷写到磁盘指针,该指针之前的数据持久化到磁盘中
        private final AtomicInteger flushedPosition = new AtomicInteger(0);
        // 文件大小
        protected int fileSize;
        // 文件通道
        protected FileChannel fileChannel;
        /**
         * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
         */
        // 堆外内存 ByteBuffer,如果不为空,数据首先将存储在该 Buffer 中,然后提交到 MappedFile 对应的内存映射文件 Buffer。
        // transientStorePoolEnable 为true时不为空。
        protected ByteBuffer writeBuffer = null;
        // 堆内存池,transientStorePoolEnable 为true 时启用
        protected TransientStorePool transientStorePool = null;
        // 文件名称
        private String fileName;
        // mappedFile 文件的开始偏移量地址
        private long fileFromOffset;
        // 物理文件
        private File file;
        // NIO 物理文件对应的内存映射Buffer
        private MappedByteBuffer mappedByteBuffer;
        // 文件最后一次内容写入的时间
        private volatile long storeTimestamp = 0;
        // 是否是 MappedFileQueue 队列中第一个文件
        private boolean firstCreateInQueue = false;
    
    }
    

    CommitLog#putMessage方法是来存放消息的,存放消息到系统内存映射中,并没有落入磁盘中,等待同步刷盘、或者异步刷盘,然后是消息存储的高可用。

    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        // Set the storage time
        // 消息存储时间
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        // 循环冗余校验码,检测数据在网络中传输是否发生变化
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;
        // 存储服务统计功能服务
        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
        // topic
        String topic = msg.getTopic();
        int queueId = msg.getQueueId();
        // 事务回滚消息标志
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        // 如果是非事务消息,或者事务消息的 commit 操作;进而判断是不是延迟消息,存储到特殊的延迟消息队列;然后事务消息存储也进行了同样的消息 topic 的转换,从而实现了消息的事务;事务消息非提交阶段,先进行另一个 topic 的储存,如果事务提交了,才进行,存储到消息的真正的topic 中去。
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
            // Delay Delivery
            // 如果是延迟级别消息
            if (msg.getDelayTimeLevel() > 0) {
                // 设置消息延迟级别
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                }
                // 延迟消息topic
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                // 延迟消息消息队列Id
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    
                // Backup real topic, queueId
                // 将真实的 topic 放入 message 属性中
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                // 替换为延迟消息topic
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            }
        }
        // 消息诞生地址 ipv6 设置
        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setBornHostV6Flag();
        }
        // 消息存储地址 ipv6 设置
        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) {
            msg.setStoreHostAddressV6Flag();
        }
    
        long elapsedTimeInLock = 0;
    
        MappedFile unlockMappedFile = null;
        // 最后一个 消息 存储 commitlog 消息映射文件
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        // 自旋锁 或 可重入锁,上锁;消息写入 commitlog 的映射文件是串行的
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            // 开始上锁时间
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;
    
            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            // 消息存储时间,确定消息全局有序
            msg.setStoreTimestamp(beginLockTimestamp);
            //mappedFile==null标识CommitLog文件还未创建,第一次存消息则创建CommitLog文件
            //mappedFile.isFull()表示mappedFile文件已满,需要重新创建CommitLog文件
            if (null == mappedFile || mappedFile.isFull()) {
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise;新文件,造成脏数据
            }
            // mappedFile==null说明创建CommitLog文件失败抛出异常,创建失败可能是磁盘空间不足或者权限不够
            if (null == mappedFile) {
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
            }
            // 追加消息到 mappedFile 文件中
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                // 针对一条消息足够长,然后 mappedFile 文件不够存储,需要创建新的 mappedFile 进行消息存放。
                case END_OF_FILE:
                    // 上一个 mappedFile 暂存文件,需要解锁这个 mappedFile
                    unlockMappedFile = mappedFile;
                    // broker 重新开辟,新的 commitlog 文件
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                    }
                    // 开辟成功,再将消息写入
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
                default:
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            }
            // 存储消息花费时间
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
        } finally {
            // 最后释放存储消息的锁
            putMessageLock.unlock();
        }
        // 存储消息花费时间 > 500
        if (elapsedTimeInLock > 500) {
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
        }
        // 上一个有空闲空间,但不够存储消息的 mappedFile 文件,
        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
            // 解锁 mappedFile 的内存锁定
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        }
    
        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    
        // Statistics
        // topic 下存放消息次数
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        // topic 下存放消息字节数
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
        // handle 硬盘刷新
        handleDiskFlush(result, putMessageResult, msg);
        // handle 高可用
        handleHA(result, putMessageResult, msg);
        // 返回存储消息的结果
        return putMessageResult;
    }
    
    PutMessage重要步骤
    1. 获取上次最后一个写入消息的存储文件MappedFile,MappedFile文件的获取在后面会详细接受。
    // 最后一个 消息 存储 commitlog 消息映射文件
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    
    1. 向MappedFile文件追加消息,如果返回END_OF_FILE代表这个整备追加消息的MappedFile文件写满了,不够存储本条消息,然后再去获取这最后下一个MappedFile文件。
    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
    // 针对一条消息足够长,然后 mappedFile 文件不够存储,需要创建新的 mappedFile 进行消息存放。
    case END_OF_FILE:
        // 上一个 mappedFile 暂存文件,需要解锁这个 mappedFile
        unlockMappedFile = mappedFile;
        // broker 重新开辟,新的 commitlog 文件
        // Create a new file, re-write the message
        mappedFile = this.mappedFileQueue.getLastMappedFile(0);
        if (null == mappedFile) {
            // XXX: warn and notify me
            log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
        }
        // 开辟成功,再将消息写入
        result = mappedFile.appendMessage(msg, this.appendMessageCallback);
    
    1. 写MappedFile文件是会被mlock内存锁定,防止被交换到Swap分区中,写满的MappedFile文件进行锁定解除。
    // 上一个有空闲空间,但不够存储消息的 mappedFile 文件,
    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        // 解锁 mappedFile 的内存锁定
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }
    
    1. 写入内存的消息进行刷盘,然后是HA消息存储的高可用,Broker存储消息的复制,这两部分内容也很重要,下次在介绍,在本章不是重点内容。
    // handle 硬盘刷新
    handleDiskFlush(result, putMessageResult, msg);
    // handle 高可用
    handleHA(result, putMessageResult, msg);
    
    最后一个MappedFile的获取

    这是MappedFile设计的经典,现在重点介绍。创建MappedFile对象有两种方式。
    第一种:通过构造方法,new MappedFile()一个对象。然后进行MapepdFile对象MappedByteBuffer的内存映射。

    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
    
    public MappedFile(final String fileName, final int fileSize) throws IOException {
        init(fileName, fileSize);
    }
    /**
     * MappedFile 初始化,并做好 mappedFile 和 mappedByteBuffer 的NIO 直接内存映射关系
     *
     * @param fileName 物理文件路径
     * @param fileSize mappedFileSize 文件大小
     * @throws IOException
     */
    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        // 物理文件名称
        this.file = new File(fileName);
        // 文件开始位置
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;
        // 确保文件路径存在,不存在,进行路径文件创建
        ensureDirOK(this.file.getParent());
    
        try {
            // 通过 RandomAccessFile 创建读写文件通道,并将文件内容使用NIO 的内存映射 Buffer 将文件映射到内存中
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            // 物理文件对应的内存映射Buffer
            // 通过 NIO 文件通道和mappedFileSize 大小,创建内存映射文件 mappedByteBuffer
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            // 当前JVM实例中 MappedFile 虚拟内存
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            // 当前JVM实例中MappedFile对象个数
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("Failed to create file " + this.fileName, e);
            throw e;
        } catch (IOException e) {
            log.error("Failed to map file " + this.fileName, e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }
    

    第二种:预创建MappedFile,通过allocateMappedFileService服务一次创建两个MappedFile对象。

    // 优先通过allocateMappedFileService中方式构建映射文件,预分配方式,性能高
    // 如果上述方式失败则通过new创建映射文件
    if (this.allocateMappedFileService != null) {
        mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
            nextNextFilePath, this.mappedFileSize);
    } else {
        try {
            mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
        } catch (IOException e) {
            log.error("create mappedFile exception", e);
        }
    }
    
    
    AllocateMappedFileService

    AllocateMappedFileService是预创建MappedFile文件的服务,通过一次构造两个创建MappedFile的AllocateRequest然后放入队列requestQueue中,通过CountDownLatch线程同步协调器等待mmapOperation()方法,创建MappedFile对象,并返回。RocketMQ中预分配MappedFile的设计非常巧妙,下次获取时候直接返回就可以不用等待MappedFile创建分配所产生的时间延迟。

    CountDownLatch协调两个线程之间的通信
    image.png
    /**
     * 预先创建 MappedFile 文件,只是先创建2个创建mappedFile 文件的请求,放入队列中,具体 mappedFile 文件的创建和文件内存直接映射由 mmapOperation() 方法来实现。
     * @param nextFilePath 创建 mappedFile 文件的全路径名称
     * @param nextNextFilePath 创建下一个 mappedFile 文件的全路径名称
     * @param fileSize 文件大小
     * @return
     */
    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
        // 默认提交两个请求
        int canSubmitRequests = 2;
        // 是否启用 transientStorePool
        if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            // SLAVE 节点中 transientByteBuffer 即使没有足够的 ByteBuffer,也不支持快速失败
            // 启动快速失败策略时,计算TransientStorePool中剩余的buffer数量减去requestQueue中待分配的数量后,剩余的buffer数量
            if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
                // 可用的 ByteBuffer - requestQueue,还剩余可用的 ByteBuffer 数量
                canSubmitRequests = this.messageStore.getTransientStorePool().availableBufferNums() - this.requestQueue.size();
            }
        }
        // 创建分配请求
        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
        // 判断requestTable中是否存在该路径的分配请求,如果存在则说明该请求已经在排队中
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
        //该路径没有在排队
        if (nextPutOK) {
            // byteBuffer 数量不够,则快速失败
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextFilePath);
                return null;
            }
            // 数量充足的话,将指定的元素插入到此优先级队列中
            boolean offerOK = this.requestQueue.offer(nextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
            // 请求数量 -1
            canSubmitRequests--;
        }
        // 下下个请求的处理
        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
        if (nextNextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextNextFilePath);
            } else {
                boolean offerOK = this.requestQueue.offer(nextNextReq);
                if (!offerOK) {
                    log.warn("never expected here, add a request to preallocate queue failed");
                }
            }
        }
        // 报错,日志
        if (hasException) {
            log.warn(this.getServiceName() + " service has exception. so return null");
            return null;
        }
        // 下一个分配请求,获取当前请求,然后通过线程协调器CountDownLatch,协调另一个线程进行完mmpOperation操作后,返回创建好的MappedFile文件
        AllocateRequest result = this.requestTable.get(nextFilePath);
        try {
            if (result != null) {
                // 默认等待5s,等待 mmapOperation 操作创建 mappedFile
                // 调用此方法的线程会被阻塞,直到 CountDownLatch 的 count 为 0;等到 mmapOperation() finally countDownLatch 为 0
                boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
                if (!waitOK) {
                    log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                    return null;
                } else {
                    // 成功从 requestTable 中移除请求,并返回 mappedFile 文件
                    this.requestTable.remove(nextFilePath);
                    return result.getMappedFile();
                }
            } else {
                log.error("find preallocate mmap failed, this never happen");
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    
        return null;
    }
    
    具体创建MappedFile对象

    AllocateMappedFileService服务开启了一个线程,不停地从创建MappedFile对象的请求队列requestQueue中获取AllocateRequest,并实时创建MappedFile对象,并通过CountDownLatch通知putRequestAndReturnMappedFile() 方法已经创建了MappedFile对象,从而获取返回。

    /**
     * 开始 mappedFile 文件分配服务,从 requestQueue 中获取创建 mappedFile 的文件请求
     */
    public void run() {
        log.info(this.getServiceName() + " service started");
        // 除非停止,否则一直在进行 mmap 映射操作
        while (!this.isStopped() && this.mmapOperation()) {
    
        }
        log.info(this.getServiceName() + " service end");
    }
    
    MmapOperation具体创建MappedFile对象

    在这里创建MappedFile对象,也有两种情况,区别在于是否启用TransientStorePool消息暂存池,它里面有默认5个1G的直接内存,可以通过直接内存赋值给MappedFile的writerBuffer对象,省去了开辟内存的时间;还有一种是通过MappedFile的NIO创建的MappedByteBuffer直接内存映射来存储消息,需要进行文件的map映射操作,开辟内存空间。这两种方式的对比会在下面介绍。

    第一种:不启用transientStorePool对象,通过构造方法创建。
    第二种:通过ServerLoader.load的方式创建,如果失败了,再去尝试构造方法的方式。

    if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
        try {
            // 为每一个 mappedFile 文件,进行init中的mmap 映射操作
            mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
            mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
        } catch (RuntimeException e) {
            log.warn("Use default implementation.");
            // spi 加载失败,使用构造方法创建 mappedFile
            mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
        }
    } else {
        // 不开启 transientStorePool,直接内存映射
        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
    }
    
    MappedFile文件预热

    通过 mmap 建立内存映射仅是将文件磁盘地址和虚拟地址通过映射对应起来,此时物理内存并没有填充磁盘文件内容。
    当实际发生文件读写时,产生缺页中断并陷入内核,然后才会将磁盘文件内容读取至物理内存。针对上述场景,RocketMQ 设计了 MappedFile 预热机制。
    当 RocketMQ 开启 MappedFile 内存预热(warmMapedFileEnable),且 MappedFile 文件映射空间大小大于等于 mapedFileSizeCommitLog(1 GB) 时,调用 warmMappedFile 方法对 MappedFile 进行预热。

    if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
        .getMappedFileSizeCommitLog()
        &&
        this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        // 对 mappedFile 进行预热
        mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
            this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
    }
    

    MappedFile创建后,需要对 MappedFile 文件进行预热,将内存和磁盘映射起来,然后每页写入占位数据0,然后将这些0数据,刷新到磁盘,进行磁盘预热。

    当调用Mmap进行内存映射后,OS只是建立了虚拟内存地址至物理地址的映射表,而实际并没有加载任何文件至内存中。
    程序要访问数据时,OS会检查该部分的分页是否已经在内存中,如果不在,则发出一次 缺页中断。X86的Linux中一个标准页面大小是4KB,
    那么1G的CommitLog需要发生 1024KB/4KB=256次 缺页中断,才能使得对应的数据完全加载至物理内存中。

    为什么每个页都需要写入数据呢?

    RocketMQ在创建并分配MappedFile的过程中预先写入了一些随机值到Mmap映射出的内存空间里。原因在于:
    仅分配内存并进行mlock系统调用后并不会为程序完全锁定这些分配的内存,原因在于其中的分页可能是写时复制的。因此,就有必要对每个内存页面中写入一个假的值。
    锁定的内存可能是写时复制的,这个时候,这个内存空间可能会改变。这个时候,写入假的临时值,这样就可以针对每一个内存分页的写入操做会强制 Linux 为当前进程分配一个独立、私有的内存页。

    写时复制
    写时复制:子进程依赖使用父进程开创的物理空间。
    内核只为新生成的子进程创建虚拟空间结构,它们来复制于父进程的虚拟究竟结构,但是不为这些段分配物理内存,它们共享父进程的物理空间,当父子进程中有更改相应段的行为发生时,再为子进程相应的段分配物理空间。
    https:www.cnblogs.com/biyeymyhjob/archive/2012/07/20/2601655.html

    为了避免OS检查分页是否在内存中的过程出现大量缺页中断,RocketMQ在做Mmap内存映射的同时进行了madvise系统调用,
    目的是使OS做一次内存映射后,使对应的文件数据尽可能多的预加载至内存中,降低缺页中断次数,从而达到内存预热的效果。
    RocketMQ通过map+madvise映射后预热机制,将磁盘中的数据尽可能多的加载到PageCache中,保证后续对ConsumeQueue和CommitLog的读取过程中,能够尽可能从内存中读取数据,提升读写性能。

    public void warmMappedFile(FlushDiskType type, int pages) {
        long beginTime = System.currentTimeMillis();
        // 创建一个新的字节缓冲区,其内容是此缓冲区内容的共享子序列
        ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
        int flush = 0;
        long time = System.currentTimeMillis();
        // warmMappedFile 每间隔 OS_PAGE_SIZE 向 mappedByteBuffer 写入一个 0,此时对应页恰好产生一个缺页中断,操作系统为对应页分配物理内存
        for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
            byteBuffer.put(i, (byte) 0);
            // 刷盘方式是同步策略时,进行刷盘操作
            // 每修改 pages 个分页刷一次盘,相当于 4096 * 4k = 16M,每 16 M刷一次盘,1G 文件 1024M/16M = 64 次
            // force flush when flush disk type is sync
            // 如果刷盘策略为同步刷盘,需要对每个页进行刷盘
            if (type == FlushDiskType.SYNC_FLUSH) {
                if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
                    flush = i;
                    mappedByteBuffer.force();
                }
            }
    
            // prevent gc
            // 防止垃圾回收
            if (j % 1000 == 0) {
                log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                try {
                    Thread.sleep(0);
                } catch (InterruptedException e) {
                    log.error("Interrupted", e);
                }
            }
        }
    
        // force flush when prepare load finished
        // 前面对每个页,写入了数据(0 占位用,防止被内存交互),进行了刷盘,然后这个操作是对所有的内存进行刷盘。
        if (type == FlushDiskType.SYNC_FLUSH) {
            log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}",
                this.getFileName(), System.currentTimeMillis() - beginTime);
            // 刷盘,强制将此缓冲区内容的任何更改写入包含映射文件的存储设备
            mappedByteBuffer.force();
        }
        log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
            System.currentTimeMillis() - beginTime);
        //通过 JNA 调用 mlock 方法锁定 mappedByteBuffer 对应的物理内存,阻止操作系统将相关的内存页调度到交换空间(swap space),以此提升后续在访问 MappedFile 时的读写性能。
        this.mlock();
    }
    

    通过 JNA 调用 mlock 方法锁定 mappedByteBuffer 对应的物理内存,阻止操作系统将相关的内存页调度到交换空间(swap space),以此提升后续在访问 MappedFile 时的读写性能。

    public void mlock() {
        final long beginTime = System.currentTimeMillis();
        final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
        Pointer pointer = new Pointer(address);
        {
            int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
            log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
        }
    
        // RocketMQ的做法是,在做Mmap内存映射的同时进行madvise系统调用,目的是使OS做一次内存映射后对应的文件数据尽可能多的预加载至内存中,从而达到内存预热的效果。
        {
            int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
            log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret, System.currentTimeMillis() - beginTime);
        }
    }
    

    MappedByteBuffer VS WriterBuffer

    MappedByteBuffer和WriterBuffer都是MappedFile对象的成员属性,都是用来存放消息的,只有开启了TransientStorePool,才会向writerBuffer直接内存写入消息,然后commit消息到FileChannle中,然后再flush到磁盘。否则就是存储在NIO创建的MappedByteBuffer直接内存中,然后刷新到磁盘。

    image.png

    从TransientStorePool中借取的MappedFile中的writerBuffer与MappedFile的MappedByteBuffer在数据处理上的差异在什么地方呢?

    分析其代码,TransientStorePool会通过ByteBuffer.allocateDirect调用直接申请对外内存,消息数据在写入内存的时候是写入预申请的内存中。在异步刷盘的时候,再由刷盘线程将这些内存中的修改写入文件。

    那么与直接使用MappedByteBuffer相比差别在什么地方呢?

    MappedByteBuffer 和 WriteBuffer 都会经过,PageCache 这个操作进行写入磁盘。
    MappedByteBuffer写入数据,写入的是MappedByteBuffer映射的磁盘文件对应的Page Cache,可能会慢一点。而TransientStorePool方案下写入的则为纯粹的内存,并不是PageCache,因此在消息写入操作上会更快,因此能更少的占用CommitLog.putMessageLock锁,从而能够提升消息处理量。然后再经过刷盘将直接内存中的数据经过Page Cache 写入磁盘。使用TransientStorePool方案的缺陷主要在于在异常崩溃的情况下回丢失更多的消息。

    Mmap的写入操作是:Mmap的MappedByteBuffer映射直接内存,直接内存映射文件,然后文件会对应Page Cache,也就是 MmapedByteBuffer的直接内存可能是Page Cache的东西,然后通过写Page Cache,然后再写入磁盘。

    FileChannle:是写直接内存,这个效率比较高,然后直接内存满了,在落盘的时候,再去经过Page Cache,落入磁盘。WriterBuffer的写入方式实际也就是FileChannel的写入方式,Mmap在写入4k一下的文件比较快,然后FileChannel写入文件大于4k时,比Mmap方式的要快,可能是因为PageCache 是4k,然后写着就可能去落盘了。而FileChannel 是写满了直接内存,才去经过PageCache,这样写入直接内存的效率更高,然后再经过Page Cache,当大于4k的时候,大于Page Cache的内存的时候,就是FileChannel快了。大概因为FileChannel是基于Block(块)的。

    Mmap VS FileChannle参考https://juejin.cn/post/6844903842472001550

    相关文章

      网友评论

        本文标题:RocketMQ存储文件与内存映射

        本文链接:https://www.haomeiwen.com/subject/yxlewktx.html