说明
这个类是提前创建MappedFile的服务,线程类,继承ServiceThread类(这个之后讲)
简介如下
AllocateRequest:内部类,代表分配请求
putRequestAndReturnMappedFile方法: 同步,生产分配请求,等待异步线程处理,最多5s,完成创建MappedFile的请求
run方法:异步完成分配请求的消费,调用mmapOperation方法,完成对请求处理,创建文件
内部类
AllocateRequest是内部类,代表分配请求
注意两个点
1.实现了Comparable接口,为了优先队列的插入
2.属性filePath和fileSize是创建的前提信息,countDownLatch以及mappedFile是为了获取结果
简介如下
static class AllocateRequest implements Comparable<AllocateRequest> {
//内部类,注意实现了Comparable接口,为了优先队列的插入
// Full file path
private String filePath;//路径
private int fileSize;//文件大小
private CountDownLatch countDownLatch = new CountDownLatch(1);//为0代表创建完成
private volatile MappedFile mappedFile = null;//根据 路径 以及 文件大小创建出来的结果
public AllocateRequest(String filePath, int fileSize) {
this.filePath = filePath;
this.fileSize = fileSize;
}
//省略get,set方法
/**
* fileSize大的优先级高
* 否则 对应fileName的long型小的,优先级高
* @param other
* @return
*/
public int compareTo(AllocateRequest other) {
if (this.fileSize < other.fileSize)
return 1;
else if (this.fileSize > other.fileSize) {
return -1;
} else {
int mIndex = this.filePath.lastIndexOf(File.separator);
long mName = Long.parseLong(this.filePath.substring(mIndex + 1));
int oIndex = other.filePath.lastIndexOf(File.separator);
long oName = Long.parseLong(other.filePath.substring(oIndex + 1));
if (mName < oName) {
return -1;
} else if (mName > oName) {
return 1;
} else {
return 0;
}
}
// return this.fileSize < other.fileSize ? 1 : this.fileSize >
// other.fileSize ? -1 : 0;
}
属性
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static int waitTimeOut = 1000 * 5;//生成对应请求到创建MappedFile,可以等待5s
private ConcurrentMap<String, AllocateRequest> requestTable =
new ConcurrentHashMap<String, AllocateRequest>();//key是filePath, value是分配请求
private PriorityBlockingQueue<AllocateRequest> requestQueue =
new PriorityBlockingQueue<AllocateRequest>();//分配请求的 队列,注意是优先级队列
private volatile boolean hasException = false;
private DefaultMessageStore messageStore;
注意两个数据结构requestTable 以及requestQueue
方法
重要方法如下
putRequestAndReturnMappedFile
同步处理,提交两个创建MappedFile的请求,路径是 nextFilePath 和 nextNextFilePath
等待nextFilePath创建完成,(nextNextFilePath只是放在记录中,并不用同步等它创建完)
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {
int canSubmitRequests = 2;//默认可以提交2个请求
if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {//开启了TransientStorePool
if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()
&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in pool
canSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();//总共剩余的数量 - 已经要分配的数量
}
}
AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);
boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;
//添加nextFilePath
if (nextPutOK) {//如果之前为空
if (canSubmitRequests <= 0) {//名额不够了
log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
this.requestTable.remove(nextFilePath);//清除table记录
return null;
}
boolean offerOK = this.requestQueue.offer(nextReq);//添加至队列
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
canSubmitRequests--;
}
//添加nextNextFilePath
AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);
boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;
if (nextNextPutOK) {
if (canSubmitRequests <= 0) {
log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +
"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());
this.requestTable.remove(nextNextFilePath);
} else {
boolean offerOK = this.requestQueue.offer(nextNextReq);
if (!offerOK) {
log.warn("never expected here, add a request to preallocate queue failed");
}
}
}
if (hasException) {//mmapOperation遇到了异常,先不创建mappedFile了
log.warn(this.getServiceName() + " service has exception. so return null");
return null;
}
AllocateRequest result = this.requestTable.get(nextFilePath);
try {
if (result != null) {
//run方法调用mmapOperation进行实际创建,调用countDown()
boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);
if (!waitOK) {//创建超时
log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());
return null;
} else {//创建成功
this.requestTable.remove(nextFilePath);//删掉table的记录
return result.getMappedFile();
}
} else {
log.error("find preallocate mmap failed, this never happen");
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
return null;
}
run
异步处理,调用mmapOperation完成请求的处理
public void run() {
log.info(this.getServiceName() + " service started");
while (!this.isStopped() && this.mmapOperation()) {
}
log.info(this.getServiceName() + " service end");
}
好懂,isStopped是父类ServiceThread的方法
mmapOperation
处理请求,只有interrupt才会返回false
private boolean mmapOperation() {
boolean isSuccess = false;
AllocateRequest req = null;
try {
req = this.requestQueue.take();//排队拿请求
AllocateRequest expectedRequest = this.requestTable.get(req.getFilePath());
if (null == expectedRequest) {
log.warn("this mmap request expired, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize());//有可能是请求等待超时了导致table的记录被删掉了
return true;
}
if (expectedRequest != req) {
log.warn("never expected here, maybe cause timeout " + req.getFilePath() + " "
+ req.getFileSize() + ", req:" + req + ", expectedRequest:" + expectedRequest);
return true;//有可能请求等待超时导致一个path对应在table和queue中的记录不匹配了
}
if (req.getMappedFile() == null) {//还没有创建
long beginTime = System.currentTimeMillis();
MappedFile mappedFile;
if (messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {//允许用transientStorePool
try {
mappedFile = ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {//遇到运行异常用默认配置
log.warn("Use default implementation.");
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize(), messageStore.getTransientStorePool());
}
} else {
mappedFile = new MappedFile(req.getFilePath(), req.getFileSize());
}
long eclipseTime = UtilAll.computeEclipseTimeMilliseconds(beginTime);
if (eclipseTime > 10) {
int queueSize = this.requestQueue.size();
log.warn("create mappedFile spent time(ms) " + eclipseTime + " queue size " + queueSize
+ " " + req.getFilePath() + " " + req.getFileSize());
}
// pre write mappedFile
if (mappedFile.getFileSize() >= this.messageStore.getMessageStoreConfig()
.getMapedFileSizeCommitLog()
&&
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {//如果size超过了配置要求,且允许预热
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());//进行预热
}
req.setMappedFile(mappedFile);
this.hasException = false;
isSuccess = true;
}
} catch (InterruptedException e) {
log.warn(this.getServiceName() + " interrupted, possibly by shutdown.");
this.hasException = true;
return false;
} catch (IOException e) {
log.warn(this.getServiceName() + " service has exception. ", e);
this.hasException = true;
if (null != req) {
requestQueue.offer(req);//重新加入队列再试
try {
Thread.sleep(1);
} catch (InterruptedException ignored) {
}
}
} finally {
if (req != null && isSuccess)
req.getCountDownLatch().countDown();//创建成功了就countDown
}
return true;
}
思考
putRequestAndReturnMappedFile为什么要创建两个mappedFile
感觉就是下次调用快一点,这个file用完了,下次用下个file时,已经准备好了
AllocateMappedFileService处理分配请求的模型
putRequestAndReturnMappedFile函数同步处理,完成请求的生产
run以及mmapOperation进行异步处理,完成请求的消费
putRequestAndReturnMappedFile中对hashException的判断
就是mmapOperation函数可能会遇到异常,标记hashException为true
只要没有还原为false,putRequestAndReturnMappedFile就只管加入队列,不用等待5s,等到mappedFile真正创建
问题
mmapOperation中对ServiceLoader的调用
网上搜 对ServiceLoader的利用,demo都是对接口写META-INF文件夹进行配置的,但是这里我没搜到存在对应文件,不知道为什么这样写
shutdown中 interrupt了之后还要join的意义
不理解,interrupt之后还要join,有什么用呢
网友评论