RocketMQ: Message Storage Mechanism Explained, with Source Code Analysis


Author: 谁叫我土豆了 | Published 2021-12-08 17:14

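    Table of Contents

    Message Storage Mechanism

    1. Introduction
    2. Core storage class: DefaultMessageStore
    3. Message storage flow
    4. Message storage files
    5. Memory mapping of storage files
    5.1. MappedFileQueue
    5.2. MappedFile
    5.2.1. commit
    5.2.2. flush
    5.3. TransientStorePool
    6. Disk flush mechanism
    6.1. Synchronous flush
    6.2. Asynchronous flush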

    Message Storage Mechanism

    1. Introduction

    This article explains how the Broker persists messages on disk after receiving them from producers.


    2. Core storage class: DefaultMessageStore

    private final MessageStoreConfig messageStoreConfig;    // message store configuration
    private final CommitLog commitLog;      // CommitLog file storage; message data is stored in the CommitLog
    private final ConcurrentMap<String/* topic */, ConcurrentMap<Integer/* queueId */, ConsumeQueue>> consumeQueueTable;    // ConsumeQueue cache table, grouped by topic, then by queueId
    private final FlushConsumeQueueService flushConsumeQueueService;    // ConsumeQueue flush service thread
    private final CleanCommitLogService cleanCommitLogService;  // deletes expired CommitLog files
    private final CleanConsumeQueueService cleanConsumeQueueService;    // deletes expired ConsumeQueue files
    private final IndexService indexService;    // index (IndexFile) service
    private final AllocateMappedFileService allocateMappedFileService;  // MappedFile allocation service: creates memory-mapped CommitLog files
    private final ReputMessageService reputMessageService; // dispatches CommitLog messages to build ConsumeQueue and IndexFile files
    private final HAService haService;  // master-slave replication (HA) service
    private final ScheduleMessageService scheduleMessageService;    // scheduled (delayed) message service
    private final StoreStatsService storeStatsService;  // store statistics service
    private final TransientStorePool transientStorePool;    // off-heap memory buffer pool
    private final BrokerStatsManager brokerStatsManager;    // Broker statistics manager
    private final MessageArrivingListener messageArrivingListener;  // message-arrival listener for long-polling pull requests
    private final BrokerConfig brokerConfig;    // Broker configuration
    private StoreCheckpoint storeCheckpoint;    // store checkpoint (timestamps of the last flushes)
    private final LinkedList<CommitLogDispatcher> dispatcherList;   // CommitLog dispatchers (build ConsumeQueue, IndexFile, ...)
    

    These fields are the core of message storage, and it is worth understanding the role of each one.

    3. Message storage flow

    The message storage sequence diagram is shown below:

    [Figure: message storage sequence diagram]

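    Entry point for storing messages: DefaultMessageStore#putMessage. The excerpt below is from the batch variant, DefaultMessageStore#asyncPutMessages(MessageExtBatch):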

    // Check the store status: shut down, Slave role, not writable, or OS page cache busy
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();        
    if (checkStoreStatus != PutMessageStatus.PUT_OK) {
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    }
    
    // Check that the topic length and the message body length are valid
    PutMessageStatus msgCheckStatus = this.checkMessages(messageExtBatch);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    }
    // Record the start time of the write
    long beginTime = this.getSystemClock().now();
    // Write the messages into the CommitLog (asynchronously)
    CompletableFuture<PutMessageResult> resultFuture = this.commitLog.asyncPutMessages(messageExtBatch);
    
    resultFuture.thenAccept((result) -> {
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) {
            log.warn("not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, messageExtBatch.getBody().length);
        }
        // Record statistics
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
        // Storing failed
        if (null == result || !result.isOk()) {
            // Increment the store-failure counter
            this.storeStatsService.getPutMessageFailedTimes().add(1);
        }
    });
    
    return resultFuture;
    

    DefaultMessageStore#checkStoreStatus

    // The store service has been shut down
    if (this.shutdown) {
        log.warn("message store has shutdown, so putMessage is forbidden");
        return PutMessageStatus.SERVICE_NOT_AVAILABLE;
    }
    // The Broker is a Slave: writes are not allowed
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("broke role is slave, so putMessage is forbidden");
        }
        return PutMessageStatus.SERVICE_NOT_AVAILABLE;
    }
    
    // Not writable: the disk is full, writing the logic queue failed, writing the index file failed, etc.
    if (!this.runningFlags.isWriteable()) {
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) {
            log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
                "the broker's disk is full, write to logic queue error, write to index file error, etc");
        }
        return PutMessageStatus.SERVICE_NOT_AVAILABLE;
    } else {
        this.printTimes.set(0);
    }
    // The OS page cache is busy (the CommitLog put lock has been held too long)
    if (this.isOSPageCacheBusy()) {
        return PutMessageStatus.OS_PAGECACHE_BUSY;
    }
    return PutMessageStatus.PUT_OK;
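
    checkStoreStatus() delegates its last test to isOSPageCacheBusy(), which is not shown here. In the RocketMQ 4.x source it compares the current time with CommitLog's beginTimeInLock (set when putMessageLock is acquired and reset to 0 when the append finishes, see CommitLog#asyncPutMessages below) and reports busy when a single append has held the lock longer than osPageCacheBusyTimeOutMills. The following is a minimal standalone sketch of that heuristic; the class name PageCacheBusyCheck and the explicit clock parameter are assumptions made for illustration.

```java
/**
 * Hypothetical restatement of the OS-page-cache-busy heuristic.
 * beginTimeInLock mirrors CommitLog.beginTimeInLock: the time the put lock
 * was acquired, or 0 when no append is currently holding the lock.
 */
final class PageCacheBusyCheck {
    // A gap this large can only come from beginTimeInLock == 0 (nobody in the lock).
    private static final long NOT_IN_LOCK_GUARD_MS = 10_000_000L;

    private final long busyTimeoutMs; // plays the role of osPageCacheBusyTimeOutMills

    PageCacheBusyCheck(long busyTimeoutMs) {
        this.busyTimeoutMs = busyTimeoutMs;
    }

    /** True when one append has held the lock for longer than busyTimeoutMs. */
    boolean isBusy(long beginTimeInLock, long nowMs) {
        long diff = nowMs - beginTimeInLock;
        return diff < NOT_IN_LOCK_GUARD_MS && diff > busyTimeoutMs;
    }
}
```

    When beginTimeInLock is 0, the difference is roughly the current epoch time in milliseconds, and the 10,000,000 ms guard filters that case out, so an idle store is never reported as busy.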
    

    CommitLog#asyncPutMessages

    // Record the message store time
    messageExtBatch.setStoreTimestamp(System.currentTimeMillis());
    AppendMessageResult result;
    
    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
    
    final int tranType = MessageSysFlag.getTransactionValue(messageExtBatch.getSysFlag());
    
    // Batch messages must not be transactional messages
    if (tranType != MessageSysFlag.TRANSACTION_NOT_TYPE) {
        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
    }
    
    //....
    
    // Get the last MappedFile (the memory-mapped file currently being written)
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    
    // Appending takes the put lock: CommitLog writes are serialized
    putMessageLock.lock();
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;
    
        // Set the store timestamp inside the lock so that store timestamps follow write order
        messageExtBatch.setStoreTimestamp(beginLockTimestamp);
    
        // If there is no MappedFile yet, or the last one is full, create a new one
        if (null == mappedFile || mappedFile.isFull()) {
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        // If the file could not be created, return immediately
        if (null == mappedFile) {
            log.error("Create mapped file1 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
            beginTimeInLock = 0;
            return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
        }
    
        // !!! Append the messages to the MappedFile !!!
        result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
        // Handle the append result according to its status
        switch (result.getStatus()) {
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("Create mapped file2 error, topic: {} clientAddr: {}", messageExtBatch.getTopic(), messageExtBatch.getBornHostString());
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                }
                result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
            case UNKNOWN_ERROR:
            default:
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
        }
    
        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock();
    }
    
    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessages in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, messageExtBatch.getBody().length, result);
    }
    
    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }
    
    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
    
    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum());
    storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());
    
    // Flush to disk according to the configured flush policy
    CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
    // Master-slave replication
    CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
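
    The END_OF_FILE branch is what keeps every CommitLog file at a fixed length: when the remaining space cannot hold the message plus the 8-byte end-of-file marker (END_FILE_MIN_BLANK_LENGTH, a 4-byte TOTALSIZE plus a 4-byte BLANK_MAGIC_CODE), doAppend pads the tail of the file and the code above retries in a newly created file. The sketch below simulates only the offset bookkeeping of that loop; RollingLog is a hypothetical class, it performs no I/O, and the tiny file size is chosen for readability.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of CommitLog rollover; it tracks offsets only. */
final class RollingLog {
    static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4; // TOTALSIZE + BLANK_MAGIC_CODE

    private final int fileSize;
    // Start offset (= file name) of every file created so far.
    private final List<Long> fileStartOffsets = new ArrayList<>();
    private int wrotePosition; // write pointer inside the last file

    RollingLog(int fileSize) {
        this.fileSize = fileSize;
        this.fileStartOffsets.add(0L);
    }

    /** Appends a message of msgLen bytes and returns its global physical offset. */
    long append(int msgLen) {
        if (msgLen + END_FILE_MIN_BLANK_LENGTH > fileSize) {
            throw new IllegalArgumentException("message can never fit into one file");
        }
        int maxBlank = fileSize - wrotePosition;
        if (msgLen + END_FILE_MIN_BLANK_LENGTH > maxBlank) {
            // END_OF_FILE: the padded tail counts as written; roll to a new file
            // named after the global offset of its first byte.
            fileStartOffsets.add(lastFileStart() + fileSize);
            wrotePosition = 0;
        }
        long physicalOffset = lastFileStart() + wrotePosition;
        wrotePosition += msgLen;
        return physicalOffset;
    }

    long lastFileStart() {
        return fileStartOffsets.get(fileStartOffsets.size() - 1);
    }

    List<Long> fileStartOffsets() {
        return fileStartOffsets;
    }
}
```

    With 100-byte files, the third 40-byte message does not fit into the 20 bytes left in the first file, so it lands at offset 100, the start of the second file, instead of at 80.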
    
    

    MappedFile#appendMessagesInner

    assert messageExt != null;
    assert cb != null;
    
    // Current write pointer (write position within this file)
    int currentPos = this.wrotePosition.get();
    
    // Append only while the write pointer is still inside the file
    if (currentPos < this.fileSize) {
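        // Target buffer: the off-heap writeBuffer if TransientStorePool is enabled, otherwise mappedByteBuffer;
        // slice() shares the content but has its own position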
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        // Dispatch on message type: single message or batch
        if (messageExt instanceof MessageExtBrokerInner) {
            // Single message
            // The callback CommitLog#doAppend serializes the message into the buffer (it is not on disk yet)
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBrokerInner) messageExt, putMessageContext);
        } else if (messageExt instanceof MessageExtBatch) {
            // Batch messages
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                    (MessageExtBatch) messageExt, putMessageContext);
        } else {
            // Unknown message type: return an error result
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        // Advance the write pointer
        this.wrotePosition.addAndGet(result.getWroteBytes());
        // Update the store timestamp
        this.storeTimestamp = result.getStoreTimestamp();
        // Return the append result (PUT_OK, END_OF_FILE, ...)
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    

    CommitLog#doAppend

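    public AppendMessageResult doAppend(final long fileFromOffset,          // starting global offset of this file (its file name)
                                        final ByteBuffer byteBuffer,        // NIO byte buffer to write into
                                        final int maxBlank,                // bytes still free in this file
                                        final MessageExtBrokerInner msgInner, // the message to store
                                        PutMessageContext putMessageContext) {
        // Global physical offset at which this message is written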
        long wroteOffset = fileFromOffset + byteBuffer.position();
    
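        // Build the msgId lazily
        Supplier<String> msgIdSupplier = () -> {
            // System flag
            int sysflag = msgInner.getSysFlag();
            // msgId length: 16 bytes for IPv4 (4 IP + 4 port + 8 offset), 28 bytes for IPv6 (16 + 4 + 8)
            int msgIdLen = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 + 8 : 16 + 4 + 8;
            // Allocate msgIdLen bytes
            ByteBuffer msgIdBuffer = ByteBuffer.allocate(msgIdLen);
            // Store host address: the IP followed by the 4-byte port (8 bytes for IPv4)
            MessageExt.socketAddress2ByteBuffer(msgInner.getStoreHost(), msgIdBuffer);
            // Reset position and limit so the absolute putLong at index msgIdLen - 8 stays within the limit
            msgIdBuffer.clear();
            // The last 8 bytes hold the CommitLog offset, wroteOffset
            msgIdBuffer.putLong(msgIdLen - 8, wroteOffset);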
            return UtilAll.bytes2string(msgIdBuffer.array());
        };
    
        // Unique key of this topic's message queue (topic-queueId)
        String key = putMessageContext.getTopicQueueTableKey();
        // Look up the next logical (ConsumeQueue) offset for this key
        Long queueOffset = CommitLog.this.topicQueueTable.get(key);
        if (null == queueOffset) {
            queueOffset = 0L;
            CommitLog.this.topicQueueTable.put(key, queueOffset);
        }
    
        // Transaction messages that require special handling
        final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
        switch (tranType) {
            // Prepared and Rollback message is not consumed, will not enter the
            // consumer queue
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                queueOffset = 0L;
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            default:
                break;
        }
    
        ByteBuffer preEncodeBuffer = msgInner.getEncodedBuff();
        // Total message length: the TOTALSIZE field in the first 4 bytes of the pre-encoded message
        final int msgLen = preEncodeBuffer.getInt(0);
    
        // Determines whether there is sufficient free space
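        // If the message plus the 8-byte end-of-file marker does not fit, write the marker and return END_OF_FILE so the caller creates a new CommitLog file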
        if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
            this.msgStoreItemMemory.clear();
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(maxBlank);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
            // 3 The remaining space may be any value
            // Here the length of the specially set maxBlank
            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
            return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
                    maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
                    msgIdSupplier, msgInner.getStoreTimestamp(),
                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
        }
    
        int pos = 4 + 4 + 4 + 4 + 4; // skip 1 TOTALSIZE, 2 MAGICCODE, 3 BODYCRC, 4 QUEUEID, 5 FLAG
        // 6 QUEUEOFFSET
        preEncodeBuffer.putLong(pos, queueOffset);
        pos += 8;
        // 7 PHYSICALOFFSET
        preEncodeBuffer.putLong(pos, fileFromOffset + byteBuffer.position());
        int ipLen = (msgInner.getSysFlag() & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
        // 8 SYSFLAG, 9 BORNTIMESTAMP, 10 BORNHOST, 11 STORETIMESTAMP
        pos += 8 + 4 + 8 + ipLen;
        // refresh store time stamp in lock
        preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
    
    
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        // Write messages to the queue buffer
        // Copy the encoded message into byteBuffer
        byteBuffer.put(preEncodeBuffer);
        msgInner.setEncodedBuff(null);
        // Build the AppendMessageResult
        AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
            msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    
        switch (tranType) {
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                // The next update ConsumeQueue information
                CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                break;
            default:
                break;
        }
        return result;
    }
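
    The msgIdSupplier above makes the msgId self-describing: for an IPv4 store host it is 16 bytes (4-byte IP, 4-byte port, 8-byte CommitLog offset) rendered as uppercase hex, so the physical offset can be recovered from the ID alone. A minimal sketch of the IPv4 layout and its inverse follows; MsgIdSketch is a hypothetical helper, not a RocketMQ class, and the 28-byte IPv6 layout is not handled.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper mirroring the IPv4 msgId layout built in doAppend. */
final class MsgIdSketch {
    private static final char[] HEX = "0123456789ABCDEF".toCharArray();

    static String encode(byte[] ipv4, int port, long commitLogOffset) {
        if (ipv4.length != 4) {
            throw new IllegalArgumentException("expected a 4-byte IPv4 address");
        }
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + 8); // big-endian by default
        buf.put(ipv4);                 // 4-byte IP
        buf.putInt(port);              // 4-byte port
        buf.putLong(commitLogOffset);  // 8-byte physical offset
        StringBuilder sb = new StringBuilder(32);
        for (byte b : buf.array()) {
            sb.append(HEX[(b >> 4) & 0x0F]).append(HEX[b & 0x0F]);
        }
        return sb.toString();
    }

    /** Recovers the CommitLog offset from the last 8 bytes (16 hex characters). */
    static long decodeOffset(String msgId) {
        return Long.parseUnsignedLong(msgId.substring(msgId.length() - 16), 16);
    }
}
```

    For example, a message stored at offset 0 by a Broker at 127.0.0.1:10911 gets the ID 7F00000100002A9F0000000000000000.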
    

    AppendMessageResult

    public class AppendMessageResult {
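        private AppendMessageStatus status;     // append status
        private long wroteOffset;              // physical offset at which the message was written
        private int wroteBytes;                // number of bytes written
        private String msgId;                  // message ID
        private Supplier<String> msgIdSupplier; // lazily computes the message ID
        private long storeTimestamp;            // store timestamp
        private long logicsOffset;             // logical offset in the ConsumeQueue
        private long pagecacheRT = 0;          // time spent writing into the page cache (ms)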
    }    
    

    The append result is then returned to CommitLog#asyncPutMessages (simplified):

    result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback, putMessageContext);
    switch (result.getStatus()) {
        case PUT_OK:
            break;
    }     
    // Release the lock (in the full method this happens in the finally block)
    putMessageLock.unlock();
    // Store statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(messageExtBatch.getTopic()).add(result.getMsgNum());
    storeStatsService.getSinglePutMessageTopicSizeTotal(messageExtBatch.getTopic()).add(result.getWroteBytes());
    
    // Flush to disk according to the flush policy
    CompletableFuture<PutMessageStatus> flushOKFuture = submitFlushRequest(result, messageExtBatch);
    // Master-slave replication
    CompletableFuture<PutMessageStatus> replicaOKFuture = submitReplicaRequest(result, messageExtBatch);
    
    

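    4. Message storage files

    • commitlog: message storage directory (CommitLog files)
    • config: runtime configuration, such as topic, subscription group and consumer offset data
    • consumequeue: message queue (ConsumeQueue) storage directory
    • index: message index file (IndexFile) storage directory
    • abort: created when the Broker starts and deleted on a normal shutdown; if it still exists at startup, the Broker did not shut down cleanly
    • checkpoint: checkpoint file storing the timestamps of the last flush of the commitlog, consumequeue and index files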

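    5. Memory mapping of storage files

    RocketMQ uses memory-mapped files to improve I/O performance. CommitLog, ConsumeQueue and IndexFile files are all designed with a fixed length per file.

    When a file is full, a new file is created, and each file is named after the global physical offset of its first message, as shown below:

    [Figure: storage files named by their starting physical offset]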
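
    The naming rule fits in a few lines. RocketMQ pads the starting offset to 20 digits (UtilAll.offset2FileName), and the default CommitLog file size is 1 GB, so the second CommitLog file is named 00000000001073741824. The helper class below is hypothetical and assumes, as RocketMQ does, that files are contiguous and start at multiples of the file size.

```java
import java.util.Locale;

/** Hypothetical helpers: map a global physical offset to a file name and in-file position. */
final class CommitLogNaming {
    static final long DEFAULT_FILE_SIZE = 1024L * 1024 * 1024; // 1 GB

    /** Start offset of the file that contains physicalOffset. */
    static long fileStartOffset(long physicalOffset, long fileSize) {
        return physicalOffset - physicalOffset % fileSize;
    }

    /** File name: the start offset left-padded with zeros to 20 digits. */
    static String fileName(long startOffset) {
        return String.format(Locale.ROOT, "%020d", startOffset);
    }

    /** Position of physicalOffset inside its file. */
    static long positionInFile(long physicalOffset, long fileSize) {
        return physicalOffset % fileSize;
    }
}
```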


    5.1. MappedFileQueue

    // Storage directory
    private final String storePath;
    
    // Size of a single file
    protected final int mappedFileSize;
    
    // MappedFile collection
    protected final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
    
    // Service thread that allocates MappedFiles
    private final AllocateMappedFileService allocateMappedFileService;
    
    // Flush pointer: data before this global offset has been flushed to disk
    protected long flushedWhere = 0;
    
    // Commit pointer: data before this global offset has been committed
    

    Find the MappedFile by store timestamp

    public MappedFile getMappedFileByTime(final long timestamp) {
        // Snapshot of the mapped files
        Object[] mfs = this.copyMappedFiles(0);
    
        if (null == mfs) {
            return null;
        }
        // Iterate over the mapped files
        for (int i = 0; i < mfs.length; i++) {
            MappedFile mappedFile = (MappedFile) mfs[i];
            // Return the first file whose last-modified time is >= timestamp
            if (mappedFile.getLastModifiedTimestamp() >= timestamp) {
                return mappedFile;
            }
        }
    
        // No file is recent enough: fall back to the last one
        return (MappedFile) mfs[mfs.length - 1];
    }
    

    Find the MappedFile by physical offset

    public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
            // Get the first and the last mapped file
            MappedFile firstMappedFile = this.getFirstMappedFile();
            MappedFile lastMappedFile = this.getLastMappedFile();
            // Proceed only if both exist
            if (firstMappedFile != null && lastMappedFile != null) {
                if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                    LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                        offset,
                        firstMappedFile.getFileFromOffset(),
                        lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                        this.mappedFileSize,
                        this.mappedFiles.size());
                } else {
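                    // File index: global file number minus that of the first surviving file
                    // (expired files at the head may already have been deleted)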
                    int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                    // Target mapped file
                    MappedFile targetFile = null;
                    try {
                        // Look the file up by index
                        targetFile = this.mappedFiles.get(index);
                    } catch (Exception ignored) {
                    }
    
                    // Validate the candidate: not null and the offset lies within it
                    if (targetFile != null && offset >= targetFile.getFileFromOffset()
                        && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                        return targetFile;
                    }
    
                    // Otherwise fall back to a linear scan
                    for (MappedFile tmpMappedFile : this.mappedFiles) {
                        if (offset >= tmpMappedFile.getFileFromOffset()
                            && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                            return tmpMappedFile;
                        }
                    }
                }
    
                // Not found: return the first mapped file if requested
                if (returnFirstOnNotFound) {
                    return firstMappedFile;
                }
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }
    
        return null;
    }
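
    The index arithmetic above works because files are fixed-size and contiguous: offset / mappedFileSize is a global file number, and subtracting the number of the first surviving file compensates for expired head files that have already been deleted. A worked sketch with a hypothetical FileIndexSketch class, which returns a list index, or -1 for out-of-range offsets where the real method logs "Offset not matched":

```java
import java.util.List;

/** Hypothetical restatement of the index arithmetic in findMappedFileByOffset. */
final class FileIndexSketch {
    /**
     * @param fileStartOffsets start offsets of the surviving files, ascending and contiguous
     * @return index into fileStartOffsets, or -1 if offset is outside the stored range
     */
    static int indexOf(List<Long> fileStartOffsets, long fileSize, long offset) {
        if (fileStartOffsets.isEmpty()) {
            return -1;
        }
        long first = fileStartOffsets.get(0);
        long last = fileStartOffsets.get(fileStartOffsets.size() - 1);
        if (offset < first || offset >= last + fileSize) {
            return -1; // the same range check that precedes the index computation
        }
        // offset / fileSize is a global file number; subtract the first surviving one.
        return (int) (offset / fileSize - first / fileSize);
    }
}
```

    With 1 KB files and the oldest surviving file starting at 2048, offset 3500 maps to 3500 / 1024 - 2048 / 1024 = 3 - 2 = 1, the file starting at 3072.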
    

    Get the minimum offset of the stored files

    public long getMinOffset() {
    
        if (!this.mappedFiles.isEmpty()) {
            try {
                return this.mappedFiles.get(0).getFileFromOffset();
            } catch (IndexOutOfBoundsException e) {
                //continue;
            } catch (Exception e) {
                log.error("getMinOffset has exception.", e);
            }
        }
        return -1;
    }
    

    Get the maximum offset of the stored files (up to the readable position)

    public long getMaxOffset() {
        // The last mapped file
        MappedFile mappedFile = getLastMappedFile();
        if (mappedFile != null) {
            return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
        }
        return 0;
    }
    

    Get the current global write pointer of the stored files

    public long getMaxWrotePosition() {
        MappedFile mappedFile = getLastMappedFile();
        if (mappedFile != null) {
            return mappedFile.getFileFromOffset() + mappedFile.getWrotePosition();
        }
        return 0;
    }
    

    5.2. MappedFile

    // OS page size used for flushing, 4 KB by default
    public static final int OS_PAGE_SIZE = 1024 * 4;
    // Total virtual memory mapped by MappedFiles in this JVM
    private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);
    // Number of MappedFile objects in this JVM
    private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
    // Write pointer of this file
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    // Commit pointer of this file
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    // Flush pointer
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    // File size
    protected int fileSize;
    // File channel
    protected FileChannel fileChannel;
    
    /**
     * Off-heap ByteBuffer.
     * Message will put to here first, and then reput to FileChannel if writeBuffer is not null.
     */
    protected ByteBuffer writeBuffer = null;
    // Off-heap memory pool
    protected TransientStorePool transientStorePool = null;
    // File name
    private String fileName;
    // Starting global offset of this file (equal to its file name)
    private long fileFromOffset;
    // Physical file
    private File file;
    // Memory-mapped buffer of the file
    private MappedByteBuffer mappedByteBuffer;
    // Store timestamp of the last message written
    private volatile long storeTimestamp = 0;
    // Whether this is the first file created in the MappedFileQueue
    private boolean firstCreateInQueue = false;
    

    MappedFile initialization

    private void init(final String fileName, final int fileSize) throws IOException {
        this.fileName = fileName;
        this.fileSize = fileSize;
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;
        // Make sure the parent directory exists
        ensureDirOK(this.file.getParent());
    
        try {
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("Failed to create file " + this.fileName, e);
            throw e;
        } catch (IOException e) {
            log.error("Failed to map file " + this.fileName, e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }
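
    init() relies on standard NIO: FileChannel.map(MapMode.READ_WRITE, 0, fileSize) maps the whole fixed-size file once, after which writes go through the MappedByteBuffer and MappedByteBuffer.force() persists them, which is the call MappedFile#flush makes when there is no writeBuffer. The standalone demo below shows that pattern on a temporary file; the class and method names are illustrative. Unlike MappedFile, which keeps its channel open for the file's lifetime, the demo closes the channel right away; per the FileChannel.map documentation, closing the channel does not invalidate the mapping.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;

/** Hypothetical demo of the map-then-force pattern used by MappedFile. */
final class MmapDemo {
    static Path newTempFile(String prefix) {
        try {
            return Files.createTempFile(prefix, ".log");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Maps fileSize bytes of the file, writes payload at position 0 and forces it to disk. */
    static void writeMapped(Path file, int fileSize, String payload) {
        // Same open mode as MappedFile#init: a read-write RandomAccessFile channel.
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
             FileChannel channel = raf.getChannel()) {
            MappedByteBuffer mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
            // The write lands in the page cache through the mapping, without a write() call.
            mapped.put(payload.getBytes(StandardCharsets.UTF_8));
            // Persist the dirty pages, as MappedFile#flush does when writeBuffer is null.
            mapped.force();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static byte[] readPrefix(Path file, int n) {
        try {
            return Arrays.copyOf(Files.readAllBytes(file), n);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```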
    

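    Note the transientStorePoolEnable option (in MessageStoreConfig): when it is true, data is first stored in off-heap memory, then the commit thread commits it to the FileChannel, and finally the flush thread flushes it to disk.

    With transientStorePoolEnable enabled: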

    public void init(final String fileName, final int fileSize,
        final TransientStorePool transientStorePool) throws IOException {
        init(fileName, fileSize);
        // Borrow an off-heap buffer from the pool to use as writeBuffer
        this.writeBuffer = transientStorePool.borrowBuffer();
        this.transientStorePool = transientStorePool;
    }
    

    5.2.1. commit

    The commit flow is roughly as follows:

    DefaultMessageStore#flush → CommitLog → MappedFileQueue → MappedFile

    //DefaultMessageStore
    public long flush() {
        return this.commitLog.flush();
    }
    //CommitLog
    public long flush() {
        //----------↓-----------
        this.mappedFileQueue.commit(0);
        this.mappedFileQueue.flush(0);
        return this.mappedFileQueue.getFlushedWhere();
    }
    //MappedFileQueue
    public boolean commit(final int commitLeastPages) {
        boolean result = true;
        MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
        if (mappedFile != null) {
            //----------↓-----------
            int offset = mappedFile.commit(commitLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.committedWhere;
            this.committedWhere = where;
        }
        return result;
    }
    

    Finally, MappedFile performs the actual commit:

    MappedFile#commit

    public int commit(final int commitLeastPages) {
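        // writeBuffer == null: transientStorePoolEnable is off and data went straight into mappedByteBuffer,
        // so nothing needs to be committed to fileChannel; report wrotePosition as committedPosition and go straight to flush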
        if (writeBuffer == null) {
            return this.wrotePosition.get();
        }
        // Commit only if at least commitLeastPages pages of data are pending
        if (this.isAbleToCommit(commitLeastPages)) {
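            // hold() takes a reference and fails if the MappedFile is no longer available
            // hold() -> isAvailable() -> MappedFile.available (inherited from ReferenceResource)
            // see shutdown() below for how a file is destroyed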
            if (this.hold()) {
                //--↓--
                commit0();
                this.release();
            } else {
                log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
            }
        }
    
        // All dirty data has been committed to FileChannel.
        // Once the whole file has been committed, return writeBuffer to the pool
        if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
            this.transientStorePool.returnBuffer(writeBuffer);
            this.writeBuffer = null;
        }
        return this.committedPosition.get();
    }
    

    MappedFile#isAbleToCommit

    // Commit pointer (the variable is named flush, but it holds committedPosition)
    int flush = this.committedPosition.get();
    // Write pointer of the file
    int write = this.wrotePosition.get();
    
    // The file is full: always commit
    if (this.isFull()) {
        return true;
    }
    
    if (commitLeastPages > 0) {
        // Commit only if the write pointer has crossed at least commitLeastPages page boundaries since the last commit
        return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= commitLeastPages;
    }
    return write > flush;
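
    The page test counts page boundaries crossed, not bytes written. Restating isAbleToCommit as a pure function makes this easy to check; CommitCondition is a hypothetical class and the pointer values in the usage are examples.

```java
/** Hypothetical restatement of MappedFile#isAbleToCommit as a pure function. */
final class CommitCondition {
    static final int OS_PAGE_SIZE = 1024 * 4;

    static boolean isAbleToCommit(int committed, int wrote, int fileSize, int commitLeastPages) {
        if (wrote == fileSize) {
            return true; // a full file is always committed
        }
        if (commitLeastPages > 0) {
            // Number of 4 KB page boundaries crossed since the last commit.
            return (wrote / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE) >= commitLeastPages;
        }
        // commitLeastPages == 0 (e.g. commit(0) in CommitLog#flush): any pending data.
        return wrote > committed;
    }
}
```

    Going from 4095 to 4097 bytes passes with commitLeastPages = 1 although only 2 bytes are new, because the write pointer crossed a 4 KB boundary.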
    

    MappedFile#commit0

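    // Write pointer
    int writePos = this.wrotePosition.get();
    // Last commit pointer
    int lastCommittedPosition = this.committedPosition.get();
    // Commit only if there is data beyond the last commit pointer
    if (writePos - lastCommittedPosition > 0) {
        try {
            // slice() creates a view sharing writeBuffer's content, with its own position and limit
            ByteBuffer byteBuffer = writeBuffer.slice();
            // Start at the last committed position
            byteBuffer.position(lastCommittedPosition);
            // Stop at the write pointer
            byteBuffer.limit(writePos);
            // Move the fileChannel to the last committed position as well
            this.fileChannel.position(lastCommittedPosition);
            // Copy the data between lastCommittedPosition and writePos into the FileChannel
            this.fileChannel.write(byteBuffer);
            // Advance the commit pointer to writePos so the same data is never committed twice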
            this.committedPosition.set(writePos);
        } catch (Throwable e) {
            log.error("Error occurred when commit data to FileChannel.", e);
        }
    }
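
    The copy in commit0 can be reproduced with plain NIO. The sketch below uses a direct ByteBuffer as a stand-in for writeBuffer and a FileChannel supplied by the caller; CommitSketch is hypothetical. It deliberately deviates from the original in two ways: it loops until the window is fully written, and it rethrows I/O errors instead of logging them.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

/** Hypothetical sketch of MappedFile#commit0: copy [committed, wrote) into a FileChannel. */
final class CommitSketch {
    /** Returns the new committed position. */
    static int commit(ByteBuffer writeBuffer, int committed, int wrote, FileChannel channel) {
        if (wrote - committed <= 0) {
            return committed; // nothing new to commit
        }
        try {
            ByteBuffer window = writeBuffer.slice(); // shares content, own position/limit
            window.position(committed);
            window.limit(wrote);
            channel.position(committed);
            while (window.hasRemaining()) {
                channel.write(window); // reaches the page cache, not necessarily the disk
            }
            return wrote;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    Because the copy works on a slice, the position of writeBuffer itself never moves, which is why the writer can keep appending through its own slices while the commit thread runs.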
    

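    5.2.2. flush

    Flushing calls force() on the MappedByteBuffer or the FileChannel to persist in-memory data to disk, so flushedPosition should equal the position up to which data has reached the MappedByteBuffer or FileChannel:

    • If writeBuffer is not null, flushedPosition should equal the last commit pointer, because only committed data has reached the FileChannel.
    • If writeBuffer is null, data is written directly into the MappedByteBuffer and wrotePosition is its pointer, so flushedPosition is set to wrotePosition.

    After the data has been committed to the fileChannel, flushing proceeds as follows:

    CommitLog#flush → MappedFileQueue#flush → MappedFile#flush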

    MappedFile#flush

    // Flush only when the flush condition (flushLeastPages) is met
    if (this.isAbleToFlush(flushLeastPages)) {
        // Take a reference (not a lock) so the file cannot be destroyed during the flush
        if (this.hold()) {
            // Max readable position: everything up to here can be flushed
            int value = getReadPosition();
    
            try {
                // TransientStorePool enabled -> data was committed to fileChannel
                // TransientStorePool disabled -> data is in mappedByteBuffer
                //We only append data to fileChannel or mappedByteBuffer, never both.
                // Data committed from writeBuffer to fileChannel -> force the channel
                if (writeBuffer != null || this.fileChannel.position() != 0) {
                    this.fileChannel.force(false);
                }
                // Data written straight into mappedByteBuffer -> force the mapping
                else {
                    this.mappedByteBuffer.force();
                }
            } catch (Throwable e) {
                log.error("Error occurred when force data to disk.", e);
            }
    
            // Update the flush pointer
            this.flushedPosition.set(value);
            this.release();
        } else {
            log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
            this.flushedPosition.set(getReadPosition());
        }
    }
    return this.getFlushedPosition();
    

    MappedFile#getReadPosition

    /**
     * Returns the maximum readable position of this file.
     * @return The max position which have valid data
     */
    public int getReadPosition() {
        // Without writeBuffer return the write pointer, otherwise the last commit pointer:
        // in MappedFile only committed data (written into the MappedByteBuffer or FileChannel) is safe to read
        return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
    }
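
    The three pointers can be summarized with a single-threaded model: with a writeBuffer, data moves wrote → committed → flushed and readers only see committed data; without one, commit is a no-op and readers see everything up to wrotePosition. PointerModel below is a hypothetical simplification that ignores page thresholds, reference counting and real I/O.

```java
/** Hypothetical single-threaded model of MappedFile's three pointers. */
final class PointerModel {
    private final boolean hasWriteBuffer; // true when TransientStorePool is enabled
    private int wrotePosition;
    private int committedPosition;
    private int flushedPosition;

    PointerModel(boolean hasWriteBuffer) {
        this.hasWriteBuffer = hasWriteBuffer;
    }

    void append(int bytes) {
        wrotePosition += bytes;
    }

    /** Mirrors commit(0): without a writeBuffer the write pointer is reported as committed. */
    int commit() {
        if (!hasWriteBuffer) {
            return wrotePosition;
        }
        committedPosition = wrotePosition; // commit0 copied [committed, wrote) to the channel
        return committedPosition;
    }

    /** Mirrors getReadPosition(): only committed (or directly mapped) data is visible. */
    int readPosition() {
        return hasWriteBuffer ? committedPosition : wrotePosition;
    }

    /** Mirrors flush(0): force() persists everything up to the read position. */
    int flush() {
        flushedPosition = readPosition();
        return flushedPosition;
    }
}
```

    In the pooled case, a flush before any commit persists nothing, which is why CommitLog#flush always calls commit before flush.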
    

    MappedFile#shutdown

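    A MappedFile is destroyed by MappedFile#destroy(long intervalForcibly), which calls ReferenceResource#shutdown(intervalForcibly), shown below. intervalForcibly is the maximum time a file that is still referenced may survive before its references are forcibly released.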

    if (this.available) {
        // Mark the MappedFile as unavailable
        this.available = false;
        // Record when shutdown started
        this.firstShutdownTimestamp = System.currentTimeMillis();
        // Drop the owner's initial reference
        this.release();
    } else if (this.getRefCount() > 0) {
        if ((System.currentTimeMillis() - this.firstShutdownTimestamp) >= intervalForcibly) {
            // Still referenced after intervalForcibly: force the count negative so release() cleans up
            this.refCount.set(-1000 - this.getRefCount());
            this.release();
        }
    }
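
    hold(), release() and the shutdown logic above form a reference count: a resource starts at 1 (the owner's reference), every hold() adds 1, the first shutdown drops the owner's reference, and cleanup runs when the count reaches zero. If readers never release, a later shutdown call made after intervalForcibly forces the count negative. Below is a simplified sketch of ReferenceResource with a hypothetical class name and an explicit clock parameter; in MappedFile, cleanup releases the mapped buffer.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical simplification of RocketMQ's ReferenceResource. */
final class RefCountedResource {
    private final AtomicLong refCount = new AtomicLong(1); // the owner's reference
    private volatile boolean available = true;
    private volatile boolean cleanedUp = false;
    private volatile long firstShutdownTimestamp = 0;

    /** Takes a reference; fails once shutdown has begun. */
    synchronized boolean hold() {
        if (available) {
            if (refCount.getAndIncrement() > 0) {
                return true;
            }
            refCount.getAndDecrement(); // count had already reached zero: undo
        }
        return false;
    }

    void release() {
        if (refCount.decrementAndGet() > 0) {
            return;
        }
        synchronized (this) {
            cleanedUp = true; // the real MappedFile frees its mapped buffer here
        }
    }

    /** Same logic as the shutdown excerpt above, with the clock passed in. */
    void shutdown(long intervalForcibly, long nowMs) {
        if (available) {
            available = false;
            firstShutdownTimestamp = nowMs;
            release(); // drop the owner's reference
        } else if (refCount.get() > 0 && nowMs - firstShutdownTimestamp >= intervalForcibly) {
            refCount.set(-1000 - refCount.get()); // ignore outstanding holders
            release();
        }
    }

    boolean isCleanedUp() {
        return cleanedUp;
    }
}
```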
    

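    5.3. TransientStorePool

    A pool for short-term data storage. RocketMQ allocates separate off-heap ByteBuffers to hold data temporarily: messages are first written into such a buffer, and the commit thread then copies them into the FileChannel of the target physical file. The main reason for this mechanism is memory locking: the off-heap memory is locked in RAM so that the OS cannot swap it out to disk.

    private final int poolSize;      // number of buffers in availableBuffers
    private final int fileSize;     // size of each ByteBuffer
    private final Deque<ByteBuffer> availableBuffers;   // deque holding the available buffers
    private final MessageStoreConfig storeConfig;       // message store configuration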
    

    Initialization:

    public void init() {
        // Create poolSize off-heap buffers
        for (int i = 0; i < poolSize; i++) {
            // Allocate direct (off-heap) memory
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
            // Native memory address of the buffer
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            // Lock the memory with mlock via JNA (com.sun.jna) so it cannot be swapped out, which keeps writes fast
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
    
            availableBuffers.offer(byteBuffer);
        }
    }
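
    Leaving aside mlock, which is a native call made through JNA, the pool is a deque of pre-allocated direct buffers: MappedFile borrows one in init() and returns it once the whole file has been committed, as in MappedFile#commit above. A minimal sketch with a hypothetical BufferPool class; returning a buffer resets its position and limit so the next file starts clean.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sketch of TransientStorePool without mlock. */
final class BufferPool {
    private final int fileSize;
    private final Deque<ByteBuffer> availableBuffers = new ArrayDeque<>();

    BufferPool(int poolSize, int fileSize) {
        this.fileSize = fileSize;
        for (int i = 0; i < poolSize; i++) {
            // Off-heap allocation; the real pool additionally mlock()s each buffer.
            availableBuffers.offer(ByteBuffer.allocateDirect(fileSize));
        }
    }

    /** Returns a buffer, or null when the pool is exhausted. */
    ByteBuffer borrowBuffer() {
        return availableBuffers.pollFirst();
    }

    /** Resets the buffer and puts it back at the head of the deque. */
    void returnBuffer(ByteBuffer buffer) {
        buffer.position(0);
        buffer.limit(fileSize);
        availableBuffers.offerFirst(buffer);
    }

    int available() {
        return availableBuffers.size();
    }
}
```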
    

    6. Disk flush mechanism

    6.1. Synchronous flush

    CommitLog#submitFlushRequest

    // Synchronous flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        // Service thread that flushes the CommitLog
        final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
        // The producer wants to wait for the store result
        if (messageExt.isWaitStoreMsgOK()) {
            // Build a flush request: flush up to wroteOffset + wroteBytes within syncFlushTimeout
            GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                    this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
            //将request放入刷写磁盘服务线程中
            //--------↓--------
            service.putRequest(request);
            //等待写入结果返回
            return request.future();
        } else {
            //唤醒同步刷盘线程
            service.wakeup();
            return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
        }
    }
    else {
        //异步刷盘....
    }
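
    In the synchronous branch the producer receives a CompletableFuture that the flush thread completes later. The sketch below shows that hand-off with a hypothetical FlushRequest class and Status enum modeled on GroupCommitRequest and PutMessageStatus; mapping a timeout (or an interrupt) to FLUSH_DISK_TIMEOUT on the waiting side is an assumption of this sketch:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

enum Status { PUT_OK, FLUSH_DISK_TIMEOUT }

// Hypothetical stand-in for GroupCommitRequest.
final class FlushRequest {
    final long nextOffset;      // wroteOffset + wroteBytes
    final long timeoutMillis;   // e.g. syncFlushTimeout
    private final CompletableFuture<Status> future = new CompletableFuture<>();

    FlushRequest(long nextOffset, long timeoutMillis) {
        this.nextOffset = nextOffset;
        this.timeoutMillis = timeoutMillis;
    }

    // Flush-thread side: publish the outcome.
    void wakeupCustomer(Status status) {
        future.complete(status);
    }

    CompletableFuture<Status> future() {
        return future;
    }

    // Producer side: block until answered or until the timeout expires.
    Status await() {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return Status.FLUSH_DISK_TIMEOUT;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to callers
            return Status.FLUSH_DISK_TIMEOUT;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```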
    

    GroupCommitRequest

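    public static class GroupCommitRequest {
        // CommitLog offset the flush must reach (wroteOffset + wroteBytes)
        private final long nextOffset;
        // completed by the flush thread with PUT_OK or FLUSH_DISK_TIMEOUT
        private CompletableFuture<PutMessageStatus> flushOKFuture = new CompletableFuture<>();
        // creation time of the request
        private final long startTimestamp = System.currentTimeMillis();
        // flush timeout, set by the constructor (here from syncFlushTimeout)
        private long timeoutMillis = Long.MAX_VALUE;
    }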
    

    GroupCommitService

    class GroupCommitService extends FlushCommitLogService {
        // producers add requests to requestsWrite, the flush thread processes requestsRead;
        // swapRequests() exchanges the two lists
        private volatile LinkedList<GroupCommitRequest> requestsWrite = new LinkedList<GroupCommitRequest>();
        private volatile LinkedList<GroupCommitRequest> requestsRead = new LinkedList<GroupCommitRequest>();
        // spin lock that keeps the lists above thread-safe
        private final PutMessageSpinLock lock = new PutMessageSpinLock();
    }
    

    GroupCommitService#putRequest

    // acquire the spin lock
    lock.lock();
    try {
        // add the request to the write list
        this.requestsWrite.add(request);
    } finally {
        lock.unlock();
    }
    // wake up the flush thread
    this.wakeup();
    

    GroupCommitService#run

    CommitLog.log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
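            // wait up to 10 ms (returns early if putRequest() calls wakeup());
            // on return, onWaitEnd() swaps requestsWrite and requestsRead
            this.waitForRunning(10);
            // flush and answer the swapped-in requests
            this.doCommit();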
        } catch (Exception e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
        }
    }
    // Under normal circumstances shutdown, wait for the arrival of the
    // request, and then flush
    try {
        Thread.sleep(10);
    } catch (InterruptedException e) {
        CommitLog.log.warn("GroupCommitService Exception, ", e);
    }
    synchronized (this) {
        this.swapRequests();
    }
    this.doCommit();
    CommitLog.log.info(this.getServiceName() + " service end");
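
    GroupCommitService keeps two lists so that producers can keep adding to requestsWrite while the flush thread works through requestsRead; swapRequests() exchanges them (in normal operation it runs through onWaitEnd() when waitForRunning returns). A sketch of that double-buffering pattern with hypothetical names, using a ReentrantLock in place of PutMessageSpinLock:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical double-buffered request queue in the style of GroupCommitService.
final class DoubleBufferedRequests<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private List<T> requestsWrite = new ArrayList<>();
    private List<T> requestsRead = new ArrayList<>();

    // Producer threads: append under the lock (a very short critical section).
    void putRequest(T request) {
        lock.lock();
        try {
            requestsWrite.add(request);
        } finally {
            lock.unlock();
        }
    }

    // Flush thread: exchange the lists so new requests land in an empty list.
    void swapRequests() {
        lock.lock();
        try {
            List<T> tmp = requestsWrite;
            requestsWrite = requestsRead;
            requestsRead = tmp;
        } finally {
            lock.unlock();
        }
    }

    // Flush thread only: take the current batch and reset the read side.
    // Only this thread touches requestsRead outside swapRequests(), so no lock is needed.
    List<T> drainReadSide() {
        List<T> batch = requestsRead;
        requestsRead = new ArrayList<>();
        return batch;
    }
}
```

    Producers contend for the lock only during a single add(), and the flush thread holds it only for a reference swap, so one flush can answer a whole batch of requests.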
    

    GroupCommitService#doCommit

    if (!this.requestsRead.isEmpty()) {
        // iterate over the swapped-in requests
        for (GroupCommitRequest req : this.requestsRead) {
            // the request is satisfied once the flushed position reaches its nextOffset;
            // the message may lie in the next file, so flush at most twice
            boolean flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            for (int i = 0; i < 2 && !flushOK; i++) {
                CommitLog.this.mappedFileQueue.flush(0);
                flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
            }
            // wake up the waiting producer by completing its future
            req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
        }
        // update the flush checkpoint
        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
        if (storeTimestamp > 0) {
            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
        }
        // reset the read list
        this.requestsRead = new LinkedList<>();
    } else {
        // reached when messages did not ask to wait for the flush (isWaitStoreMsgOK() is false)
        CommitLog.this.mappedFileQueue.flush(0);
    }
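
    Why can two flush calls be needed? Each MappedFileQueue flush handles only the mapped file that contains the current flushed position, so if that position is still inside one file while the request's nextOffset lies in the next file, the first call can only finish the old file and the second reaches the new one. The toy model below illustrates this under simplifying assumptions (hypothetical ToyFileQueue, fixed-size files, plain counters instead of real I/O); it is not RocketMQ's MappedFileQueue:

```java
// Hypothetical toy model: fixed-size files, each flush() handles one file only.
final class ToyFileQueue {
    private final long fileSize;
    private long wrotePosition;  // global offset just past the last written byte
    private long flushedWhere;   // global offset up to which data is on disk

    ToyFileQueue(long fileSize) {
        this.fileSize = fileSize;
    }

    void append(long bytes) {
        wrotePosition += bytes;
    }

    // Flush only the file containing flushedWhere, up to its end or to wrotePosition.
    void flush() {
        long fileStart = (flushedWhere / fileSize) * fileSize;
        long fileEnd = fileStart + fileSize;
        flushedWhere = Math.min(fileEnd, wrotePosition);
    }

    long getFlushedWhere() {
        return flushedWhere;
    }

    // Same loop shape as doCommit: at most two flushes per request.
    boolean flushFor(long nextOffset) {
        boolean flushOK = flushedWhere >= nextOffset;
        for (int i = 0; i < 2 && !flushOK; i++) {
            flush();
            flushOK = flushedWhere >= nextOffset;
        }
        return flushOK;
    }
}
```

    For example, with 1000-byte files, 900 bytes already flushed, 100 more bytes closing file 0 and a 50-byte message in file 1, one flush stops at offset 1000 and only the second one reaches 1050.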
    

    6.2.Asynchronous Flush

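    With asynchronous flushing, the broker replies to the producer as soon as the message has been appended to memory. If transientStorePoolEnable is on, RocketMQ allocates a separate off-heap buffer of the same size as the target physical file (the CommitLog file) and locks it in memory so that it is never swapped out to virtual memory; messages are first appended to this buffer, then committed to the physical file, and finally flushed to disk. If transientStorePoolEnable is off, messages are appended directly to the memory-mapped physical file and then flushed to disk.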

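    Asynchronous flush steps with transientStorePoolEnable turned on:

    • Append the message directly to the off-heap ByteBuffer.
    • Every 200 ms (by default), the CommitRealTimeService thread commits the messages in the ByteBuffer to the fileChannel.
    • When the commit succeeds, committedPosition moves forward.
    • Every 500 ms (by default), the FlushRealTimeService thread flushes the fileChannel data to disk.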

    // Synchronization flush
    if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
        ...
    }
    // Asynchronous flush
    else {
        // TransientStorePool disabled: wake up the flush thread (FlushRealTimeService) directly
        if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            flushCommitLogService.wakeup();
        } else {
            // TransientStorePool enabled: wake up the commit thread (CommitRealTimeService),
            // which wakes the flush thread after committing
            commitLogService.wakeup();
        }
        // reply immediately without waiting for the flush
        return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
    }
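
    With the pool enabled, a MappedFile effectively tracks three positions: how far messages have been written into the off-heap buffer, how far they have been committed to the FileChannel, and how far they have been flushed to disk, with flushed <= committed <= written. A simplified model of that pipeline, assuming a hypothetical ThreePointerFile class that only tracks the numbers (no real I/O):

```java
// Hypothetical model of the wrote/committed/flushed positions (no real I/O).
final class ThreePointerFile {
    private final boolean transientStorePoolEnable;
    private int wrotePosition;      // appended to the write buffer (or to the mmap directly)
    private int committedPosition;  // copied from the write buffer to the FileChannel
    private int flushedPosition;    // forced to disk

    ThreePointerFile(boolean transientStorePoolEnable) {
        this.transientStorePoolEnable = transientStorePoolEnable;
    }

    void append(int bytes) {
        wrotePosition += bytes;
    }

    // Commit step: only meaningful when the pool (write buffer) is in use.
    void commit() {
        if (transientStorePoolEnable) {
            committedPosition = wrotePosition;
        }
    }

    // What the flush step may write out: committed data with the pool,
    // all written data without it.
    int getReadPosition() {
        return transientStorePoolEnable ? committedPosition : wrotePosition;
    }

    // Flush step.
    void flush() {
        flushedPosition = getReadPosition();
    }

    int wrote() { return wrotePosition; }
    int committed() { return committedPosition; }
    int flushed() { return flushedPosition; }
}
```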
    

    CommitRealTimeService#run

    How the commit thread works:

    // loop until the service is stopped
    while (!this.isStopped()) {
        // commit interval: 200 ms by default
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
        // minimum number of pages per commit: 4 by default
        int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
        // maximum interval between two commits: 200 ms by default
        int commitDataThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();

        // if commitDataThoroughInterval has passed since the last commit,
        // ignore commitDataLeastPages and commit whatever is pending
        long begin = System.currentTimeMillis();
        if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
            this.lastCommitTimestamp = begin;
            // drop the page-count requirement
            commitDataLeastPages = 0;
        }

        try {
            // commit pending data from the off-heap buffer to the physical file (FileChannel)
            boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
            long end = System.currentTimeMillis();
            // some data was committed: record the time and wake the flush thread
            if (!result) {
                this.lastCommitTimestamp = end; // result = false means some data committed.
                //now wake up flush thread.
                // (FlushRealTimeService, a subclass of FlushCommitLogService)
                flushCommitLogService.wakeup();
            }

            if (end - begin > 500) {
                log.info("Commit data to file costs {} ms", end - begin);
            }
            this.waitForRunning(interval);
        } catch (Throwable e) {
            CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
        }
    }
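
    The commitDataLeastPages / commitDataThoroughInterval pair trades latency for batching. A sketch of the decision, assuming a 4 KB OS page and a page-count check in the style of MappedFile#isAbleToCommit; CommitPolicy and its methods are hypothetical names:

```java
// Hypothetical sketch of the commit thresholds (page size assumed to be 4 KB).
final class CommitPolicy {
    static final int OS_PAGE_SIZE = 4 * 1024;

    // After thoroughIntervalMs without a commit, drop the page requirement.
    static int effectiveLeastPages(long now, long lastCommitTimestamp,
                                   int thoroughIntervalMs, int leastPages) {
        return now >= lastCommitTimestamp + thoroughIntervalMs ? 0 : leastPages;
    }

    // Commit if the file is full, if enough whole pages are pending,
    // or (leastPages == 0) if anything at all is pending.
    static boolean isAbleToCommit(int committed, int wrote, int fileSize, int leastPages) {
        if (wrote == fileSize) {
            return true;
        }
        if (leastPages > 0) {
            return (wrote / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE) >= leastPages;
        }
        return wrote > committed;
    }
}
```

    The flush thread below applies the same pattern with its own parameters (4 pages, 10 s).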
    

    FlushRealTimeService#run (FlushRealTimeService extends FlushCommitLogService)

    How the flush thread works:

    // loop until the service is stopped
    while (!this.isStopped()) {
        // flush interval: 500 ms by default
        int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
        // minimum number of pages per flush: 4 by default
        int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();

        // maximum interval between two flushes: 10 s by default
        int flushPhysicQueueThoroughInterval =
            CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();

        boolean printFlushProgress = false;

        // Print flush progress
        long currentTimeMillis = System.currentTimeMillis();
        // if flushPhysicQueueThoroughInterval has passed since the last flush,
        // set flushPhysicQueueLeastPages to 0 and flush whatever is pending
        if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
            this.lastFlushTimestamp = currentTimeMillis;
            flushPhysicQueueLeastPages = 0;
            printFlushProgress = (printTimes++ % 10) == 0;
        }

        try {
            if (flushCommitLogTimed) {
                // sleep for the full interval (500 ms by default)
                Thread.sleep(interval);
            } else {
                // wait up to interval, returning early if wakeup() is called
                this.waitForRunning(interval);
            }
    
            if (printFlushProgress) {
                this.printFlushProgress();
            }
    
            long begin = System.currentTimeMillis();
            // flush to disk
            CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
            // update the physical-message timestamp in the store checkpoint file
            long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
            if (storeTimestamp > 0) {
                CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
            }
            long past = System.currentTimeMillis() - begin;
            if (past > 500) {
                log.info("Flush data to disk costs {} ms", past);
            }
        } catch (Throwable e) {
            CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            this.printFlushProgress();
        }
    }
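
    The two waiting modes differ in one respect: Thread.sleep(interval) always sleeps the full interval, while waitForRunning(interval) returns early when another thread calls wakeup(), and returns immediately if a wakeup arrived before the wait began. RocketMQ's ServiceThread builds this on a resettable latch (CountDownLatch2) and a notification flag; the sketch below reproduces the behavior with a plain monitor and is a hypothetical simplification (WakeableWaiter), not that class:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical simplified ServiceThread-style wait/wakeup built on a monitor.
class WakeableWaiter {
    private boolean hasNotified = false; // guarded by this

    // Called by other threads (e.g. after appending a message).
    synchronized void wakeup() {
        hasNotified = true;
        notifyAll();
    }

    // Returns true if woken (or already notified), false if the interval elapsed.
    synchronized boolean waitForRunning(long intervalMillis) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMillis);
        try {
            while (!hasNotified) { // loop guards against spurious wakeups
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return false;
                }
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            hasNotified = false; // consume the notification
        }
    }
}
```

    With the early-return mode, a burst of messages can trigger a flush before the 500 ms interval is over, whereas the timed mode gives a fixed flush cadence regardless of traffic.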
    

    This article is for personal study only. If you find any shortcomings or errors, please point them out!

          Article link: https://www.haomeiwen.com/subject/vuiltrtx.html