近期工作中使用hbase bulkload向hbase导入2TB数据(10000个hfiles),我们发现将hfiles加载到hbase 表的过程用了将近一个小时。这和我对bulk load过程的理解不太相符,在我的理解中,hbase bulkload并不会产生数据copy,数据导入通过hdfs的mv操作完成。那么,
问题1 :是什么操作消耗了一个小时呢?
另外,对bulkload一直有个疑问,
问题2 :基于某个hbase cluster中的表生成的hfiles能否导入到其他hbase cluster中的相同表(表名和列簇都相同)中,bulkload会自动处理两个集群中表的region分布差异吗?
带着上述两个问题阅读了hbase中LoadIncrementalHFiles类的代码,本文对此做个梳理。
run方法
LoadIncrementalHFiles的main函数调用run方法:
run方法run方法做了三件事:
1. 初始化
2. 判断要导入的表是否存在,不存在且参数create.table为yes, 则创建该表;不存在且create.table不为yes则抛出异常TableNotFoundException
3. 调用doBulkLoad
初始化
LoadIncrementalHFiles的初始化过程比较简单,主要是对hbase admin等对象的初始化:
Initialize方法doBulkLoad
doBulkLoad方法执行以下步骤:
1. 创建线程池
创建用于bulkload的线程池, 线程池大小由参数hbase.loadincremental.threads.max控制,默认为当前机器的core数量。源码如下:
初始化用于bulkload的线程池2. 初始化加载项队列
遍历指定目录,为每个hfile生成一个LoadQueueItem对象并添加到队列中(下文我们称此队列为LQI队列,称队列中的元素为LQI)中,该步骤由discoverLoadQueue方法完成。
discoverLoadQueue单个hfile的大小不应超过HREGION_MAX_FILESIZE, 该值由参数hbase.hregion.max.filesize控制,默认为10GB。
一个LQI代表一个加载项,LoadQueueItem类的源码如下 :
LoadQueueItem需要加载的文件在HDFS上按照column family被分配在不同的子目录下,每个子目录下的一个文件就对应一个LQI。
discoverLoadQueue方法中调用了visitBulkHFiles方法遍历hfile所在的HDFS目录,visitBulkHFiles方法对每个hfile会做一系列validation :
hfile validation过滤掉reference,link, 以'_'开头的,以及非hfile格式的文件。
3. 检查column family的有效性
在discoverLoadQueue完成对所有hfiles的遍历后,会对queue中所有的items进行column family的check,如果存在某个item的column family不属于目标表,则抛出异常:
verify column family4. 循环分组加载
While Loop to group and load hfileswhile循环的每次迭代主要执行groupOrSplit和bulkLoad两个phase的操作:
a) groupOrSplitPhase
把queue中的所有文件根据目标表的region metadata进行分组,把每个文件划分到其所属region。
groupOrSplitPhase如果某个hfile的[firstkey, lastkey]不在任何region的[starkey, endkey]范围内,则将此hfile拆分成两个文件(拆分后的文件后缀为.top和.bottom),拆分的split key就是firstkey所在region的endkey。
groupOrSplit拆分后得到的两个hfile会被封装成LQI再添加回LQI队列,这就是为什么需要一个while循环判断LQI队列是否为空。需注意,拆分后,第一个LQI肯定会在某个region范围内(除非在下次迭代加载该LQI之前目标region又发生了split),第二个LQI有可能仍需拆分。
split files added back to LQI queuegroupOrSplitPhase完成之后,所有可加载的LQI都会被放到regionGroups中。regionGroups是一个Multimap,key为region的startkey,value为对应的LQI,一个region可对应多个LQI。
b) bulkLoadPhase :
对于regionGroups中的每个key(即region的startkey),调用方法tryAtomicRegionLoad将其对应的所有LQI加载到目标table中。如果加载失败,则将failed LQI再加入到LQI队列中,供下一循环检测和加载。tryAtomicRegionLoad方法会连接hbase region server,发送SecureBulkLoadHFilesRequest请求。
groupOrSplit和bulkLoad的操作都是通过上面创建的线程池对所有hfile并发执行的。除了这两个phase的操作外,while循环中还会检测一些异常情况:
a) 对于doBulkLoad中while(!queue.isEmpty)循环,如果经过maxRetries次尝试后,LQI队列仍不为空,则抛出异常。maxRetries由参数hbase.bulkload.retries.number控制,默认为10 :
Fail after maxRetries timesb) 经过groupOrSplitPhase后,如果regionGroups中单个region单个column family对应的hfile个数超过了maxFilesPerRegionPerFamily,则抛出异常:
Fail in case too many files per region per column family to loadmaxFilesPerRegionPerFamily由参数hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily控制,默认为32。
问题1解答
通过上面的分析,我们知道bulkload过程对hfile目录进行遍历,每个hfile都会进行一系列validation,生成LQI,最终调用tryAtomicRegionLoad进行加载。我们通过打印每个步骤的耗时,发现检测hfile文件格式,即visitBulkHFiles中调用的isHFileFormat方法是主要的耗时步骤, 这是因为visitBulkHFiles方法是在主线程串行执行的,我们有10000个文件,并且每次isHFileFormat都会读取hfile的file trailer,所以累计时间很长。
我们通过添加一个hbase配置项hbase.client.bulk.load.validate.hfile.format来控制是否进行hfile格式检测,当将其设置为false时,加载2TB数据(10000个hfile)从之前的1个小时缩短为10分钟。绕过文件格式检查的前提是我们确定hfile的format都是正确的。我们还可以通过减少hfile的个数来减少bulkload在客户端运行的时间。还有一个可能的解决方案是将visitBulkHFiles方法修改成多线程执行,以后有时间可以尝试一下。
问题2解答
答案是肯定的。如上文所述,bulkload会将hfile的[firstkey, lastkey]和目标表region的[startkey, endkey]进行匹配,如果匹配失败则会进行文件拆分,所以不用担心不同集群表中region的差异。
总结
本文对hbase bulkload的客户端过程进行了分析,详述了hfile的遍历,检测,分组,拆分,加载等步骤,并对文中开头提出的两个问题进行了解答。
水平有限,若有误解,望读者指正。
网友评论