1、我们要干什么?
(1)rocketMQ的消息是存储在文件里的,那么假如要存的时候文件不存在怎么办?》》创建,另外,为了读写快一点,我们创建的是内存映射文件;
2、实现方式:两种
(1)向allocateMappedFileService提交任务;
(2)直接创建文件;
if (this.allocateMappedFileService != null) {
/**
第一种创建方法:向allocateMappedFileService提交任务
1、allocateMappedFileService是啥,后面说;
2、这里默认会创建两个文件:nextFilePath、nextNextFilePath
*/
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
} else {
try {
// 第二种创建方法:直接创建文件
mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
}
3、向allocateMappedFileService提交任务;
//构建第一个创建任务
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
//把构建好的任务存到requestTable中,requestTable是个ConcurrentHashMap
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
if (nextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
this.requestTable.remove(nextFilePath);
return null;
}
//把构建好的任务存到requestQueue中,requestQueue是个PriorityBlockingQueue
boolean offerOK = this.requestQueue.offer(nextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
canSubmitRequests--;
}
// 构建第二个创建任务
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().availableBufferNums());
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
}
}
我们可以看到,这里我们只是构建了任务,并把任务放到requestQueue和requestTable中,那么问题来了,任务是怎么执行的呢?》》见第四部分
4、AllocateMappedFileService
(1)AllocateMappedFileService会在DefaultMessageStore实例化的时候实例化;紧接着会调用其start方法;
下面我们看这个start方法做了什么?
public void start() {
log.info("Try to start service thread:{} started:{} lastThread:{}", getServiceName(), started.get(), thread);
if (!started.compareAndSet(false, true)) {
return;
}
stopped = false;
this.thread = new Thread(this, getServiceName());
this.thread.setDaemon(isDaemon);
this.thread.start();
}
这个start方法是定义在其父类(ServiceThread)中的;我们可以看到,这里启动了一个线程,任务就是我们的this对象;那么this是谁呢,因为我们是通过AllocateMappedFileService的实例调到这里的,所以this就是AllocateMappedFileService的实例;既然this是AllocateMappedFileService的实例,我们就应该去看看其run方法,因为具体的任务是定义在run方法中的;
public void run() {
log.info(this.getServiceName() + " service started");
// 服务没有停止并且mmapOperation操作返回ture
while (!this.isStopped() && this.mmapOperation()) {
}
log.info(this.getServiceName() + " service end");
}
我们可以看到,这个线程在不停的执行mmapOperation方法;
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
//使用直接内存(即堆外内存)创建
try {
/**
* ServiceLoader是java.util包下的,再找资料查阅
*/
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
// 使用堆内存创建
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
从这个代码片段我们可以看到,mappedFile 就是在这里创建的,而其中用到的参数req就是我们之前存到requestQueue中的任务;
网友评论