美文网首页
rocketmq之消息同步与异步刷盘学习笔记

rocketmq之消息同步与异步刷盘学习笔记

作者: heyong | 来源:发表于2019-07-24 15:51 被阅读0次

    一、刷新服务

    image.png

    二、GroupCommitService

    (一) GroupCommitService核心属性

    • List<GroupCommitRequest> requestsWrite:请求写入队列

    • List<GroupCommitRequest> requestsRead:请求读队列

    上面两个队列会进行交换,每次刷盘请求是写到requestsWrite队列中,GroupCommitService处理刷盘请求之前,会执行队列交换

    (二) 添加刷盘请求

    当消息写到缓冲池以后,会调用下面方面进行磁盘刷新和主从复制

    handleDiskFlush(result, putMessageResult, msg);
    handleHA(result, putMessageResult, msg);
    

    如果刷盘方式是同步,那么就构造一个GroupCommitRequest请求,GroupCommitService服务异步刷新数据,如果是异步刷盘,就唤醒异步刷盘服务

    public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
        // 同步刷盘
        if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
            final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
            if (messageExt.isWaitStoreMsgOK()) {
                GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
                // 将刷盘请求放到队列中
                service.putRequest(request);
                // 使用了CountDownLatch机制,等待刷盘成功
                boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                if (!flushOK) {
                    log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
                        + " client address: " + messageExt.getBornHostString());
                    putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
                }
            } else {
                service.wakeup();
            }
        }
        // 异步刷盘
        else {
            if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                flushCommitLogService.wakeup();
            } else {
                commitLogService.wakeup();
            }
        }
    }
    

    (三) 刷盘请求处理

    GroupCommitService从队列中取出刷盘请求,并执行刷盘操作

    public void run() {
        CommitLog.log.info(this.getServiceName() + " service started");
    ​
        while (!this.isStopped()) {
            try {
                // 每隔10ms,交换读写队列
                this.waitForRunning(10);
                // 真正的刷盘
                this.doCommit();
            } catch (Exception e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
    ​
        // Under normal circumstances shutdown, wait for the arrival of the
        // request, and then flush
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            CommitLog.log.warn("GroupCommitService Exception, ", e);
        }
    ​
        synchronized (this) {
            this.swapRequests();
        }
    ​
        this.doCommit();
    ​
        CommitLog.log.info(this.getServiceName() + " service end");
    }
    

    每隔10ms,会进行一次读写队列的交换,并调用doCommit()方法,执行真正的磁盘刷新操作。

    private void doCommit() {
        synchronized (this.requestsRead) {
            // 读队列不为空,遍历所有刷盘请求,执行数据刷盘操作
            if (!this.requestsRead.isEmpty()) {
                for (GroupCommitRequest req : this.requestsRead) {
                    // There may be a message in the next file, so a maximum of
                    // two times the flush
                    boolean flushOK = false;
                    for (int i = 0; i < 2 && !flushOK; i++) {
                        flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
                        if (!flushOK) {
                            CommitLog.this.mappedFileQueue.flush(0);
                        }
                    }
                    req.wakeupCustomer(flushOK);
                }
                // 设置检查点
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                // 处理完刷盘请求以后,执行队列清空操作
                this.requestsRead.clear();
            } else {
                // Because of individual messages is set to not sync flush, it
                // will come to this process
                CommitLog.this.mappedFileQueue.flush(0);
            }
        }
    }
    

    三、FlushRealTimeService

    (一) 核心属性

    • long lastFlushTimestamp:最后一次刷盘时间戳

    • long printTimes = 0:

    (二) 异步刷盘流程

    public void run() {
        while (!this.isStopped()) {
            // 刷新策略(默认是实时刷盘)
            boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
            // 刷盘间隔
            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
            // 每次刷盘至少需要多少个page(默认是4个)
            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
            // 彻底刷盘间隔时间(默认10s)
            int flushPhysicQueueThoroughInterval =
                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
    ​
            boolean printFlushProgress = false;
            // Print flush progress
            long currentTimeMillis = System.currentTimeMillis();
            // 当前时间 >(最后一次刷盘时间 + 彻底刷盘间隔时间(10s)),则将最新一次刷盘时间更新为当前时间
            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
                this.lastFlushTimestamp = currentTimeMillis;
                flushPhysicQueueLeastPages = 0;
                printFlushProgress = (printTimes++ % 10) == 0;
            }
    ​
            try {
                if (flushCommitLogTimed) {
                    Thread.sleep(interval);
                } else {
                    this.waitForRunning(interval);
                }
    ​
                if (printFlushProgress) {
                    this.printFlushProgress();
                }
    ​
                long begin = System.currentTimeMillis();
                CommitLog.this.org.apache.rocketmq.store.MappedFileQueue#flush;
                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
                if (storeTimestamp > 0) {
                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
                }
                long past = System.currentTimeMillis() - begin;
                if (past > 500) {
                    log.info("Flush data to disk costs {} ms", past);
                }
            } catch (Throwable e) {
                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
                this.printFlushProgress();
            }
        }
    ​
        // Normal shutdown, to ensure that all the flush before exit
        boolean result = false;
        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
            result = CommitLog.this.mappedFileQueue.flush(0);
            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
        }
    ​
        this.printFlushProgress();
    ​
        CommitLog.log.info(this.getServiceName() + " service end");
    }
    

    如果是实时刷盘,每隔一定时间间隔,该线程休眠500毫秒,如果不是实时刷盘,则调用waitForRunning,即每隔500毫秒或该刷盘服务线程调用了wakeup()方法之后结束阻塞。最后调用 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages)进行刷盘 。

    数据刷盘调用了MappedFileQueue.flush(int flushLeastPages)方法,进行刷盘

    public boolean flush(final int flushLeastPages) {
        boolean result = true;
        // 根据offset,找到对应的MappedFile
        MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
        if (mappedFile != null) {
            long tmpTimeStamp = mappedFile.getStoreTimestamp();
            // 调用MappedFile的刷盘方法
            int offset = mappedFile.flush(flushLeastPages);
            long where = mappedFile.getFileFromOffset() + offset;
            result = where == this.flushedWhere;
            this.flushedWhere = where;
            if (0 == flushLeastPages) {
                this.storeTimestamp = tmpTimeStamp;
            }
        }
        return result;
    }
    

    数据刷盘,最终是调用 了MappedFile.flush(int flushLeastPages)方法实现数据持久化,具体实现如下:

    /**
     * @return The current flushed position
     */
    public int flush(final int flushLeastPages) {
        // 判断是否可以刷盘
        if (this.isAbleToFlush(flushLeastPages)) {
            if (this.hold()) {
                int value = getReadPosition();
                try {
                    //We only append data to fileChannel or mappedByteBuffer, never both.
                    if (writeBuffer != null || this.fileChannel.position() != 0) {
                        this.fileChannel.force(false);
                    } else {
                        this.mappedByteBuffer.force();
                    }
                } catch (Throwable e) {
                    log.error("Error occurred when force data to disk.", e);
                }
                this.flushedPosition.set(value);
                this.release();
            } else {
                log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
                this.flushedPosition.set(getReadPosition());
            }
        }
        return this.getFlushedPosition();
    }
    

    调用isAbleToFlush方法判断是否可以刷盘,判断逻辑如下:

    private boolean isAbleToFlush(final int flushLeastPages) {
        int flush = this.flushedPosition.get();
        int write = getReadPosition();
        // 如果文件已满
        if (this.isFull()) {
            return true;
        }
        // 需要刷盘页数 > 最少刷盘页数
        if (flushLeastPages > 0) {
            return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
        }
        return write > flush;
    }
    

    相关文章

      网友评论

          本文标题:rocketmq之消息同步与异步刷盘学习笔记

          本文链接:https://www.haomeiwen.com/subject/lxnxrctx.html