Reading the store module 18: CommitLog (2): FlushC

Author: 赤子心_d709 | Published 2017-11-08 10:15

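    Overview

    This article covers CommitLog's inner class FlushCommitLogService and its three implementations, CommitRealTimeService, FlushRealTimeService and GroupCommitService, which handle flushing to disk.
    Each of the three targets a different scenario; they work either alone or together.

    Context from the previous article:
    inside putMessage, the caller
    1. calls result = mappedFile.appendMessage(msg, this.appendMessageCallback); which advances the mappedFile's wrotePosition
    2. then calls handleDiskFlush to get the data flushed

    The differences between the three are described below.
    They relate to the wrotePosition, committedPosition and flushedPosition of mappedFile.
    The links under refer make the relationship between the three modes and these three mappedFile attributes easier to follow.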

    | -- | Sync | Async, TransientStorePool disabled | Async, TransientStorePool enabled |
    |---|---|---|---|
    | Class | GroupCommitService | FlushRealTimeService | FlushRealTimeService + CommitRealTimeService |
    | Underlying mappedFile call | flush | flush | CommitRealTimeService first calls mappedFile.commit, then FlushRealTimeService calls mappedFile.flush |

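    A diagram makes this easier to follow.

    [Figure: how the three flush modes relate to mappedFile]

    The UML diagram is as follows.

    [Figure: UML diagram]

    ServiceThread is covered in a later article.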

    Handling the flush
    See the Thoughts section for the reasoning.

    FlushCommitLogService

    Abstract parent class

        abstract class FlushCommitLogService extends ServiceThread {
            protected static final int RETRY_TIMES_OVER = 10;
        }
    

    Sync flush

    This involves GroupCommitRequest and GroupCommitService.

    GroupCommitRequest

    This is the sync flush request. The source is as follows.

        public static class GroupCommitRequest {
            private final long nextOffset;
            private final CountDownLatch countDownLatch = new CountDownLatch(1);// controls the wait
            private volatile boolean flushOK = false;// whether the flush succeeded
    
            public GroupCommitRequest(long nextOffset) {
                this.nextOffset = nextOffset;
            }
    
            public long getNextOffset() {
                return nextOffset;
            }
    
            /**
             * Wakes up the waiting caller and sets the flushOK value
             */
            public void wakeupCustomer(final boolean flushOK) {
                this.flushOK = flushOK;
                this.countDownLatch.countDown();
            }
    
            /**
             * Waits up to timeout for the flush; woken when the flushing thread calls wakeupCustomer
             */
            public boolean waitForFlush(long timeout) {
                try {
                    this.countDownLatch.await(timeout, TimeUnit.MILLISECONDS);
                    return this.flushOK;// flushOK is volatile
                } catch (InterruptedException e) {
                    log.error("Interrupted", e);
                    return false;
                }
            }
        }
    

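    Explanation:

    1. nextOffset is defined as
            /**
             * The offset at which the next request starts; compare it with mappedFileQueue.getFlushedWhere to verify whether the flush succeeded
             * It is usually assigned in the constructor: nextOffset = result.getWroteOffset() + result.getWroteBytes()
             * i.e. the current offset plus the current size
             */
    
    2. wakeupCustomer wakes up waitForFlush; the two are used as a pair
    Thread A calls waitForFlush and waits, up to the timeout, for the actual countDown
    Thread B calls wakeupCustomer to wake it up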
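
    The handshake between the two threads can be tried in isolation. The following is a minimal sketch, not the RocketMQ class itself: FlushRequestSketch and LatchHandshakeDemo are hypothetical names, and the request is stripped down to the latch and the volatile flag shown above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for GroupCommitRequest: one latch plus one volatile result flag.
class FlushRequestSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean flushOK = false;

    // Called by the flushing thread (thread B) once the outcome is known.
    void wakeupCustomer(boolean ok) {
        this.flushOK = ok;
        this.latch.countDown();
    }

    // Called by the producer thread (thread A); false on timeout or interruption.
    boolean waitForFlush(long timeoutMillis) {
        try {
            this.latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return this.flushOK;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

class LatchHandshakeDemo {
    public static void main(String[] args) {
        // Thread B answers well within the timeout.
        FlushRequestSketch answered = new FlushRequestSketch();
        new Thread(() -> answered.wakeupCustomer(true)).start();
        System.out.println("answered in time: " + answered.waitForFlush(1000));

        // Nobody answers, so the wait times out and the flag keeps its initial value.
        FlushRequestSketch unanswered = new FlushRequestSketch();
        System.out.println("nobody answered: " + unanswered.waitForFlush(20));
    }
}
```

    A timeout and a failed flush look the same to the caller here: both come back as false.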
    

    GroupCommitService

    Fields

            // the write-side and read-side request lists
            private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
            private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
    

    Every new sync request is added to the requestsWrite list.
    Before each round of consumption, requestsWrite and requestsRead are swapped (requestsRead is in fact empty at that moment).
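
    The two-list arrangement is a small double buffer. Here is a sketch of the idea under simplifying assumptions: DoubleBufferSketch is a hypothetical class, requests are plain strings, and every method locks on this, which is stricter than the locking in the original code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical double buffer: producers append to writeList, the consumer drains readList.
class DoubleBufferSketch {
    private List<String> writeList = new ArrayList<>();
    private List<String> readList = new ArrayList<>();

    // Producer side: only ever touches the write list.
    synchronized void put(String request) {
        writeList.add(request);
    }

    // Consumer side: exchange the two lists so the pending requests become readable.
    synchronized void swap() {
        List<String> tmp = writeList;
        writeList = readList;
        readList = tmp;
    }

    // Consumer side: hand out everything in the read list and leave it empty for the next swap.
    synchronized List<String> drain() {
        List<String> copy = new ArrayList<>(readList);
        readList.clear();
        return copy;
    }
}

class DoubleBufferDemo {
    public static void main(String[] args) {
        DoubleBufferSketch buffer = new DoubleBufferSketch();
        buffer.put("a");
        buffer.put("b");
        buffer.swap();
        System.out.println(buffer.drain()); // the two requests queued before the swap
        buffer.put("c");
        System.out.println(buffer.drain()); // empty: "c" is still on the write side
        buffer.swap();
        System.out.println(buffer.drain()); // now "c" is visible
    }
}
```

    Because the read list is always emptied after processing, the swap effectively hands the write list to the reader and gives the writer a fresh empty list.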

    Methods

    putRequest: enqueues a sync request when sync flush is on and a flush acknowledgement is required

            public synchronized void putRequest(final GroupCommitRequest request) {
                synchronized (this.requestsWrite) {
                    this.requestsWrite.add(request);
                }
                // ServiceThread logic: wakes up waitForRunning
                if (hasNotified.compareAndSet(false, true)) {
                    waitPoint.countDown(); // notify
                }
            }
    

    run: the thread body, which loops over waitForRunning and doCommit

            public void run() {
                CommitLog.log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    try {
                        this.waitForRunning(10);// waits at most 10 ms; onWaitEnd swaps the read and write lists when the wait ends; putRequest can wake it early
                        this.doCommit();
                    } catch (Exception e) {
                        CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                    }
                }
    
                // Under normal circumstances shutdown, wait for the arrival of the
                // request, and then flush
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    CommitLog.log.warn("GroupCommitService Exception, ", e);
                }
    
                synchronized (this) {
                    this.swapRequests();
                }
    
                this.doCommit();
    
                CommitLog.log.info(this.getServiceName() + " service end");
            }
    

    waitForRunning, implemented in the parent class, calls onWaitEnd

            @Override
            protected void onWaitEnd() {// runs when waitForRunning finishes waiting
                this.swapRequests();// swap the read and write request lists
            }
    

    swapRequests is as follows

            /**
             * Swaps the requestsWrite and requestsRead lists
             * requestsRead is cleared after every doCommit call,
             * so in effect this assigns Write to Read
             */
            private void swapRequests() {
                List<GroupCommitRequest> tmp = this.requestsWrite;
                this.requestsWrite = this.requestsRead;
                this.requestsRead = tmp;
            }
    

    doCommit is as follows; it performs the flush and checks the result

            /**
             * doFlush would be a better name: this advances the mappedFile's flushedPosition
             */
            private void doCommit() {
                synchronized (this.requestsRead) {
                    if (!this.requestsRead.isEmpty()) {// there are requests to process
                        for (GroupCommitRequest req : this.requestsRead) {
                            // There may be a message in the next file, so a maximum of
                            // two times the flush
                            boolean flushOK = false;
                            // Retry: flushing an earlier request may already have covered this one, in which case flushOK is true right away
                            // If it was not covered, flush for this request
                            //   within a single mappedFile, one flush is enough
                            //   across two mappedFiles, two flushes are needed
                            for (int i = 0; i < 2 && !flushOK; i++) {
                                flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();// has the flush already reached this request's end offset?

                                if (!flushOK) {
                                    CommitLog.this.mappedFileQueue.flush(0);// 0 means attempt a flush regardless of page count; also updates storeTimestamp
                                }
                            }

                            req.wakeupCustomer(flushOK);// set the request's flushOK value and wake the waiting caller
                        }

                        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();// store time recorded for the commitLog
                        if (storeTimestamp > 0) {
                            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);// update the storeCheckpoint time
                        }

                        this.requestsRead.clear();// always clear the read list
                    } else {
                        // Because of individual messages is set to not sync flush, it
                        // will come to this process
                        // messages that do not need a flush acknowledgement
                        CommitLog.this.mappedFileQueue.flush(0);// 0 means attempt a flush regardless of page count; also updates storeTimestamp
                    }
                }
            }
    

    Async flush

    FlushRealTimeService: in-memory byte buffer (TransientStorePool) disabled

    This one is fairly easy to follow.

        class FlushRealTimeService extends FlushCommitLogService {
            private long lastFlushTimestamp = 0;
            private long printTimes = 0;// incremented each time a FlushCommitLogThoroughInterval elapses
    
            public void run() {
                CommitLog.log.info(this.getServiceName() + " service started");
    
                while (!this.isStopped()) {
                    boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();// whether async flush runs on a fixed period, default false

                    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();// async flush interval, default 500 ms
                    int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();// minimum pages for an async flush, default 4

                    int flushPhysicQueueThoroughInterval =
                        CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();// thorough flush period, default 10 s

                    boolean printFlushProgress = false;// whether to print flush progress

                    // Print flush progress
                    long currentTimeMillis = System.currentTimeMillis();
                    if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {// a thorough period has passed since the last thorough flush
                        this.lastFlushTimestamp = currentTimeMillis;
                        flushPhysicQueueLeastPages = 0;// no minimum page count this time
                        printFlushProgress = (printTimes++ % 10) == 0;
                    }
    
                    try {
                        if (flushCommitLogTimed) {// fixed-period flush
                            Thread.sleep(interval);
                        } else {// ServiceThread method, can be woken early
                            this.waitForRunning(interval);
                        }
    
                        if (printFlushProgress) {
                            this.printFlushProgress();
                        }
    
                        long begin = System.currentTimeMillis();
                        CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
                        long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                        if (storeTimestamp > 0) {
                            CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                        }
                        long past = System.currentTimeMillis() - begin;
                        if (past > 500) {
                            log.info("Flush data to disk costs {} ms", past);
                        }
                    } catch (Throwable e) {
                        CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                        this.printFlushProgress();
                    }
                }
    
                // Normal shutdown, to ensure that all the flush before exit
                boolean result = false;
                for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {// after shutdown, retry the flush several times
                    result = CommitLog.this.mappedFileQueue.flush(0);
                    CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
                }
    
                this.printFlushProgress();// print flush progress
    
                CommitLog.log.info(this.getServiceName() + " service end");
            }
    
            @Override
            public String getServiceName() {
                return FlushRealTimeService.class.getSimpleName();
            }
    
            private void printFlushProgress() {// prints flush progress; currently a no-op
                // CommitLog.log.info("how much disk fall behind memory, "
                // + CommitLog.this.mappedFileQueue.howMuchFallBehind());
            }
    
            @Override
            public long getJointime() {
                return 1000 * 60 * 5;
            }
        }
    

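    Points to note:

    1. interval is the async flush interval (default 500 ms). If async flush is configured as fixed-period, each round sleeps for that long; otherwise it calls waitForRunning for that long and can be woken in between.
    2. flushPhysicQueueLeastPages is the default minimum number of pages per flush; in a normal interval round, the flush only happens once that many pages are pending.
    3. flushPhysicQueueThoroughInterval is the forced flush period (default 10 s); once that much time has passed since the last forced flush, the flush happens regardless of the page count.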
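
    The interplay of the page threshold and the thorough interval can be written as two small pure functions. This is a sketch, not the RocketMQ code: FlushThresholdSketch and its method names are hypothetical, and the 4 KB page size and the page-count comparison are assumptions about how mappedFile decides whether a flush is worthwhile.

```java
// Hypothetical model of "flush only when enough pages are dirty, unless the thorough interval elapsed".
class FlushThresholdSketch {
    static final int OS_PAGE_SIZE = 4 * 1024; // assumed page size

    // Mirrors the check in run(): once the thorough interval has elapsed, the minimum drops to 0.
    static int effectiveLeastPages(long nowMillis, long lastFlushMillis,
                                   long thoroughIntervalMillis, int leastPages) {
        return nowMillis >= lastFlushMillis + thoroughIntervalMillis ? 0 : leastPages;
    }

    // Assumed predicate: with a positive minimum, compare whole dirty pages; with 0, any new byte qualifies.
    static boolean isAbleToFlush(long wrotePosition, long flushedPosition, int leastPages) {
        if (leastPages > 0) {
            return (wrotePosition / OS_PAGE_SIZE) - (flushedPosition / OS_PAGE_SIZE) >= leastPages;
        }
        return wrotePosition > flushedPosition;
    }
}

class FlushThresholdDemo {
    public static void main(String[] args) {
        long wrote = 2L * FlushThresholdSketch.OS_PAGE_SIZE; // two dirty pages
        int normal = FlushThresholdSketch.effectiveLeastPages(5_000, 0, 10_000, 4);
        int thorough = FlushThresholdSketch.effectiveLeastPages(10_000, 0, 10_000, 4);
        System.out.println("normal round flushes: " + FlushThresholdSketch.isAbleToFlush(wrote, 0, normal));
        System.out.println("thorough round flushes: " + FlushThresholdSketch.isAbleToFlush(wrote, 0, thorough));
    }
}
```

    With the defaults quoted above, two dirty pages stay unflushed during normal 500 ms rounds and are picked up once the 10 s thorough interval resets the minimum to 0.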

    CommitRealTimeService: in-memory byte buffer (TransientStorePool) enabled

    Once the class above makes sense, this one is straightforward too.

        class CommitRealTimeService extends FlushCommitLogService {
    
            private long lastCommitTimestamp = 0;// time of the most recent commit
    
            @Override
            public String getServiceName() {
                return CommitRealTimeService.class.getSimpleName();
            }
    
            @Override
            public void run() {
                CommitLog.log.info(this.getServiceName() + " service started");
                while (!this.isStopped()) {
                    int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();// commit interval, default 200 ms

                    int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();// minimum pages per commit, default 4

                    int commitDataThoroughInterval =
                        CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();// default 200 ms

                    long begin = System.currentTimeMillis();
                    if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {// the forced-commit interval has passed since the last commit, so ignore the page threshold
                        this.lastCommitTimestamp = begin;
                        commitDataLeastPages = 0;
                    }
    
                    try {
                        boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
                        long end = System.currentTimeMillis();
                        if (!result) {// result == false means some data was committed
                            this.lastCommitTimestamp = end; // result = false means some data committed.
                            //now wake up flush thread.
                            flushCommitLogService.wakeup();// wake up the flush service
                        }
    
                        if (end - begin > 500) {
                            log.info("Commit data to file costs {} ms", end - begin);
                        }
                        this.waitForRunning(interval);// wait, or be woken early by wakeup
                    } catch (Throwable e) {
                        CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
                    }
                }
    
                boolean result = false;
                for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
                    result = CommitLog.this.mappedFileQueue.commit(0);
                    CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
                }
                CommitLog.log.info(this.getServiceName() + " service end");
            }
        }
    

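    Points to note:

    1. interval, commitDataLeastPages and commitDataThoroughInterval mean the same as their counterparts above: the commit interval defaults to 200 ms, the minimum number of pages to commit defaults to 4, and the thorough-commit interval is 200 ms (after which the page threshold is ignored).
    2. When a commit moves data, flushCommitLogService is woken up so that the data committed from the mappedFile's writeBuffer gets flushed to disk.
    3. Note that async flush with the buffer pool enabled is therefore carried out by CommitRealTimeService and FlushRealTimeService working together.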
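
    To see why commit has to come before flush when the buffer pool is on, the three positions can be modelled with plain integers. This is a toy model under stated assumptions, not the mappedFile implementation: ThreePositionsSketch is a hypothetical class, and it assumes flush can only reach data that has already been committed when a writeBuffer is in use.

```java
// Hypothetical model of wrotePosition, committedPosition and flushedPosition.
class ThreePositionsSketch {
    private final boolean useWriteBuffer; // stands in for "TransientStorePool enabled"
    int wrotePosition;
    int committedPosition;
    int flushedPosition;

    ThreePositionsSketch(boolean useWriteBuffer) {
        this.useWriteBuffer = useWriteBuffer;
    }

    // appendMessage: data lands in the write buffer (or directly in the mapped memory).
    void append(int bytes) {
        wrotePosition += bytes;
    }

    // commit: only meaningful with a write buffer; moves data from the buffer towards the file.
    void commit() {
        if (useWriteBuffer) {
            committedPosition = wrotePosition;
        }
    }

    // flush: persists whatever is already visible to the file.
    void flush() {
        flushedPosition = useWriteBuffer ? committedPosition : wrotePosition;
    }
}

class ThreePositionsDemo {
    public static void main(String[] args) {
        ThreePositionsSketch pooled = new ThreePositionsSketch(true);
        pooled.append(100);
        pooled.flush();
        System.out.println("pooled, flush without commit: " + pooled.flushedPosition);
        pooled.commit();
        pooled.flush();
        System.out.println("pooled, commit then flush: " + pooled.flushedPosition);

        ThreePositionsSketch direct = new ThreePositionsSketch(false);
        direct.append(100);
        direct.flush();
        System.out.println("no pool, flush only: " + direct.flushedPosition);
    }
}
```

    In this model a flush on its own persists nothing in pooled mode, which is why CommitRealTimeService wakes the flush service right after a commit.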

    Thoughts

    Order of execution for a write in CommitLog

    putMessage -> handleDiskFlush -> one of three cases (sync; async with TransientStorePool enabled; async with TransientStorePool disabled)
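
    The three-way split can be expressed as a tiny selection function. This is only a sketch of the decision described in the table at the top: FlushModeSketch, its enum and serviceFor are hypothetical names, and the real handleDiskFlush does more than pick a service.

```java
// Hypothetical selector for which service(s) take part in each flush mode.
class FlushModeSketch {
    enum FlushDiskType { SYNC_FLUSH, ASYNC_FLUSH }

    static String serviceFor(FlushDiskType type, boolean transientStorePoolEnable) {
        if (type == FlushDiskType.SYNC_FLUSH) {
            return "GroupCommitService";
        }
        // Async: with the pool enabled, commit runs first and then wakes the flush service.
        return transientStorePoolEnable
            ? "CommitRealTimeService + FlushRealTimeService"
            : "FlushRealTimeService";
    }
}

class FlushModeDemo {
    public static void main(String[] args) {
        System.out.println(FlushModeSketch.serviceFor(FlushModeSketch.FlushDiskType.SYNC_FLUSH, false));
        System.out.println(FlushModeSketch.serviceFor(FlushModeSketch.FlushDiskType.ASYNC_FLUSH, false));
        System.out.println(FlushModeSketch.serviceFor(FlushModeSketch.FlushDiskType.ASYNC_FLUSH, true));
    }
}
```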

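    Does the GroupCommitService code run synchronously?

    No. The main thread runs waitForFlush, and a separate thread runs wakeupCustomer.
    Yet the official documentation still calls it "sync flush", perhaps because of the timing: GroupCommitService#run waits at most 10 ms between rounds, which is close to "synchronous".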

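    Sync flush: how the logic differs depending on whether a flush acknowledgement is required

    Caller side: see CommitLog#handleDiskFlush


    [Figure: the difference]

    Callee side: see GroupCommitService#doCommit
    When an acknowledgement is required, requestsWrite and requestsRead are processed: each GroupCommitRequest is checked for flushOK in turn and its waitForFlush is notified.
    When no acknowledgement is required, requestsWrite and requestsRead are both empty, so only the else branch runs.

    [Figure: requests without an acknowledgement go to the else branch]

    What does CommitLog.this.mappedFileQueue.flush(0); do?

    It simply attempts a flush and updates storeTimestamp.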

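    Why does the for loop run twice when sync flush requires an acknowledgement?

    [Figure: the for loop runs twice]

    // Retry: flushing an earlier request may already have covered this one, in which case flushOK is true right away
    // If it was not covered, flush for this request
    // within a single mappedFile, one flush is enough
    // across two mappedFiles, two flushes are needed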
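
    The "at most two flushes" reasoning can be checked with a toy model. This is a sketch under an assumption that is not shown in the article: that one flush call only advances the flushed offset to the end of the file it currently points into. TwoFileFlushSketch, FILE_SIZE and flushesNeeded are hypothetical names.

```java
// Hypothetical model of a queue of fixed-size files with a single flushed offset.
class TwoFileFlushSketch {
    static final long FILE_SIZE = 1024; // made-up mapped-file size
    long flushedWhere;
    final long wroteWhere;

    TwoFileFlushSketch(long flushedWhere, long wroteWhere) {
        this.flushedWhere = flushedWhere;
        this.wroteWhere = wroteWhere;
    }

    // Assumed behaviour: flush only the file that contains flushedWhere.
    void flush() {
        long endOfCurrentFile = (flushedWhere / FILE_SIZE + 1) * FILE_SIZE;
        flushedWhere = Math.min(wroteWhere, endOfCurrentFile);
    }

    // Counts the flush calls (capped at two) needed for flushedWhere to reach nextOffset.
    int flushesNeeded(long nextOffset) {
        int calls = 0;
        while (flushedWhere < nextOffset && calls < 2) {
            flush();
            calls++;
        }
        return calls;
    }
}

class TwoFileFlushDemo {
    public static void main(String[] args) {
        // Message ends at 300, inside the first file.
        System.out.println("same file: " + new TwoFileFlushSketch(100, 300).flushesNeeded(300));
        // Flushed up to 1000 in the first file; the message ends at 1124 in the second file.
        System.out.println("two files: " + new TwoFileFlushSketch(1000, 1124).flushesNeeded(1124));
    }
}
```

    The sketch only counts flush calls. In the doCommit loop as quoted earlier, the check runs before each flush, so the outcome of a second flush is not checked again before wakeupCustomer is called; the sketch does not model that flag.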

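    Sync flush without acknowledgement vs. async flush with the buffer pool disabled

    The difference is almost entirely one of timing.

    Sync flush without acknowledgement: GroupCommitService#run checks every 10 ms (fixed, not configurable) whether a flush is needed.
    Async flush with the buffer pool disabled: FlushRealTimeService#run checks every 500 ms (configurable) whether a flush is needed, and forces one every 10 s (configurable).

    Summary of the three flush modes

    See the table and diagram in the Overview at the top.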

    Questions

    Comparing the three flush modes

    I do not yet know which one is most efficient, or what the pros and cons of each are.

    Gripes

    Configuring the timeout for sync flush with acknowledgement

    GroupCommitService#run calls

      this.waitForRunning(10);
    

    so the read and write lists are swapped every 10 ms.

    The wait allowed in CommitLog#handleDiskFlush is

    boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());// with sync flush and acknowledgement, waits 5 s by default
    

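    The default is 5 s.
    So if this parameter is ever configured, it has an effective lower bound: at least (10 ms + the execution time of doCommit).

    Otherwise, with a timeout under 10 ms, the results can disagree: the synchronous wait returns flushOK as false even though the asynchronous GroupCommitService does flush the data in the end.

    There does not seem to be any check for this in the configuration, though.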
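
    A guard of the kind that seems to be missing could look like the following. This is purely illustrative: SyncFlushTimeoutCheckSketch and validateSyncFlushTimeout are hypothetical and not part of RocketMQ, and the 10 ms constant simply echoes the waitForRunning(10) call above.

```java
// Hypothetical startup check for the sync-flush timeout; not an existing RocketMQ API.
class SyncFlushTimeoutCheckSketch {
    static final long GROUP_COMMIT_WAIT_MILLIS = 10; // the fixed wait used by GroupCommitService#run

    // Returns the timeout unchanged when it leaves room for at least one wait round, else throws.
    static long validateSyncFlushTimeout(long timeoutMillis) {
        if (timeoutMillis <= GROUP_COMMIT_WAIT_MILLIS) {
            throw new IllegalArgumentException(
                "syncFlushTimeout must exceed " + GROUP_COMMIT_WAIT_MILLIS + " ms, got " + timeoutMillis);
        }
        return timeoutMillis;
    }
}

class SyncFlushTimeoutCheckDemo {
    public static void main(String[] args) {
        System.out.println("accepted: " + SyncFlushTimeoutCheckSketch.validateSyncFlushTimeout(5000));
        try {
            SyncFlushTimeoutCheckSketch.validateSyncFlushTimeout(5);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

    The check cannot account for the execution time of doCommit, so it only rules out values that are certain to be too small.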

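    The name of GroupCommitService's doCommit function

    Everything it does is a flush, yet the function is called doCommit, which is easy to misread.

    CommitRealTimeService's interval and commitDataThoroughInterval

    Both currently default to 200 ms. The latter ought to be larger than the former, so you still have to configure it yourself.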

    refer

    http://www.jianshu.com/p/2b9135fb6b5d my own earlier notes on flush and commit
    https://github.com/YunaiV/Blog/blob/master/RocketMQ/1004-RocketMQ%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%EF%BC%9AMessage%E5%AD%98%E5%82%A8.md
    http://blog.csdn.net/prestigeding/article/details/76652063
    http://blog.csdn.net/meilong_whpu/article/details/76919267
