说明
consume queue是消息的逻辑队列
相当于字典的目录,用来指定消息在物理文件commit log上的位置。
默认存储路径为{user.home}/store/consumequeue/{topic}/{queueId}
管理mappedFile,每个file默认记录30w条数据,每条数据20个字节(long offset, int size, long tagsCode),即600w个字节大小
其中offset是commitLog中的物理偏移,size是消息在commitLog中的大小,tagsCode一般是与消息tag相关的编码;开启ext后也可能被替换为ConsumeQueueExt中的扩展地址(通过isExtAddr区分)
有效数据满足 offset >= 0 && size > 0(无效数据相反,一个文件没写到的地方,byteBuffer读取,都是0)
属性
// SLF4J loggers: one for regular store logging, one dedicated to store errors.
private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
public static final int CQ_STORE_UNIT_SIZE = 20;// size of one storage unit: long offset + int size + long tagsCode
private static final Logger LOG_ERROR = LoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
private final DefaultMessageStore defaultMessageStore;// owning store; used for config, commit log and checkpoint access
private final MappedFileQueue mappedFileQueue;// backing sequence of mapped files for this queue
private final String topic;
private final int queueId;
private final ByteBuffer byteBufferIndex;// scratch buffer reused to stage each 20-byte unit before appending
private final String storePath;// storage root, default {user.home}/store/consumequeue/{topic}/{queueId}
private final int mappedFileSize;// file size, default 300,000 units * 20 bytes = 6,000,000 bytes
private long maxPhysicOffset = -1;// largest commit log physical offset stored in this queue so far
private volatile long minLogicOffset = 0;// smallest valid logical (byte) offset
private ConsumeQueueExt consumeQueueExt = null;// optional extend file; stays null unless enabled in config
看注释就行
方法
先直接贴所有的代码好了,都有注释
/**
 * Creates the logic queue for one (topic, queueId) pair.
 * The backing directory is {storePath}/{topic}/{queueId}; the ext file is
 * only constructed when enabled in the store config (off by default).
 */
public ConsumeQueue(
    final String topic,
    final int queueId,
    final String storePath,
    final int mappedFileSize,
    final DefaultMessageStore defaultMessageStore) {
    this.topic = topic;
    this.queueId = queueId;
    this.storePath = storePath;
    this.mappedFileSize = mappedFileSize;
    this.defaultMessageStore = defaultMessageStore;

    // One directory per queue: {storePath}/{topic}/{queueId}.
    String dir = this.storePath
        + File.separator + topic
        + File.separator + queueId;
    this.mappedFileQueue = new MappedFileQueue(dir, mappedFileSize, null);

    // Reusable scratch buffer, one 20-byte index unit at a time.
    this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);

    if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) {
        this.consumeQueueExt = new ConsumeQueueExt(
            topic,
            queueId,
            StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
            defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
            defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
        );
    }
}
/**
 * Maps the index files into memory; when the extend file is readable it is
 * loaded as well. Returns true only if every load succeeded.
 */
public boolean load() {
    boolean ok = this.mappedFileQueue.load();
    log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (ok ? "OK" : "Failed"));
    if (isExtReadEnable()) {// ext object exists, so its file can be read
        boolean extOk = this.consumeQueueExt.load();
        ok = ok & extOk;
    }
    return ok;
}
/**
 * Rebuilds in-memory state after a restart: restores maxPhysicOffset and the
 * flush/commit positions of the mappedFileQueue.
 *
 * 1. Scans forward from the third-to-last mapped file, unit by unit; every
 *    valid unit advances maxPhysicOffset (and maxExtAddr for ext units).
 * 2. The position of the first invalid unit becomes processOffset: flush and
 *    commit positions are set there, everything past it is truncated.
 * 3. When ext is enabled, ext records beyond maxExtAddr are truncated too.
 */
public void recover() {
    final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        int index = mappedFiles.size() - 3;// start recovery from the third-to-last file
        if (index < 0)
            index = 0;
        int mappedFileSizeLogics = this.mappedFileSize;
        MappedFile mappedFile = mappedFiles.get(index);
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();// the file name doubles as its absolute offset
        long mappedFileOffset = 0;
        long maxExtAddr = 1;
        while (true) {
            for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {
                long offset = byteBuffer.getLong();
                int size = byteBuffer.getInt();
                long tagsCode = byteBuffer.getLong();
                if (offset >= 0 && size > 0) {// valid unit
                    mappedFileOffset = i + CQ_STORE_UNIT_SIZE;
                    this.maxPhysicOffset = offset;
                    if (isExtAddr(tagsCode)) {
                        maxExtAddr = tagsCode;
                    }
                } else {// invalid unit: unwritten regions of a mapped file read back as zeros
                    log.info("recover current consume queue file over, " + mappedFile.getFileName() + " "
                        + offset + " " + size + " " + tagsCode);
                    break;
                }
            }
            if (mappedFileOffset == mappedFileSizeLogics) {// this file was fully valid
                index++;
                if (index >= mappedFiles.size()) {// all files recovered
                    log.info("recover last consume queue file over, last maped file "
                        + mappedFile.getFileName());
                    break;
                } else {// continue with the next mapped file
                    mappedFile = mappedFiles.get(index);
                    byteBuffer = mappedFile.sliceByteBuffer();
                    processOffset = mappedFile.getFileFromOffset();
                    mappedFileOffset = 0;
                    log.info("recover next consume queue file, " + mappedFile.getFileName());
                }
            } else {// the last file may stop short of mappedFileSizeLogics
                log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "
                    + (processOffset + mappedFileOffset));
                break;
            }
        }
        processOffset += mappedFileOffset;// absolute offset of the end of valid data
        this.mappedFileQueue.setFlushedWhere(processOffset);
        this.mappedFileQueue.setCommittedWhere(processOffset);
        this.mappedFileQueue.truncateDirtyFiles(processOffset);// drop everything past processOffset
        if (isExtReadEnable()) {
            this.consumeQueueExt.recover();
            log.info("Truncate consume queue extend file by max {}", maxExtAddr);
            this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);// roll the ext file back to maxExtAddr
        }
    }
}
/**
 * Finds the consume queue index whose message store time is closest to
 * {@code timestamp}, via binary search over one mapped file.
 * @param timestamp target store timestamp (ms)
 * @return the consume queue index (byte offset / 20), or 0 when nothing matches
 */
public long getOffsetInQueueByTime(final long timestamp) {
    MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);// mapped file covering the timestamp
    if (mappedFile != null) {
        long offset = 0;
        // Clamp the low bound so units below minLogicOffset are never considered.
        int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
        int high = 0;
        int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
        long leftIndexValue = -1L, rightIndexValue = -1L;
        long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();// smallest physical offset still in the commit log
        SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
        if (null != sbr) {
            ByteBuffer byteBuffer = sbr.getByteBuffer();
            high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;// byte offset of the last valid unit
            try {
                while (high >= low) {
                    // Midpoint rounded down to a 20-byte unit boundary.
                    midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
                    byteBuffer.position(midOffset);
                    long phyOffset = byteBuffer.getLong();
                    int size = byteBuffer.getInt();
                    if (phyOffset < minPhysicOffset) {// the mid unit was already purged from the commit log
                        low = midOffset + CQ_STORE_UNIT_SIZE;// search only to the right of it
                        leftOffset = midOffset;
                        continue;
                    }
                    long storeTime =
                        this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);// resolve the message store time from the commit log
                    if (storeTime < 0) {
                        return 0;
                    } else if (storeTime == timestamp) {
                        targetOffset = midOffset;
                        break;
                    } else if (storeTime > timestamp) {// mid message is newer than the target: move high down
                        high = midOffset - CQ_STORE_UNIT_SIZE;
                        rightOffset = midOffset;
                        rightIndexValue = storeTime;
                    } else {
                        low = midOffset + CQ_STORE_UNIT_SIZE;// mid message is older than the target: move low up
                        leftOffset = midOffset;
                        leftIndexValue = storeTime;
                    }
                }
                if (targetOffset != -1) {
                    offset = targetOffset;
                } else {
                    if (leftIndexValue == -1) {// the search only ever moved right
                        offset = rightOffset;
                    } else if (rightIndexValue == -1) {// the search only ever moved left
                        offset = leftOffset;
                    } else {// both neighbors seen: pick whichever store time is nearer
                        offset =
                            Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
                                - rightIndexValue) ? rightOffset : leftOffset;
                    }
                }
                return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
            } finally {
                sbr.release();
            }
        }
    }
    return 0;
}
/**
 * Deletes logic-queue data whose commit log offset is >= {@code phyOffet}
 * (within the surviving last file this is done by rewinding wrote/commit/flush
 * positions). When ext is enabled, dirty ext records are truncated too.
 *
 * 1. Repeatedly examines the current last mapped file (deleting it promotes
 *    the previous file to "last").
 * 2. If the file's very first unit is already dirty, the whole file is deleted.
 * 3. Otherwise units are walked forward; wrote/commit/flush positions advance
 *    past each clean unit until a dirty or blank unit (or end of file) stops
 *    the scan.
 *
 * FIX: the original returned straight out of the scan loop, skipping the
 * final {@code truncateByMaxAddress} call even though {@code maxExtAddr} had
 * been tracked — exactly the suspicion recorded in the accompanying notes.
 * All exit paths now fall through to the ext truncation via a labeled break.
 */
public void truncateDirtyLogicFiles(long phyOffet) {
    int logicFileSize = this.mappedFileSize;
    this.maxPhysicOffset = phyOffet - 1;
    long maxExtAddr = 1;

    scan:
    while (true) {
        // Always look at the current last file; deleting it makes the previous one last.
        MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
        if (mappedFile == null) {// every file was deleted
            break;
        }
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        // Positions are reset first and re-advanced unit by unit below.
        mappedFile.setWrotePosition(0);
        mappedFile.setCommittedPosition(0);
        mappedFile.setFlushedPosition(0);

        for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {
            long offset = byteBuffer.getLong();
            int size = byteBuffer.getInt();
            long tagsCode = byteBuffer.getLong();

            if (0 == i) {// first unit of the file
                if (offset >= phyOffet) {
                    // The whole file is dirty: drop it and re-examine the new last file.
                    this.mappedFileQueue.deleteLastMappedFile();
                    break;
                }
                int pos = i + CQ_STORE_UNIT_SIZE;
                mappedFile.setWrotePosition(pos);
                mappedFile.setCommittedPosition(pos);
                mappedFile.setFlushedPosition(pos);
                this.maxPhysicOffset = offset;
                // This maybe not take effect, when not every consume queue has extend file.
                if (isExtAddr(tagsCode)) {
                    maxExtAddr = tagsCode;
                }
            } else {
                if (offset < 0 || size <= 0) {// blank/unwritten unit (fillPreBlank writes offset 0): nothing more to keep
                    break scan;
                }
                if (offset >= phyOffet) {// first dirty unit: keep everything before it
                    break scan;
                }
                int pos = i + CQ_STORE_UNIT_SIZE;
                mappedFile.setWrotePosition(pos);
                mappedFile.setCommittedPosition(pos);
                mappedFile.setFlushedPosition(pos);
                this.maxPhysicOffset = offset;
                if (isExtAddr(tagsCode)) {
                    maxExtAddr = tagsCode;
                }
                if (pos == logicFileSize) {// file fully scanned and clean: done
                    break scan;
                }
            }
        }
    }

    if (isExtReadEnable()) {
        this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);
    }
}
/**
 * Returns the commit log offset just past the last stored message, i.e.
 * lastUnit.offset + lastUnit.size, or -1 when nothing is readable.
 *
 * Starts at wrotePosition - 20 and keeps scanning forward while units are
 * valid — presumably because wrotePosition may lag the actually written
 * data; TODO confirm against callers.
 */
public long getLastOffset() {
    long lastOffset = -1;
    int logicFileSize = this.mappedFileSize;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
    if (mappedFile != null) {
        int position = mappedFile.getWrotePosition() - CQ_STORE_UNIT_SIZE;// start of the last written unit
        if (position < 0) {
            position = 0;
        }
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        byteBuffer.position(position);
        int scanned = 0;
        while (scanned < logicFileSize) {// bounded by logicFileSize/20 iterations
            long offset = byteBuffer.getLong();
            int size = byteBuffer.getInt();
            byteBuffer.getLong();// tagsCode, not needed here
            if (offset < 0 || size <= 0) {// invalid unit: stop
                break;
            }
            lastOffset = offset + size;
            scanned += CQ_STORE_UNIT_SIZE;
        }
    }
    return lastOffset;
}
/** Flushes the index files to disk; when ext exists, it is flushed too. */
public boolean flush(final int flushLeastPages) {
    boolean ok = this.mappedFileQueue.flush(flushLeastPages);
    if (isExtReadEnable()) {
        // The ext flush always runs; its result is AND-ed in afterwards.
        boolean extOk = this.consumeQueueExt.flush(flushLeastPages);
        ok = ok & extOk;
    }
    return ok;
}
/**
 * Deletes every mapped file whose last unit points below {@code offset} in
 * the commit log, then re-derives minLogicOffset.
 * @param offset minimum surviving commit log offset
 * @return number of files deleted
 */
public int deleteExpiredFile(long offset) {
    int deletedCount = this.mappedFileQueue.deleteExpiredFileByOffset(offset, CQ_STORE_UNIT_SIZE);
    // Deleting head files can invalidate minLogicOffset, so fix it up.
    this.correctMinOffset(offset);
    return deletedCount;
}
/**
 * Re-derives minLogicOffset after commit log deletion; when ext is enabled,
 * ext records older than the new minimum are truncated.
 *
 * Walks the first mapped file unit by unit; the first unit whose commit log
 * offset is still >= phyMinOffset defines the new minLogicOffset, and if its
 * tagsCode is an ext address it also bounds the ext truncation.
 *
 * Only the first file is scanned because (per the original note) expired
 * files are removed from the head, so minLogicOffset always lies within the
 * first remaining file.
 */
public void correctMinOffset(long phyMinOffset) {
    MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();
    long minExtAddr = 1;
    if (mappedFile != null) {
        SelectMappedBufferResult result = mappedFile.selectMappedBuffer(0);
        if (result != null) {
            try {
                for (int i = 0; i < result.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {// walk every unit of the first file
                    long offsetPy = result.getByteBuffer().getLong();
                    result.getByteBuffer().getInt();
                    long tagsCode = result.getByteBuffer().getLong();
                    if (offsetPy >= phyMinOffset) {
                        this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;// logical offset where this unit starts
                        log.info("Compute logical min offset: {}, topic: {}, queueId: {}",
                            this.getMinOffsetInQueue(), this.topic, this.queueId);
                        // This maybe not take effect, when not every consume queue has extend file.
                        if (isExtAddr(tagsCode)) {
                            minExtAddr = tagsCode;
                        }
                        break;
                    }
                }
            } catch (Exception e) {
                log.error("Exception thrown when correctMinOffset", e);
            } finally {
                result.release();
            }
        }
    }
    if (isExtReadEnable()) {
        this.consumeQueueExt.truncateByMinAddress(minExtAddr);
    }
}
// Index (unit number) of the smallest valid entry: minLogicOffset / 20.
public long getMinOffsetInQueue() {
    return this.minLogicOffset / CQ_STORE_UNIT_SIZE;
}
/**
 * Retry wrapper around putMessagePositionInfo:
 * 1. retries up to 30 times while the consume queue is writable
 * 2. when ext is enabled, a CqExtUnit is written first and tagsCode is
 *    replaced by the returned (encoded) ext address
 * 3. on success, the store timestamp is recorded in the checkpoint
 * @param request dispatch request carrying commit log offset, size, tagsCode, etc.
 */
public void putMessagePositionInfoWrapper(DispatchRequest request) {
    final int maxRetries = 30;
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();// writability is checked once, up front
    for (int i = 0; i < maxRetries && canWrite; i++) {
        long tagsCode = request.getTagsCode();
        if (isExtWriteEnable()) {// ext file exists and writing to it is enabled
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());
            long extAddr = this.consumeQueueExt.put(cqExtUnit);// append the CqExtUnit to the ext file
            if (isExtAddr(extAddr)) {// the returned address is encoded so it is distinguishable from a tags code
                tagsCode = extAddr;// store the ext address in place of the tags code
            } else {
                log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,
                    topic, queueId, request.getCommitLogOffset());
            }
        }
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
            request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());// append the 20-byte position unit
        if (result) {
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());// advance the checkpoint
            return;
        } else {
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                + " failed, retry " + i + " times");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // NOTE(review): the interrupt status is not restored here — confirm intent.
                log.warn("", e);
            }
        }
    }
    // XXX: warn and notify me
    log.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
/**
 * Appends one 20-byte position unit (commitLogOffset, size, tagsCode).
 * @param offset commit log physical offset
 * @param size message size in the commit log
 * @param tagsCode tags code, or an encoded ext address
 * @param cqOffset consume queue index (unit number)
 * @return true when the unit was appended (or this offset was already stored)
 *
 * 1. offsets <= maxPhysicOffset were already stored — reported as success
 * 2. the 20-byte unit is staged in byteBufferIndex
 * 3. the last mapped file is fetched (created on demand)
 * 4. for the first file ever created, minLogicOffset / flushedWhere /
 *    committedWhere are initialized and the leading gap is blank-filled
 */
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
    final long cqOffset) {
    if (offset <= this.maxPhysicOffset) {// already stored at or before the current max physical offset
        return true;
    }
    this.byteBufferIndex.flip();
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);// each unit is exactly 20 bytes
    // Stage the unit: long offset, int size, long tagsCode.
    this.byteBufferIndex.putLong(offset);
    this.byteBufferIndex.putInt(size);
    this.byteBufferIndex.putLong(tagsCode);
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;// logical byte offset derived from the unit index
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);// last file, created if absent
    if (mappedFile != null) {
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {
            this.minLogicOffset = expectLogicOffset;// first file ever created for this queue: pin the minimum logical offset
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            this.fillPreBlank(mappedFile, expectLogicOffset);// pad everything before expectLogicOffset with blank units
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                + mappedFile.getWrotePosition());
        }
        if (cqOffset != 0) {
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
            if (expectLogicOffset != currentLogicOffset) {// actual write position drifted from the expected one
                LOG_ERROR.warn(
                    "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
                    expectLogicOffset,
                    currentLogicOffset,
                    this.topic,
                    this.queueId,
                    expectLogicOffset - currentLogicOffset
                );
            }
        }
        this.maxPhysicOffset = offset;// track the largest commit log offset stored
        return mappedFile.appendMessage(this.byteBufferIndex.array());// append the staged 20 bytes
    }
    return false;
}
/**
 * Pads the given mapped file with blank units from its start up to
 * {@code untilWhere} (a position within this file).
 * A blank unit is encoded as (offset=0, size=Integer.MAX_VALUE, tagsCode=0).
 * @param mappedFile the current last mapped file
 * @param untilWhere absolute logical offset to pad up to
 */
private void fillPreBlank(final MappedFile mappedFile, final long untilWhere) {
    ByteBuffer blankUnit = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
    blankUnit.putLong(0L);
    blankUnit.putInt(Integer.MAX_VALUE);
    blankUnit.putLong(0L);
    int end = (int) (untilWhere % this.mappedFileQueue.getMappedFileSize());
    int written = 0;
    while (written < end) {
        mappedFile.appendMessage(blankUnit.array());
        written += CQ_STORE_UNIT_SIZE;
    }
}
/**
 * Resolves a consume queue index to the mapped file containing it and
 * returns the readable region from that unit onward.
 * @param startIndex unit number of the first wanted entry
 * @return a buffer over [startIndex's unit, end of readable data), or null
 */
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
    long offset = startIndex * CQ_STORE_UNIT_SIZE;// logical byte offset
    if (offset < this.getMinLogicOffset()) {// below the valid range
        return null;
    }
    MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
    if (mappedFile == null) {
        return null;
    }
    // Slice from the in-file position up to the max readable position.
    return mappedFile.selectMappedBuffer((int) (offset % this.mappedFileSize));
}
/** Reads the CqExtUnit stored at {@code offset} in the ext file, or null when ext is unreadable. */
public ConsumeQueueExt.CqExtUnit getExt(final long offset) {
    return isExtReadEnable() ? this.consumeQueueExt.get(offset) : null;
}
/** Fills {@code cqExtUnit} from the ext file at {@code offset}; false when ext is unreadable. */
public boolean getExt(final long offset, ConsumeQueueExt.CqExtUnit cqExtUnit) {
    return isExtReadEnable() && this.consumeQueueExt.get(offset, cqExtUnit);
}
// Smallest valid logical (byte) offset of this queue.
public long getMinLogicOffset() {
    return minLogicOffset;
}
// Not referenced within this file (per the original note).
public void setMinLogicOffset(long minLogicOffset) {
    this.minLogicOffset = minLogicOffset;
}
/**
 * Returns the first unit index of the mapped file following the one that
 * contains {@code index} (i.e. next file's starting byte offset / 20).
 */
public long rollNextFile(final long index) {
    int unitsPerFile = this.mappedFileSize / CQ_STORE_UNIT_SIZE;
    // Round down to this file's first unit, then advance one whole file.
    return index - index % unitsPerFile + unitsPerFile;
}
public String getTopic() {
    return topic;
}
public int getQueueId() {
    return queueId;
}
// Largest commit log physical offset stored in this queue.
public long getMaxPhysicOffset() {
    return maxPhysicOffset;
}
// Not referenced within this file (per the original note).
public void setMaxPhysicOffset(long maxPhysicOffset) {
    this.maxPhysicOffset = maxPhysicOffset;
}
/** Resets the tracked offsets and removes all index files (plus the ext file when present). */
public void destroy() {
    // Reset tracked offsets back to their initial values.
    this.minLogicOffset = 0;
    this.maxPhysicOffset = -1;
    this.mappedFileQueue.destroy();
    if (isExtReadEnable()) {
        this.consumeQueueExt.destroy();
    }
}
/** Number of entries currently addressable in this queue. */
public long getMessageTotalInQueue() {
    long maxIndex = this.getMaxOffsetInQueue();
    long minIndex = this.getMinOffsetInQueue();
    return maxIndex - minIndex;
}
/** Index one past the last stored entry: max byte offset / 20. */
public long getMaxOffsetInQueue() {
    long maxByteOffset = this.mappedFileQueue.getMaxOffset();
    return maxByteOffset / CQ_STORE_UNIT_SIZE;
}
/** Runs self-consistency checks on the index files, and on the ext file when readable. */
public void checkSelf() {
    this.mappedFileQueue.checkSelf();
    if (isExtReadEnable()) {
        this.consumeQueueExt.checkSelf();
    }
}
// The ext file is readable iff the ext object was constructed (config flag was on at creation).
protected boolean isExtReadEnable() {
    return this.consumeQueueExt != null;
}
// The ext file is writable iff it exists AND the config flag is currently on.
protected boolean isExtWriteEnable() {
    return this.consumeQueueExt != null
        && this.defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt();
}
/**
 * Check {@code tagsCode} is address of extend file or tags code.
 * Delegates to ConsumeQueueExt, which encodes ext addresses so they can be
 * told apart from ordinary tags codes.
 */
public boolean isExtAddr(long tagsCode) {
    return ConsumeQueueExt.isExtAddr(tagsCode);
}
1.部分方法就是调用mappedFileQueue方法,以及ConsumeQueueExt的方法,如isExtAddr,checkSelf,destroy,getExt,flush,load方法不重要
2.putMessagePositionInfoWrapper, putMessagePositionInfo ,fillPreBlank用于往ConsumeQueue里写消息
3.recover用于恢复
4.getOffsetInQueueByTime,getLastOffset,getIndexBuffer,rollNextFile 用于ConsumeQueue信息的获取,查询等
5.truncateDirtyLogicFiles,deleteExpiredFile,correctMinOffset用于清除脏数据以及关键字段的维护
思考
index CQ_STORE_UNIT_SIZE offset三者之间的关系
index * CQ_STORE_UNIT_SIZE = offset
correctMinOffset为什么只处理第一个文件
因为过期文件总是从队列头部开始删除(见deleteExpiredFile),minLogicOffset必然落在剩余的第一个mappedFile内,所以只需扫描第一个文件即可修正
问题 :
tagsCode有可能是Ext地址,也有可能不是:在putMessagePositionInfoWrapper中,若开启ext且写入成功,tagsCode会被替换为编码过的ext地址,否则保留request原始的tagsCode;读取方通过isExtAddr区分两者
getLastOffset函数中 为什么已经是最后一条记录了,还要向后遍历来确认:推测是wrotePosition可能滞后于实际写入的数据,向后扫描才能找到真正的最后一条有效记录(待结合上层调用确认)
为什么需要函数fillPreBlank,不写前置标志位不行吗
吐槽
truncateDirtyLogicFiles 漏掉了ext的处理
里面好几个地方直接return了,漏掉了ConsumeQueueExt的处理,应该是bug.
代码太晦涩了
refer
http://blog.csdn.net/meilong_whpu/article/details/76921208
http://blog.csdn.net/chunlongyu/article/details/54576649
http://www.jianshu.com/p/453c6e7ff81c ConsumeQueue部分
http://blog.csdn.net/github_38592071/article/details/71438257?locationNum=15&fps=1#
网友评论