一、背景
项目中有需求,要频繁地、快速地向一个表中初始化数据。因此如何加载数据,如何提高速度是需要解决的问题。
一般来说,作为数据存储系统会分为检索和存储两部分。检索是对外暴露数据查询接口。存储一是要实现数据按固定规则存储到存储介质中(如磁盘、内存等),另一方面还需要向外暴露批量装载的工具。如DB2的 db2load 工具,在关闭掉日志的前提下,写入速度能有显著提高。
二、知识点
- Hbase 2.0 的LoadIncrementalHFiles 支持向Hbase 写入HFile 文件
- 写入的HFile 文件要求是排序的(rowKey,列簇,列)
- 关键是绕过Hbase regionServer,直接写入Hbase文件
- Spark RDD的repartitionAndSortWithinPartitions 方法可以高效地实现分区并排序
- JAVA util.TreeMap 是红黑树的实现,能很好的实现排序的要求
三、流程
- 对待写入的数据按Key值构造util.TreeMap 树结构。目的是按Key值构造匹配Hbase 的排序结构
- 转换成RDD,使用repartitionAndSortWithinPartitions算子 对Key值分区并排序
- 调用RDD的saveAsNewAPIHadoopFile 算子,生成HFile文件
- 调用Hbase: LoadIncrementalHFiles 将HFile文件Load 到Hbase 表中
四、实现
结构
ByteArrayWrapper.scala-> 对RowKey 结构的封装,支持排序
FamiliesQualifiersValues.scala -> 使用treeMap,实现排序结构
HBaseContext.scala ->
-saveAsHFile -> 写HFile文件
- bulkLoadThinRows -> 写入Hbase
代码参考:[HbaseETL] (https://github.com/Smallhi/HbaseETL)
例子
def initDate() = {
// 清空,并重新创建表
createTable
// 准备数据,rdd 处理
import spark.implicits._
val rdd = spark.sql("select * from hive.graph").map(x => {
val sid = x.getString(0)
val id = x.getString(1)
val idType = x.getString(3)
(sid, id, idType)
}).rdd
// bulk load
hc.bulkLoadThinRows[(String, String, String)](rdd,
"lenovo:GRAPH",
t => {
val rowKey = rowKeyByMD5(t._2, t._3)
val familyQualifiersValues = new FamiliesQualifiersValues
val pk = t._2 + "|" + t._3
// Hbase 存入两列,一列 PK 存 业务主键,一列 s 存 superid
val column = List(("pk", pk), ("s", t._1))
column.foreach(f => {
val family: Array[Byte] = Bytes.toBytes(columnFamily.head)
val qualifier = Bytes.toBytes(f._1)
val value: Array[Byte] = Bytes.toBytes(f._2)
familyQualifiersValues += (family, qualifier, value)
})
(new ByteArrayWrapper(rowKey), familyQualifiersValues)
},
10
)
}
网友评论