美文网首页
store模块阅读10:AllocateMappedFileSe

store模块阅读10:AllocateMappedFileSe

作者: 赤子心_d709 | 来源:发表于2017-10-17 20:18 被阅读188次

说明

这个类是提前创建MappedFile的服务,线程类,继承ServiceThread类(这个之后讲)

简介如下

AllocateRequest:内部类,代表分配请求
putRequestAndReturnMappedFile方法: 同步,生产分配请求,等待异步线程处理,最多5s,完成创建MappedFile的请求
run方法:异步完成分配请求的消费,调用mmapOperation方法,完成对请求处理,创建文件

内部类

AllocateRequest是内部类,代表分配请求
注意两个点

1.实现了Comparable接口,为了优先队列的插入
2.属性filePath和fileSize是创建的前提信息,countDownLatch以及mappedFile是为了获取结果

简介如下

    static class AllocateRequest implements Comparable<AllocateRequest> {
        //内部类,注意实现了Comparable接口,为了优先队列的插入

        // Full file path
        private String filePath;//路径
        private int fileSize;//文件大小
        private CountDownLatch countDownLatch = new CountDownLatch(1);//为0代表创建完成
        private volatile MappedFile mappedFile = null;//根据 路径 以及 文件大小创建出来的结果

        public AllocateRequest(String filePath, int fileSize) {
            this.filePath = filePath;
            this.fileSize = fileSize;
        }

        //省略get,set方法

        /**
         * fileSize大的优先级高
         * 否则 对应fileName的long型小的,优先级高
         * @param other
         * @return
         */
        public int compareTo(AllocateRequest other) {
            if (this.fileSize < other.fileSize)
                return 1;
            else if (this.fileSize > other.fileSize) {
                return -1;
            } else {
                int mIndex = this.filePath.lastIndexOf(File.separator);
                long mName = Long.parseLong(this.filePath.substring(mIndex + 1));
                int oIndex = other.filePath.lastIndexOf(File.separator);
                long oName = Long.parseLong(other.filePath.substring(oIndex + 1));
                if (mName < oName) {
                    return -1;
                } else if (mName > oName) {
                    return 1;
                } else {
                    return 0;
                }
            }
            // return this.fileSize < other.fileSize ? 1 : this.fileSize >
            // other.fileSize ? -1 : 0;
        }

属性

    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static int waitTimeOut = 1000 * 5;//生成对应请求到创建MappedFile,可以等待5s
    private ConcurrentMap<String, AllocateRequest> requestTable =
        new ConcurrentHashMap<String, AllocateRequest>();//key是filePath, value是分配请求
    private PriorityBlockingQueue<AllocateRequest> requestQueue =
        new PriorityBlockingQueue<AllocateRequest>();//分配请求的 队列,注意是优先级队列
    private volatile boolean hasException = false;
    private DefaultMessageStore messageStore;

注意两个数据结构requestTable 以及requestQueue

方法

重要方法如下

putRequestAndReturnMappedFile

同步处理,提交两个创建MappedFile的请求,路径是 nextFilePath 和 nextNextFilePath
等待nextFilePath创建完成,(nextNextFilePath只是放在记录中,并不用同步等它创建完)

public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
        int canSubmitRequests = 2;//默认可以提交2个请求
        if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {//开启了TransientStorePool
            if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
                && BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
                canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();//总共剩余的数量 - 已经要分配的数量
            }
        }

        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;

        //添加nextFilePath
        if (nextPutOK) {//如果之前为空
            if (canSubmitRequests <= 0) {//名额不够了
                log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
                this.requestTable.remove(nextFilePath);//清除table记录
                return null;
            }
            boolean offerOK = this.requestQueue.offer(nextReq);//添加至队列
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
            canSubmitRequests--;
        }

        //添加nextNextFilePath
        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
        if (nextNextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
                this.requestTable.remove(nextNextFilePath);
            } else {
                boolean offerOK = this.requestQueue.offer(nextNextReq);
                if (!offerOK) {
                    log.warn("never expected here, add a request to preallocate queue failed");
                }
            }
        }

        if (hasException) {//mmapOperation遇到了异常,先不创建mappedFile了
            log.warn(this.getServiceName() + " service has exception. so return null");
            return null;
        }

        AllocateRequest result = this.requestTable.get(nextFilePath);
        try {
            if (result != null) {
                //run方法调用mmapOperation进行实际创建,调用countDown()
                boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
                if (!waitOK) {//创建超时
                    log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
                    return null;
                } else {//创建成功
                    this.requestTable.remove(nextFilePath);//删掉table的记录
                    return result.getMappedFile();
                }
            } else {
                log.error("find preallocate mmap failed, this never happen");
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
        }

        return null;
    }

run

异步处理,调用mmapOperation完成请求的处理

    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStopped() && this.mmapOperation()) {

        }
        log.info(this.getServiceName() + " service end");
    }

好懂,isStopped是父类ServiceThread的方法

mmapOperation

处理请求,只有interrupt才会返回false

private boolean mmapOperation() {
        boolean isSuccess = false;
        AllocateRequest req = null;
        try {
            req = this.requestQueue.take();//排队拿请求
            AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
            if (null == expectedRequest) {
                log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize());//有可能是请求等待超时了导致table的记录被删掉了
                return true;
            }
            if (expectedRequest != req) {
                log.warn("never expected here,  maybe cause timeout " + req.getFilePath() + " "
                    + req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
                return true;//有可能请求等待超时导致一个path对应在table和queue中的记录不匹配了
            }

            if (req.getMappedFile() == null) {//还没有创建
                long beginTime = System.currentTimeMillis();

                MappedFile mappedFile;
                if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {//允许用transientStorePool
                    try {
                        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    } catch (RuntimeException e) {//遇到运行异常用默认配置
                        log.warn("Use default implementation.");
                        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    }
                } else {
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                }

                long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
                if (eclipseTime > 10) {
                    int queueSize = this.requestQueue.size();
                    log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
                        + " " + req.getFilePath() + " " + req.getFileSize());
                }

                // pre write mappedFile
                if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
                    .getMapedFileSizeCommitLog()
                    &&
                    this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {//如果size超过了配置要求,且允许预热
                    mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                        this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());//进行预热
                }

                req.setMappedFile(mappedFile);
                this.hasException = false;
                isSuccess = true;
            }
        } catch (InterruptedException e) {
            log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
            this.hasException = true;
            return false;
        } catch (IOException e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.hasException = true;
            if (null != req) {
                requestQueue.offer(req);//重新加入队列再试
                try {
                    Thread.sleep(1);
                } catch (InterruptedException ignored) {
                }
            }
        } finally {
            if (req != null && isSuccess)
                req.getCountDownLatch().countDown();//创建成功了就countDown
        }
        return true;
    }

思考

putRequestAndReturnMappedFile为什么要创建两个mappedFile

感觉就是下次调用快一点,这个file用完了,下次用下个file时,已经准备好了

AllocateMappedFileService处理分配请求的模型

putRequestAndReturnMappedFile函数同步处理,完成请求的生产
run以及mmapOperation进行异步处理,完成请求的消费

putRequestAndReturnMappedFile中对hashException的判断

就是mmapOperation函数可能会遇到异常,标记hashException为true
只要没有还原为false,putRequestAndReturnMappedFile就只管加入队列,不用等待5s,等到mappedFile真正创建

问题

mmapOperation中对ServiceLoader的调用

网上搜 对ServiceLoader的利用,demo都是对接口写META-INF文件夹进行配置的,但是这里我没搜到存在对应文件,不知道为什么这样写

shutdown中 interrupt了之后还要join的意义

不理解,interrupt之后还要join,有什么用呢

要这个类异步创建的意义什么,同步创建file不行吗

refer

https://fdx321.github.io/2017/08/22/%E3%80%90RocketMQ%E6%BA%90%E7%A0%81%E5%AD%A6%E4%B9%A0%E3%80%916-%E6%B6%88%E6%81%AF%E5%AD%98%E5%82%A8/

相关文章

网友评论

      本文标题:store模块阅读10:AllocateMappedFileSe

      本文链接:https://www.haomeiwen.com/subject/vlhjuxtx.html