从hbase 0.92版本后,hbase HFile格式变为2.0,对应的写对象为HFileWriterV2,hbase写hfile是一个一个KeyValue来写的。要写KeyValue,得先初始化一个HFileWriterV2对象。在构建一个HFileWriterV2对象时,会初始化写HFileBlock的对象HFileBlock.Writer,写dataBlockIndex文件的对象HFileBlockIndex.BlockIndexWriter(dataBlockIndexWriter),写元数据块索引文件的对象HFileBlockIndex.BlockIndexWriter(metaBlockIndexWriter),这个和dataBlockIndex是同一个类来实现。初始化hdfs的输出流,以及通过向namenode请求一个新的块,和块写的目的datanode
Block Index(leaf-level-index)
Block Index 是记录每个block的第一个key,偏移量,和大小。即指向block。
Intermediate Index
intermediate index 记录每个Block Index 块的第一个key,偏移量,和大小。即指向Block Index块,如果root index过大,则会有多层intermediate index。
Root Index
root index 记录每个intermediate index块的第一个key,偏移量,和大小。即指向 intermediate index 块。
写HFile的条件
触发HBase memstore flush,当memstore 大小达到设定值时,就触发memstore flush,在memstore flush时会把一个一个的KeyValue写磁盘(HDFS),由Store的internalFlushCache方法完成,这里贴下关键代码:
try {
int compactionKVMax = conf.getInt(HConstants.COMPACTION_KV_MAX, 10);
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
// if we fail.
synchronized (flushLock) {
status.setStatus("Flushing " + this + ": creating writer");
// A. Write the map out to the disk
//这里的writer实例是 StoreFile.Writer对象,writer里引用了一个HFile.Writer writer实例,
//建立hdfs的输出流管道也在这里。
writer = createWriterInTmp(set.size());
writer.setTimeRangeTracker(snapshotTimeRangeTracker);
pathName = writer.getPath();
try {
List<KeyValue> kvs = new ArrayList<KeyValue>();
boolean hasMore;
do {
//next一次,取该行所有的KV
hasMore = scanner.next(kvs, compactionKVMax);
if (!kvs.isEmpty()) {
for (KeyValue kv : kvs) {
// If we know that this KV is going to be included always, then let us
// set its memstoreTS to 0\. This will help us save space when writing to disk.
if (kv.getMemstoreTS() <= smallestReadPoint) {
// let us not change the original KV. It could be in the memstore
// changing its memstoreTS could affect other threads/scanners.
kv = kv.shallowCopy();
kv.setMemstoreTS(0);
}
//以KV为单位,写
writer.append(kv);
flushed += this.memstore.heapSizeChange(kv, true);
}
kvs.clear();
}
} while (hasMore);
} finally {
// Write out the log sequence number that corresponds to this output
// HFile. The hfile is current up to and including logCacheFlushId.
status.setStatus("Flushing " + this + ": appending metadata");
writer.appendMetadata(logCacheFlushId, false);
status.setStatus("Flushing " + this + ": closing flushed file");
//注意这里,写root-leave-index,intermediate-index在这里完成。
writer.close();
}
}
} finally {
flushedSize.set(flushed);
scanner.close();
}
单个KeyValue写
append kv是有storeFile的append方法:代码如下:
public void append(final KeyValue kv) throws IOException {
//1 写bloomKey到缓冲区generalBloomFilterWriter
appendGeneralBloomfilter(kv);
//如果该KeyValue标记为待删除的KeyValue,则写到deleteFamilyBloomFilterWriter
appendDeleteFamilyBloomFilter(kv);
//这里是重点,HFile data block,leaf-level-index,data-encode,compress,checksum等工作都在这里完成
//也是分析的重点。这里的writer是HFile.Writer的一个实例,HFile.Writer是一个接口,他下面有两个具体的实
//现,对应HFile的两个版本,我们这里是版本2,所以是HFileWriterV2的一个实例。
writer.append(kv);
trackTimestamps(kv);
}
HFileWriterV2 的 writer的append(KeyValue kv)方法直接调用下面的方法。
private void append(final long memstoreTS, final byte[] key, final int koffset, final int klength,
final byte[] value, final int voffset, final int vlength)
throws IOException {
//检查该key是否重复,以及是否比上一个写入的KV的key大,如果比上一个小,则不合逻辑,因为hbase是增序的
boolean dupKey = checkKey(key, koffset, klength);
//检查value是否为null。
checkValue(value, voffset, vlength);
if (!dupKey) {
//这里很关键,flush 数据,leaf-leavl-index都在这里完成
//如果和上一个key不相同,则说明是新的一行,否则是同一行的不同KV,则不会判断该block是否写满
checkBlockBoundary();
}
//fsBlockWriter 的state状态默认为INIT,所以在第一次写时,需要新建一个block
//新的块会把缓冲区baosInMemory reset,写block的header到缓冲区。注意该baosInMemory是个字节输出流,
//每次新写一个block时,该baosInMemory是重用的。
if (!fsBlockWriter.isWriting())
newBlock();
//接下来就是写数据了。
// Write length of key and value and then actual key and value bytes.
// Additionally, we may also write down the memstoreTS.
{
//这里写其实都是写到一个缓冲字节数组:ByteArrayOutputStream
DataOutputStream out = fsBlockWriter.getUserDataStream();
out.writeInt(klength);
totalKeyLength += klength;
out.writeInt(vlength);
totalValueLength += vlength;
out.write(key, koffset, klength);
out.write(value, voffset, vlength);
if (this.includeMemstoreTS) {
WritableUtils.writeVLong(out, memstoreTS);
}
}
// Are we the first key in this block? 记录该block的第一个key,
//用来写leaf-leavl-index。
if (firstKeyInBlock == null) {
// Copy the key.
firstKeyInBlock = new byte[klength];
System.arraycopy(key, koffset, firstKeyInBlock, 0, klength);
}
//记录该block的最后一个key,
lastKeyBuffer = key;
lastKeyOffset = koffset;
lastKeyLength = klength;
entryCount++;
}
flush block 刷新data block
DataBlock 默认大小为128个字节,所以当block 的大小大于或者等于128个字节时,开始flush block,
我们来看checkBlockBoundary()的实现:
private void checkBlockBoundary() throws IOException {
//如果block大小没有达到临界值,则继续写,否则开始flush block
if (fsBlockWriter.blockSizeWritten() < blockSize)
return;
//写block(报过data block encode,compress,checksum),leaf-leavl-index
finishBlock();
//检查是否索引缓存区是否写满,即大小达到临界值,
writeInlineBlocks(false);
//新创建一个快。
newBlock();
}
finishBlock
finishBlock 主要干如下几件事:
写data block
1 记录该块的偏移值。如果是第一个block,则为0。
2 记录上一个block的偏移值lastDataBlockOffset。(为写leaf-leavl-index做准备)
3 通过fsBlockWriter把block数据写到hdfs,如果block设置了dataEncode,则会Encode,如果设置压缩比如LZO.则 会对block的数据压缩。在ENCODE,Compression前,会把writer的state设置为BLOCK_READY,以让后面KeyValue继续写。encode时分为cache encode和disk encode,这里只做disk encode,如果在建family时设置了data encode,则encode,否则不会encode。
4 如果给table配置了压缩算法,checksum 是和data存在一起的即一个字节缓冲区,一起写到hdfs,如果没有配置压缩算法,则生成的checksum是单独一个字节缓冲区,而且单独写到hdfs,
hbase checksum 还是单独再写篇文章。
二 记录该块的索引到dataBlockIndexWriter,值为该块的第一个key:firstKeyInBlock,该block在HFile的偏移量:lastDataBlockOffset,该block的大小。同时block的个数会加1.
三 如果需要缓冲写,则把该block添加到cache,默认不缓冲写的block。
一个data block 写完,完整的一个block有下面8部分组成(HFileV2格式):
blockType+Compressed block size+Uncompressed block size+offset of the previous block+bytesPerChecksum+data size(报过checksum)+checksums+data(数据)
blockType 为DATA,如果是索引块,则为LEAF-INDEX或者INTERMEDIATE_INDEX,ROOT_INDEX.
Compressed block size 压缩部分数据的大小
Uncompressed block size未压缩数据的大小
offset of the previous block 上一个快递的偏移量。
bytesPerChecksum是checksums的索引。
data size block在磁盘上的大小包过checksum。
checksums 校验(如果没有压缩,这部分不在block)
data 实际的数据。
flush Block Index 即leaf-level-index
从上面看到,写完一个data block,则会为该block添加一条索引到dataBlockIndexWriter。如果写了n个data block,则leaf-level-index会有n条记录,记录每个block的第一个key,在hfile中的偏移量,以及大小。
如果leaf-level-index缓冲区的大小达到默认的最大值(128K),则需要flush,这里不强制flush。
一个block 写完后,会创建一个新的block。
KeyValue写完(即memstore的所有KeyValue写到缓冲区后的操作)
1 把缓冲区中没有写到hdfs的写到hdfs。
2 写block index ,这里是强制flush,即使block index大小没有达到默认值,也会刷新。前面说到在每写个block index块时会添加每个block index的一个key到root index缓冲区。
3 写metadata blocks如果有的话,metadata blocks即是bloom filter。如果有metadata blocks,同时也会记录每个metadata blocks的索引到metaBlockIndexWriter,这个索引在写完root index即(data block index)后,才开始写。
4 写root index 这一步其实除了写root index外,还会写intermediate index,如果root index的记录过多,即大于默认的索引块maxChunkSize大小(128K),则会把root-index分成root-index size /maxChunkSize 加1个intermediate index索引块,每写一个intermediate index块,就添加条记录到root index缓冲区,如果root index还是过大即大于128K,则再分,这时index tree 的层级也加1.直到root index 缓冲区大小小于128K。
5 写Meta block index索引块。Meta block index 索引是单级别的索引,不像root index 多级。
6 最后写fileinfo。fileInfo的数据项就不详细描述了。
需要注意的是只要data block在配置了encode情况下encode,其他的块都不进行encode。比如index block都不encode。
从4-6是在HFile一打开就加载的到内存的。
从上面的data block index 分析来看,我们在scan的时候,先打开HFile,加载root index,通过root index查
intermediate index,再根据intermediate index查block index 找到row 所在具体的一个data block。需要查找两次索引。root index和intermediate index一起加载。index block和bloom block 会缓冲。
这里主要说明了hbase memstore 写HFile data block, index, 的过程,重点说明了HBase HFile 多级 index的关系。通过这篇文章
应该清楚了HBase HFile v2 的一个大体结构,以及索引的工作原理。
网友评论