美文网首页
rocketMQ_store模块之mappedFile获取

rocketMQ_store模块之mappedFile获取

作者: kele2018 | 来源:发表于2019-08-28 13:56 被阅读0次

    1、我们要干什么?

     (1)rocketMQ的消息是存储在文件里的,那么假如要存的时候文件不存在怎么办?》》创建,另外,为了读写快一点,我们创建的是内存映射文件;
    

    2、实现方式:两种
    (1)向allocateMappedFileService提交任务;
    (2)直接创建文件;

    if (this.allocateMappedFileService != null) {
                    /**
                    第一种创建方法:向allocateMappedFileService提交任务
                        1、allocateMappedFileService是啥,后面说;
                        2、这里默认会创建两个文件:nextFilePath、nextNextFilePath
                    */
                    mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                        nextNextFilePath, this.mappedFileSize);
                } else {
                    try {
                        // 第二种创建方法:直接创建文件
                        mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                    } catch (IOException e) {
                        log.error("create mappedFile exception", e);
                    }
                }
    

    3、向allocateMappedFileService提交任务;

    
            //构建第一个创建任务
            AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
            //把构建好的任务存到requestTable中,requestTable是个ConcurrentHashMap
            boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
            if (nextPutOK) {
                if (canSubmitRequests <= 0) {
                    log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                        "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                    this.requestTable.remove(nextFilePath);
                    return null;
                }
                //把构建好的任务存到requestQueue中,requestQueue是个PriorityBlockingQueue
                boolean offerOK = this.requestQueue.offer(nextReq);
                if (!offerOK) {
                    log.warn("never expected here, add a request to preallocate queue failed");
                }
                canSubmitRequests--;
            }
            // 构建第二个创建任务
            AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
            boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
            if (nextNextPutOK) {
                if (canSubmitRequests <= 0) {
                    log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                        "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                    this.requestTable.remove(nextNextFilePath);
                } else {
                    boolean offerOK = this.requestQueue.offer(nextNextReq);
                    if (!offerOK) {
                        log.warn("never expected here, add a request to preallocate queue failed");
                    }
                }
            }
    

    我们可以看到,这里我们只是构建了任务,并把任务放到requestQueue和requestTable中,那么问题来了,任务是怎么执行的呢?》》见第四部分
    4、AllocateMappedFileService
    (1)AllocateMappedFileService会在DefaultMessageStore实例化的时候实例化;紧接着会调用其start方法;
    下面我们看这个start方法做了什么?

    public void start() {
            log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
            if (!started.compareAndSet(false, true)) {
                return;
            }
            stopped = false;
            this.thread = new Thread(this, getServiceName());
            this.thread.setDaemon(isDaemon);
            this.thread.start();
        }
    

    这个start方法是定义在其父类(ServiceThread)中的;我们可以看到,这里启动了一个线程,任务就是我们的this对象;那么this是谁呢,因为我们是通过AllocateMappedFileService的实例调到这里的,所以this就是AllocateMappedFileService的实例;既然this是AllocateMappedFileService的实例,我们就应该去看看其run方法,因为具体的任务是定义在run方法中的;

    public void run() {
           log.info(this.getServiceName() + " service started");
           // 服务没有停止并且mmapOperation操作返回ture
           while (!this.isStopped() && this.mmapOperation()) {
    
           }
           log.info(this.getServiceName() + " service end");
       }
    

    我们可以看到,这个线程在不停的执行mmapOperation方法;

          if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                        //使用直接内存(即堆外内存)创建
                        try {
                            /**
                             * ServiceLoader是java.util包下的,再找资料查阅
                             */
                            mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                            mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                        } catch (RuntimeException e) {
                            log.warn("Use default implementation.");
                            mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                        }
                    } else {
                        // 使用堆内存创建
                        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                    }
    

    从这个代码片段我们可以看到,mappedFile 就是在这里创建的,而其中用到的参数req就是我们之前存到requestQueue中的任务;

    相关文章

      网友评论

          本文标题:rocketMQ_store模块之mappedFile获取

          本文链接:https://www.haomeiwen.com/subject/efipectx.html