美文网首页消息中间件
RocketMQ源码-MappedFile介绍

RocketMQ源码-MappedFile介绍

作者: persisting_ | 来源:发表于2019-07-21 10:00 被阅读5次


    1 概述
    2 MappedFileQueueMappedFile类介绍
    3 MappedFile的获取
    4 TransientStorePool暂存池
    5 MappedFile预分配
    6 MappedFile写入
    7 MappedFile刷盘
    8 注意

    1 概述

    RocketMQ的存储都基于MappedFile实现,如CommitLogIndex索引、ConsumeQueue等,本文则主要介绍的实现机制,包括MappedFileQueue类介绍、MappedFile类介绍、预热、MappedFile预分配服务AllocateMappedFileServiceMappedFile刷盘等内容。

    2 MappedFileQueueMappedFile类介绍

    • MappedFileQueue

    CommitLog消息存储、ConsumeQueue等通常会记录大量的数据,一个MappedFile具有固定大小(默认1G),所以一个MappedFile不能记录所有的内容,于是CommitLogConsumeQueue通常会使用多个MappedFile记录数据,RocketMQ则使用MappedFileQueue组织一系列的MappedFile,处在MappedFileQueue队尾的通常是刚写满的或者还有部分空间或者刚分配的MappedFile,每次写操作时,都会从队尾拿到最后一个MappedFile进行写。

    • MappedFile

    首先看一下MappedFile类的主要变量

    //常量,内存页大小,4KB
    public static final int OS_PAGE_SIZE = 1024 * 4;
    //上次写的位置
    protected final AtomicInteger wrotePosition = new AtomicInteger(0);
    //已经提交的位置
    protected final AtomicInteger committedPosition = new AtomicInteger(0);
    //已经刷盘的位置
    private final AtomicInteger flushedPosition = new AtomicInteger(0);
    //文件大小
    protected int fileSize;
    //该MappedFile文件对应的channel
    protected FileChannel fileChannel;
    /**
    * Message will put to here first, and then reput to 
    * FileChannel if writeBuffer is not null.
    */
    //如果启用了TransientStorePool,则writeBuffer为从暂时存储池中借用
    //的buffer,此时存储对象(比如消息等)会先写入该writeBuffer,然后
    //commit到fileChannel,最后对fileChannel进行flush刷盘
    protected ByteBuffer writeBuffer = null;
    //一个内存ByteBuffer池实现,如果如果启用了TransientStorePool则不为空
    protected TransientStorePool transientStorePool = null;
    //文件名,其实就是该文件内容默认其实位置
    private String fileName;
    //该文件中内容相对于整个文件的偏移,其实和文件名相同
    private long fileFromOffset;
    //该MappedFile对应的实际文件
    private File file;
    //通过fileChannel.map得到的可读写的内存映射buffer,如果没有启用
    //TransientStorePool则写数据时会写到该缓冲中,刷盘时直接调用该
    //映射buffer的force函数,而不需要进行commit操作
    private MappedByteBuffer mappedByteBuffer;
    

    3 MappedFile的获取

    因为每次写数据时,都会从MappedFileQueue中获取最后一个MappedFile,如果MappedFileQueue为空,或者最后一个MappedFile已经写满,则会重新分配一个新的MappedFile,我们以CommitLog对生产者发来的消息进行存储为例,看其是如何获取MappedFile进行写操作的,

    //CommitLog
    public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
        ...
        //首先获取MappedFileQueue中的最后一个MappedFile类实例
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try {
            ...
            //如果上面返回空,则表示queue为空,还没有新建过MappedFile对象
            //如果上面返回不为空,但是返回的MappedFile已经写满
            //此时,需要新建一个新的MappedFile
            if (null == mappedFile || mappedFile.isFull()) {
                //getLastMappedFile(0)会创建一个新的MappedFile并返回
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            }
            if (null == mappedFile) {
                ...
            }
    
            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            ...
        }
            ...
    }
    
    

    MappedFileQueue具体创建过程如下:

    //MappedFileQueue
    //传入的startOffset默认为0,在具体创建时会进行判断
    public MappedFile getLastMappedFile(final long startOffset) {
        return getLastMappedFile(startOffset, true);
    }
    
    public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {
        //需要创建的MappedFile偏移(大小)
        long createOffset = -1;
        //获取该queue中的最后一个MappedFile,用于计算需要新建的MappedFile
        //的其实位置(偏移)
        MappedFile mappedFileLast = getLastMappedFile();
        //如果为空的话,表示目前新建的MappedFile为此queue中的第一个
        //MappedFile,createOffset为startOffset,因为createOffset
        //需要为定义的mappedFileSize(默认1G),所以这里减去
        //多余的部分(startOffset % this.mappedFileSize)
        if (mappedFileLast == null) {
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        }
        //如果queue中在创建MappedFile时已经有存量的MappedFile,则此次
        //创建是由于queue最后一个MappedFile已经写满触发的,所以此次创建
        //的新MappedFile偏移则为最后一个文件偏移+mappedFileSize
        if (mappedFileLast != null && mappedFileLast.isFull()) {
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        }
    
        if (createOffset != -1 && needCreate) {
            //MappedFile的创建可以进行预创建,在创建一个MappedFile时,会
            //同时创建下一个MappedFile,这样下次需要新建MappedFile时则
            //可以直接拿到已经预创建好的MappedFile,提高了写的性能。
    
            //此次新建的MappedFile对应的文件名
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            //预分配的下一个MappedFile对应的文件名
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
            MappedFile mappedFile = null;
            //allocateMappedFileService不为null,表示启用了预创建功能
            //此时会同时向allocateMappedFileService提交两个创建任务,
            //一个为此次需要创建的MappedFile,一个为下次获取MappedFile
            //预创建的MappedFile,此次需要创建的MappedFile的创建任务
            //为同步创建,需要等其创建成功返回,而预创建的MappedFile则
            //为异步创建
            if (this.allocateMappedFileService != null) {
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                //如果没有启用预创建功能,则直接new一个MappedFile对象进行
                //创建,没有启用预创建功能的MappedFile创建都是不会使用
                //TransientStorePool暂存池优化的
                try {
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }
    
            if (mappedFile != null) {
                if (this.mappedFiles.isEmpty()) {
                    mappedFile.setFirstCreateInQueue(true);
                }
                this.mappedFiles.add(mappedFile);
            }
    
            return mappedFile;
        }
    
        return mappedFileLast;
    }
    

    上面创建MappedFile有两种形式,一种是通过其构造函数直接创建,一种是提交给allocateMappedFileService创建并异步创建下一个MappedFile待用。

    我们首先看第一种形式,即直接new MappedFile,此时不会使用
    TransientStorePool暂存池优化的,我们首先看MappedFile构造函数:

    //MappedFile
    public MappedFile(final String fileName, final int fileSize) throws IOException {
        init(fileName, fileSize);
    }
    
    
    private void init(final String fileName, final int fileSize) throws IOException {
        //记录文件名、大小等信息
        this.fileName = fileName;
        this.fileSize = fileSize;
        //新建File对象
        this.file = new File(fileName);
        this.fileFromOffset = Long.parseLong(this.file.getName());
        boolean ok = false;
    
        //保证路径都存在
        ensureDirOK(this.file.getParent());
    
        try {
            //获取fileChannel对象
            this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
            //获取内存映射缓冲
            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
            //统计计数器更新
            TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
            TOTAL_MAPPED_FILES.incrementAndGet();
            ok = true;
        } catch (FileNotFoundException e) {
            log.error("create file channel " + this.fileName + " Failed. ", e);
            throw e;
        } catch (IOException e) {
            log.error("map file " + this.fileName + " Failed. ", e);
            throw e;
        } finally {
            if (!ok && this.fileChannel != null) {
                this.fileChannel.close();
            }
        }
    }
    
    

    因为MappedFile预分配allocateMappedFileService涉及到了TransientStorePool暂存池,所以下面一节先简单介绍TransientStorePool暂存池。

    4 TransientStorePool暂存池

    TransientStorePool类比较简单,采用双端队列Deque维护了一些列的预分配的ByteBuffer,这些ByteBuffer都是在堆外分配的直接内存,DefaultMessageStore会持有TransientStorePool对象实例,如果启动时配置了启用transientStorePoolEnable,那么在DefaultMessageStore构造函数中会调用TransientStorePool.init方法,预分配ByteBuffer并放入队列中,如果启动时没有启用TransientStorePool功能,则不会调用TransientStorePool.init方法,那么从队列中获取ByteBuffer会返回null。

    TransientStorePool主要域如下:

    //TransientStorePool
    //池中预分配ByteBuffer数量
    private final int poolSize;
    //每个ByteBuffer大小
    private final int fileSize;
    //采用双端队列维护预分配的ByteBuffer
    private final Deque<ByteBuffer> availableBuffers;
    

    TransientStorePool主要的方法如下:

    //TransientStorePool
    //如源码注释,因为这里需要申请多个堆外ByteBuffer,所以是个
    //十分heavy的初始化方法
    /**
    * It's a heavy init method.
    */
    public void init() {
        //申请poolSize个ByteBuffer
        for (int i = 0; i < poolSize; i++) {
            //申请直接内存空间
            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(fileSize);
    
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            //锁住内存,避免操作系统虚拟内存的换入换出
            LibC.INSTANCE.mlock(pointer, new NativeLong(fileSize));
            //将预分配的ByteBuffer方法队列中
            availableBuffers.offer(byteBuffer);
        }
    }
    
    //销毁内存池
    public void destroy() {
        //取消对内存的锁定
        for (ByteBuffer byteBuffer : availableBuffers) {
            final long address = ((DirectBuffer) byteBuffer).address();
            Pointer pointer = new Pointer(address);
            LibC.INSTANCE.munlock(pointer, new NativeLong(fileSize));
        }
    }
    
    //使用完毕之后归还ByteBuffer
    public void returnBuffer(ByteBuffer byteBuffer) {
        //ByteBuffer各下标复位
        byteBuffer.position(0);
        byteBuffer.limit(fileSize);
        //放入队头,等待下次重新被分配
        this.availableBuffers.offerFirst(byteBuffer);
    }
    
    //从池中获取ByteBuffer
    public ByteBuffer borrowBuffer() {
        //非阻塞弹出队头元素,如果没有启用暂存池,则
        //不会调用init方法,队列中就没有元素,这里返回null
        //其次,如果队列中所有元素都被借用出去,队列也为空
        //此时也会返回null
        ByteBuffer buffer = availableBuffers.pollFirst();
        //如果队列中剩余元素数量小于配置个数的0.4,则写日志提示
        if (availableBuffers.size() < poolSize * 0.4) {
            log.warn("TransientStorePool only remain {} sheets.", availableBuffers.size());
        }
        return buffer;
    }
    
    //剩下可借出的ByteBuffer数量
    public int remainBufferNumbs() {
        //如果启用了暂存池,则返回队列中元素个数
        if (storeConfig.isTransientStorePoolEnable()) {
            return availableBuffers.size();
        }
        //否则返会Integer.MAX_VALUE
        return Integer.MAX_VALUE;
    }
    

    如果使用的暂存池TransientStorePool,那么创建MappedFile使用的构造函数为:

    //MappedFile
    public MappedFile(final String fileName, final int fileSize,
        final TransientStorePool transientStorePool) throws IOException {
        init(fileName, fileSize, transientStorePool);
    }
    
    public void init(final String fileName, final int fileSize,
        final TransientStorePool transientStorePool) throws IOException {
        //这个重载的init方法上面已经介绍过
        init(fileName, fileSize);
        //不同就是这里的writeBuffer会被赋值,后续写入操作会优先
        //写入writeBuffer中
        this.writeBuffer = transientStorePool.borrowBuffer();
        //记录transientStorePool主要为了释放时归还借用的ByteBuffer
        this.transientStorePool = transientStorePool;
    }
    

    这里还要注意一下,在预分配AllocateMappedFileService服务中,会先尝试使用ServiceLoader扩展点加载用户自定义的MappedFile实现,此时构造函数使用的是两个参数的构造函数,但是会显示调用具有TransientStorePool参数的init方法进行初始化。在线面介绍AllocateMappedFileService.mmapOperation方法时会看到ServiceLoader扩展点的使用。

    5 MappedFile预分配

    在介绍MappedFile获取时说到,如果启用了MappedFile预分配服务,那么在创建MappedFile时会同时创建两个MappedFile,一个同步创建并返回用于本次实际使用,一个后台异步创建用于下次取用,在MappedFileQueue.getLastMappedFile中预分配的入口为this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);

    上面参数中nextFilePath为本次请求的MappedFile文件名和大小,而nextNextFilePath则为为下一次提前分配的MappedFile文件名和大小。

    这里先说明一下为什么预分配只在本次实际所需创建的MappedFile的基础上提前分配一个MappedFile,个人理解时分配的MappedFile默认大小1G,空间较大,且分配之后一般能够撑一段时间的写入,不用也不能预分配过多的MappedFile

    好了,现在我们看allocateMappedFileService.putRequestAndReturnMappedFile方法实现:

    //AllocateMappedFileService
    public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
        //每次最多只能提交两个分配请求
        int canSubmitRequests = 2;
        if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
            if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
                //因为预分配使用了暂存池,所以这里重新计算可提交请求书为
                //暂存池剩余ByteBuffer数量-已提交的预分配请求个数
                canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();
            }
        }
    
        //创建分配任务,这个任务表示的是当前实际需要的且要等待期返回的
        //MappedFile分配请求
        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
        //AllocateRequest自定义了equal/hashCode方法,这里不再列出其源码
        //如果该filePath和fileSize已经在requestTable中,则表示
        //此次所需分配的MappedFile已经在上次分配时被预分配了
        //同时要注意的是AllocateRequest也实现了Comparable接口
        //放入优先队列中可自动排序,文件偏移小的会先被分配
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
        //如果nextPutOK为true,表示该分片请求被成功放入requestTable中,
        //也就表示该请求原先没有预分配过
        if (nextPutOK) {
            //此时检查可提交请求个数,如果小于等于0,则表示不可提交
            //预分配失败,返回null,通过上面介绍过的使用new MappedFile
            //创建MappedFile
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
                //移除刚放入requestTable中的请求
                this.requestTable.remove(nextFilePath);
                return null;
            }
            //如果可提交请求个数符合要求,则将分配请求加入队列中排序
            boolean offerOK = this.requestQueue.offer(nextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
            //已经提交了一个请求,所以可提交请求数减一
            canSubmitRequests--;
        }
    
        //预分配此次所需的下一个MappedFile
        //过程和上面类似,不再介绍
        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
        if (nextNextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
                this.requestTable.remove(nextNextFilePath);
            } else {
                boolean offerOK = this.requestQueue.offer(nextNextReq);
                if (!offerOK) {
                    log.warn("never expected here, add a request to preallocate queue failed");
                }
            }
        }
    
        if (hasException) {
            log.warn(this.getServiceName() + " service has exception. so return null");
            return null;
        }
        //获取刚提交的或者以前预分配的此次实际需要创建的MappedFile的创建
        //请求对象
        AllocateRequest result = this.requestTable.get(nextFilePath);
        try {
            if (result != null) {
                //AllocateRequest使用CountDownLatch进行阻塞
                boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
                if (!waitOK) {
                    log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                    return null;
                } else {
                    //如果没有超时,MappedFile创建完成,则返回创建的
                    //MappedFile
                    this.requestTable.remove(nextFilePath);
                    return result.getMappedFile();
                }
            } else {
                log.error("find preallocate mmap failed, this never happen");
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }
    
        return null;
    }
    

    预分配请求提交的方法已经介绍完毕,此时提交预分配请求的方法被阻塞等待获取当前所需创建MappedFile任务,那么提交的分配任务时如何处理的呢?其实AllocateMappedFileService类扩展自ServiceThread,是一个线程,其run方法定义如下:

    //AllocateMappedFileService
    public void run() {
        log.info(this.getServiceName() + " service started");
        //调用mmapOperation进行任务处理
        while (!this.isStopped() && this.mmapOperation()) {
    
        }
        log.info(this.getServiceName() + " service end");
    }
    
    //方法注释说明,只有被其他线程中断会返回false,其他情况都会返回true
    /**
    * Only interrupted by the external thread, will return false
    */
    private boolean mmapOperation() {
        boolean isSuccess = false;
        AllocateRequest req = null;
        try {
            //从队列获取创建请求,这里的队列为优先队列,里面的
            //AllocateRequest分配请求已经根据文件偏移进行过排序
            req = this.requestQueue.take();
            //同时从requestTable获取创建请求,如果获取失败,表示已经超时
            //被移除,此时记录日志,此任务已经不需要处理
            AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
            if (null == expectedRequest) {
                log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize());
                return true;
            }
            if (expectedRequest != req) {
                log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
                return true;
            }
            //如果取得的分配任务尚没有持有MappedFile对象,则进行分配
            if (req.getMappedFile() == null) {
                long beginTime = System.currentTimeMillis();
                //如果启用了暂存池TransientStorePool,则进行存储池分配
                MappedFile mappedFile;
                if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                    try {
                        //这里有个ServiceLoader扩展,用户可通过
                        //ServiceLoader使用自定义的MappedFile实现
                        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    } catch (RuntimeException e) {
                        //如果用户没有通过ServiceLoader使用
                        //自定义的MappedFile,则使用RocketMQ
                        //默认的MappedFile实现
                        log.warn("Use default implementation.");
                        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    }
                } else {
                    //如果没有启用暂存池,则直接使用new MappedFile创建
                    //没有暂存池的初始化
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                }
    
                long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
                if (eclipseTime > 10) {
                    int queueSize = this.requestQueue.size();
                    log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
                        + " " + req.getFilePath() + " " + req.getFileSize());
                }
    
                //如果配置了MappedFile预热,则进行MappedFile预热
                // pre write mappedFile
                if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                    .getMapedFileSizeCommitLog()
                    &&
                    this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
                    //具体的预热方法不具体介绍,简单的说就是通过加载分配
                    //空间的每个内存页进行写入,使分配的ByteBuffer
                    //加载到内存中,并和暂存池一样,避免其被操作系统
                    //换出
                    mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                        this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
                }
                //分配完成的MappedFile放入请求中
                req.setMappedFile(mappedFile);
                this.hasException = false;
                isSuccess = true;
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
            this.hasException = true;
            return false;
        } catch (IOException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.hasException = true;
            //发生异常则将请求重新放回队列,下次重新尝试分配
            if (null != req) {
                requestQueue.offer(req);
                try {
                    Thread.sleep(1);
                } catch (InterruptedException ignored) {
                }
            }
        } finally {
            if (req != null && isSuccess)
            //创建完毕,则让阻塞的获取方法返回
            req.getCountDownLatch().countDown();
        }
        return true;
    }
    

    6 MappedFile写入

    如何分配得到MappedFile已经介绍完毕,下面我们以BrokerController存储消息为例介绍MappedFile的写入操作,BrokerController的消息存储最终调用MappedFile.appendMessage完成写入:

    //MappedFile
    public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
        return appendMessagesInner(msg, cb);
    }
    
    
    public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;
    
        int currentPos = this.wrotePosition.get();
    
        if (currentPos < this.fileSize) {
            //我们重点看这里,如果writeBuffer不为空,则优先
            //写入writeBuffer,否则写入mappedByteBuffer,
            //通过前面的介绍可以知道,如果启用了暂存池
            //TransientStorePool则writeBuffer会被初始化
            //否则writeBuffer为空
            //slice方法返回一个新的byteBuffer,但是这里新的
            //byteBuffer和原先的ByteBuffer共用一个存储空间
            //只是自己维护的相关下标
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            //下面的写入我们不展开介绍,无非对消息进行编码
            //然后将编码后的数据写入这里得到的byteBuffer等待刷盘
            byteBuffer.position(currentPos);
            AppendMessageResult result = null;
            if (messageExt instanceof MessageExtBrokerInner) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
    

    7 MappedFile刷盘

    MappedFile刷盘操作根据具体配置分为同步和异步刷盘两种方式,这里不管同步异步,其操作类似,都是通过MappedFile.commitMappedFile.flush,如果启用了暂存池TransientStorePool则会先调用MappedFile.commitwriteBuffer中的数据写入fileChannel中,然后再调用MappedFile.flush;而MappedFile.flush通过fileChannel.force或者mappedByteBuffer.force()进行实际的刷盘动作,具体方法不再展开介绍

    8 注意

    是否启用暂存池TransientStorePool并不是单独配置transientStorePoolEnable为true就可以了,我们可以看下MessageStoreConfig.isTransientStorePoolEnable方法的实现:

    //Enable transient commitLog store pool only if transientStorePoolEnable 
    //is true and the FlushDiskType is ASYNC_FLUSH
    public boolean isTransientStorePoolEnable() {
     return transientStorePoolEnable && FlushDiskType.ASYNC_FLUSH == getFlushDiskType()
     && BrokerRole.SLAVE != getBrokerRole();
    }
    

    只有主Broker、刷盘方式为异步刷盘且transientStorePoolEnable为true才会启用暂存池TransientStorePool

    相关文章

      网友评论

        本文标题:RocketMQ源码-MappedFile介绍

        本文链接:https://www.haomeiwen.com/subject/ylpplctx.html