Index索引文件概览
消息消费队列是RocetMQ专门为消息订阅构建的索引服务,提高主题与消息队列检索消息的速度。IndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:$HOME/store/index/{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。
Index索引文件结构:
image.png
IndexFile异步构造
构建consumequeue、indexFile索引文件,通过一个ReputMessageService异步线程进行处理,构建consumequeue、indexFile索引文件的数据从commitLog的MappedFile中的ByteBuffer中获取,一条消息消息构造一个构建索引服务的DispatchRequest请求,再由ConsumeQueue服务处理DispatchRequest请求构建consumequeue的mappedFile文件。由IndexService处理请求构建indexFile索引文件,然后将各自的文件进行刷盘。
IndexFile
IndexFile对象的主要成员属性,其中包含MappedFile对象,这个和ConsumeQueue中利用的MappedFile是一样的作用,用来做磁盘IO,将内存映射消息写入磁盘。
public class IndexFile {
// hash槽的大小
private static int hashSlotSize = 4;
// 索引大小
private static int indexSize = 20;
private static int invalidIndex = 0;
// 索引槽的数量 500万
private final int hashSlotNum;
// 索引的数量 2千万
private final int indexNum;
// 索引的MappedFile
private final MappedFile mappedFile;
private final FileChannel fileChannel;
//MappedFile中的直接内存,用来存放hash索引用
private final MappedByteBuffer mappedByteBuffer;
// 索引头
private final IndexHeader indexHeader;
}
IndexHeader
每一个IndexFile都包含IndexHeader,是这个IndexFile的汇总信息。
// 存放IndexHeader的buffer
private final ByteBuffer byteBuffer;
// indexFile存放消息的存储时间的 开始时间
private AtomicLong beginTimestamp = new AtomicLong(0);
// indexFile存放消息的存储时间的 结束时间
private AtomicLong endTimestamp = new AtomicLong(0);
// indexFile 开始commitLog 文件的offset
private AtomicLong beginPhyOffset = new AtomicLong(0);
// IndexFile 结束commitLog 文件的offset
private AtomicLong endPhyOffset = new AtomicLong(0);
// hash槽的Count 500万
private AtomicInteger hashSlotCount = new AtomicInteger(0);
// hash索引的条目2千万条 500 * 40
private AtomicInteger indexCount = new AtomicInteger(1);
IndexService
IndexService 是构建Index索引文件的服务。一条消息可能有多个key,这样这条消息就会有多个索引条目。
根据一条消息的请求构建Index索引,
/**
* 根据一条消息的请求构建Index索引,并放入
* @param req
*/
public void buildIndex(DispatchRequest req) {
//获取需要写入的IndexFile
IndexFile indexFile = retryGetAndCreateIndexFile();
if (indexFile != null) {
// 文件结尾offset
long endPhyOffset = indexFile.getEndPhyOffset();
DispatchRequest msg = req;
String topic = msg.getTopic();
String keys = msg.getKeys();
if (msg.getCommitLogOffset() < endPhyOffset) {
return;
}
// 事务消息
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
switch (tranType) {
case MessageSysFlag.TRANSACTION_NOT_TYPE:
case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
break;
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
return;
}
// 根据uniqKey来构建文件
if (req.getUniqKey() != null) {
indexFile = putKey(indexFile, msg, buildKey(topic, req.getUniqKey()));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
// 根据keys中的每个key来构建indexFile
if (keys != null && keys.length() > 0) {
String[] keyset = keys.split(MessageConst.KEY_SEPARATOR);
for (int i = 0; i < keyset.length; i++) {
String key = keyset[i];
if (key.length() > 0) {
indexFile = putKey(indexFile, msg, buildKey(topic, key));
if (indexFile == null) {
log.error("putKey error commitlog {} uniqkey {}", req.getCommitLogOffset(), req.getUniqKey());
return;
}
}
}
}
} else {
log.error("build index error, stop building index");
}
}
/**
* 放置key到indexFile中
* @param indexFile
* @param msg 消息请求
* @param idxKey topic#uniqKey
* @return
*/
private IndexFile putKey(IndexFile indexFile, DispatchRequest msg, String idxKey) {
// for循环一直方法详细,直到出现一个错误
for (boolean ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp()); !ok; ) {
log.warn("Index file [" + indexFile.getFileName() + "] is full, trying to create another one");
//再次获取或创建IndexFile
indexFile = retryGetAndCreateIndexFile();
if (null == indexFile) {
return null;
}
//存放消息
ok = indexFile.putKey(idxKey, msg.getCommitLogOffset(), msg.getStoreTimestamp());
}
return indexFile;
}
IndexFile存放这个消息的索引方法,计算key的hash值,确定hash槽,将索引信息存入MappedByteBuffer等待刷盘操作。
/**
* 存放索引信息到mappedByteBuffer等待刷盘
* @param key 存放的key
* @param phyOffset 存放的物理offset
* @param storeTimestamp 存放消息存储的时间
* @return
*/
public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {
if (this.indexHeader.getIndexCount() < this.indexNum) {
// key的hash值
int keyHash = indexKeyHashMethod(key);
// 所在hash槽的位置
int slotPos = keyHash % this.hashSlotNum;
// 40byte hashIndex + 槽的位置 * 槽的大小
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
// fileLock = this.fileChannel.lock(absSlotPos, hashSlotSize,
// false);
// 获取mappedByteBuffer写入的位置,上一个hash索引存储的位置
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {
slotValue = invalidIndex;
}
// 时间差值,根据时间进行消息查找
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
// 到秒
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) {
timeDiff = 0;
} else if (timeDiff > Integer.MAX_VALUE) {
timeDiff = Integer.MAX_VALUE;
} else if (timeDiff < 0) {
timeDiff = 0;
}
// 存储的绝对位置
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ this.indexHeader.getIndexCount() * indexSize;
// 存放hashKey,absIndexPos这个位置放入keyHash
this.mappedByteBuffer.putInt(absIndexPos, keyHash);
// 存放物理commitlog消息的物理offset,absIndexPos + 4(上一个keyhash占用了4个位置)这个位置放入phyOffset
this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);
// 存放时间差值
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);
// 存放上一个索引的位置
this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);
// 更新存放的索引数量,absSlotPos为索引槽的位置
this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
//更新文件索引的头信息,hash槽的总数、index条目的总数、最后消息的物理偏移量、最后消息的存储时间
if (this.indexHeader.getIndexCount() <= 1) {
this.indexHeader.setBeginPhyOffset(phyOffset);
this.indexHeader.setBeginTimestamp(storeTimestamp);
}
if (invalidIndex == slotValue) {
this.indexHeader.incHashSlotCount();
}
this.indexHeader.incIndexCount();
this.indexHeader.setEndPhyOffset(phyOffset);
this.indexHeader.setEndTimestamp(storeTimestamp);
return true;
} catch (Exception e) {
log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
}
} else {
log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()
+ "; index max num = " + this.indexNum);
}
return false;
}
消息查询
QueryMessageProcessor接受客户端查询请求,进行处理。然后到DefaultMessageStore#queryMessage()方法,然后到IndexService#queryOffset(),最后到IndexFile#selectPhyOffset()方法。
// 根据 topic、key 查找到 indexFile 索引文件中的一条记录,根据其中的commitLog offset从CommitLog文件中读取消息的实体内容。
final QueryMessageResult queryMessageResult =
this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),
requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),
requestHeader.getEndTimestamp());
// 根据 key 查询消息
@Override
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
QueryMessageResult queryMessageResult = new QueryMessageResult();
long lastQueryMsgTime = end;
for (int i = 0; i < 3; i++) {
QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);
if (queryOffsetResult.getPhyOffsets().isEmpty()) {
break;
}
Collections.sort(queryOffsetResult.getPhyOffsets());
queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());
for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {
long offset = queryOffsetResult.getPhyOffsets().get(m);
try {
boolean match = true;
MessageExt msg = this.lookMessageByOffset(offset);
if (0 == m) {
lastQueryMsgTime = msg.getStoreTimestamp();
}
// String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR);
// if (topic.equals(msg.getTopic())) {
// for (String k : keyArray) {
// if (k.equals(key)) {
// match = true;
// break;
// }
// }
// }
if (match) {
SelectMappedBufferResult result = this.commitLog.getData(offset, false);
if (result != null) {
int size = result.getByteBuffer().getInt(0);
result.getByteBuffer().limit(size);
result.setSize(size);
queryMessageResult.addMessage(result);
}
} else {
log.warn("queryMessage hash duplicate, {} {}", topic, key);
}
} catch (Exception e) {
log.error("queryMessage exception", e);
}
}
if (queryMessageResult.getBufferTotalSize() > 0) {
break;
}
if (lastQueryMsgTime < begin) {
break;
}
}
return queryMessageResult;
}
/**
* 根据Index消息查询服务
* @param topic
* @param key
* @param maxNum
* @param begin 消息存储的开始时间,和IndexFile 的IndexHeader中的存储消息的开始时间进行对比
* @param end 消息存储的结束时间,和IndexFile 的IndexHeader中的存储消息的结束时间进行对比
* @return
*/
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {
List<Long> phyOffsets = new ArrayList<Long>(maxNum);
long indexLastUpdateTimestamp = 0;
long indexLastUpdatePhyoffset = 0;
maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());
try {
this.readWriteLock.readLock().lock();
if (!this.indexFileList.isEmpty()) {
for (int i = this.indexFileList.size(); i > 0; i--) {
IndexFile f = this.indexFileList.get(i - 1);
boolean lastFile = i == this.indexFileList.size();
if (lastFile) {
indexLastUpdateTimestamp = f.getEndTimestamp();
indexLastUpdatePhyoffset = f.getEndPhyOffset();
}
if (f.isTimeMatched(begin, end)) {
f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);
}
if (f.getBeginTimestamp() < begin) {
break;
}
if (phyOffsets.size() >= maxNum) {
break;
}
}
}
} catch (Exception e) {
log.error("queryMsg exception", e);
} finally {
this.readWriteLock.readLock().unlock();
}
return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);
}
/**
* 根据索引key查找消息
* @param phyOffsets 查找到的消息物理偏移量
* @param key 索引key
* @param maxNum 本次查找最大消息条数
* @param begin 开始时间戳
* @param end 结束时间戳
* @param lock
*/
public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum,
final long begin, final long end, boolean lock) {
if (this.mappedFile.hold()) {
// 计算key的hashcode
int keyHash = indexKeyHashMethod(key);
// 定位到hash槽的位置
int slotPos = keyHash % this.hashSlotNum;
// 计算hash槽的绝对物理位置
int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try {
if (lock) {
// fileLock = this.fileChannel.lock(absSlotPos,
// hashSlotSize, true);
}
// 得到槽的值
int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
// if (fileLock != null) {
// fileLock.release();
// fileLock = null;
// }
// 如果对应的Hash槽中存储的数据小于1或者大于当前索引条目个数则表示给HashCode没有对应的条条目,直接返回
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()
|| this.indexHeader.getIndexCount() <= 1) {
} else {
// 由于会存在hash冲突,根据slotValue定位该hash槽最新的一个Item条目,将存储的物理偏移加入到phyOffsets中,
// 然后继续验证Item条目中存储的上一个Index下标,如果大于等于1并且小于最大条目数,则继续查找,否则结束查找。
for (int nextIndexToRead = slotValue; ; ) {
// 如果大于查找数量,中断
if (phyOffsets.size() >= maxNum) {
break;
}
// 根据Index下标定位到条目的起始物理偏移量,然后依次读取hashCode、物理偏移量、时间差、上一个条目的Index下标
int absIndexPos =
IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize
+ nextIndexToRead * indexSize;
int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos);
long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8);
int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
// 如果存储的时间差小于0,则直接结束;
// 如果hashcode匹配并且消息存储时间介于待查找时间start、end之间则将消息物理偏移量加入到phyOffsets,并验证条目
// 前一个Index索引,如果大于等于1并且小于Index条目数,则继续查找,否则结束整个查找。
if (timeDiff < 0) {
break;
}
timeDiff *= 1000L;
long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff;
boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
if (keyHash == keyHashRead && timeMatched) {
phyOffsets.add(phyOffsetRead);
}
if (prevIndexRead <= invalidIndex
|| prevIndexRead > this.indexHeader.getIndexCount()
|| prevIndexRead == nextIndexToRead || timeRead < begin) {
break;
}
nextIndexToRead = prevIndexRead;
}
}
} catch (Exception e) {
log.error("selectPhyOffset exception ", e);
} finally {
if (fileLock != null) {
try {
fileLock.release();
} catch (IOException e) {
log.error("Failed to release the lock", e);
}
}
this.mappedFile.release();
}
}
}
网友评论