美文网首页
rocketMQ_store模块之mappedFile获取

rocketMQ_store模块之mappedFile获取

作者: kele2018 | 来源:发表于2019-08-28 13:56 被阅读0次

1、我们要干什么?

 (1)rocketMQ的消息是存储在文件里的,那么假如要存的时候文件不存在怎么办?》》创建,另外,为了读写快一点,我们创建的是内存映射文件;

2、实现方式:两种
(1)向allocateMappedFileService提交任务;
(2)直接创建文件;

if (this.allocateMappedFileService != null) {
                /**
                第一种创建方法:向allocateMappedFileService提交任务
                    1、allocateMappedFileService是啥,后面说;
                    2、这里默认会创建两个文件:nextFilePath、nextNextFilePath
                */
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
            } else {
                try {
                    // 第二种创建方法:直接创建文件
                    mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
                } catch (IOException e) {
                    log.error("create mappedFile exception", e);
                }
            }

3、向allocateMappedFileService提交任务;


        //构建第一个创建任务
        AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
        //把构建好的任务存到requestTable中,requestTable是个ConcurrentHashMap
        boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
        if (nextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextFilePath);
                return null;
            }
            //把构建好的任务存到requestQueue中,requestQueue是个PriorityBlockingQueue
            boolean offerOK = this.requestQueue.offer(nextReq);
            if (!offerOK) {
                log.warn("never expected here, add a request to preallocate queue failed");
            }
            canSubmitRequests--;
        }
        // 构建第二个创建任务
        AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
        boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
        if (nextNextPutOK) {
            if (canSubmitRequests <= 0) {
                log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
                    "RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
                this.requestTable.remove(nextNextFilePath);
            } else {
                boolean offerOK = this.requestQueue.offer(nextNextReq);
                if (!offerOK) {
                    log.warn("never expected here, add a request to preallocate queue failed");
                }
            }
        }

我们可以看到,这里我们只是构建了任务,并把任务放到requestQueue和requestTable中,那么问题来了,任务是怎么执行的呢?》》见第四部分
4、AllocateMappedFileService
(1)AllocateMappedFileService会在DefaultMessageStore实例化的时候实例化;紧接着会调用其start方法;
下面我们看这个start方法做了什么?

public void start() {
        log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
        if (!started.compareAndSet(false, true)) {
            return;
        }
        stopped = false;
        this.thread = new Thread(this, getServiceName());
        this.thread.setDaemon(isDaemon);
        this.thread.start();
    }

这个start方法是定义在其父类(ServiceThread)中的;我们可以看到,这里启动了一个线程,任务就是我们的this对象;那么this是谁呢,因为我们是通过AllocateMappedFileService的实例调到这里的,所以this就是AllocateMappedFileService的实例;既然this是AllocateMappedFileService的实例,我们就应该去看看其run方法,因为具体的任务是定义在run方法中的;

public void run() {
       log.info(this.getServiceName() + " service started");
       // 服务没有停止并且mmapOperation操作返回ture
       while (!this.isStopped() && this.mmapOperation()) {

       }
       log.info(this.getServiceName() + " service end");
   }

我们可以看到,这个线程在不停的执行mmapOperation方法;

      if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                    //使用直接内存(即堆外内存)创建
                    try {
                        /**
                         * ServiceLoader是java.util包下的,再找资料查阅
                         */
                        mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
                        mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    } catch (RuntimeException e) {
                        log.warn("Use default implementation.");
                        mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
                    }
                } else {
                    // 使用堆内存创建
                    mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
                }

从这个代码片段我们可以看到,mappedFile 就是在这里创建的,而其中用到的参数req就是我们之前存到requestQueue中的任务;

相关文章

网友评论

      本文标题:rocketMQ_store模块之mappedFile获取

      本文链接:https://www.haomeiwen.com/subject/efipectx.html