spark环境下使用bluk load方式推hbase
背景介绍
Hbase作为一种常用的数据存储工具,对应解决大数据实时查询问题,有良好的解决方案,但是在使用用过程中发现在短时间内推入大规模数据存在性能瓶颈,使用API调研方式并不利于周期性(如每天)推入大规模数据的使用。因此调研了使用bulk load的方式完成大规模数据写入。
基本数据量
每天推入数据约在几十亿条的规模,每个数据拥有8个列。最后结果验证了此规模数据使用bulk load方式推入约使用2个小时左右
bulk load优势说明
写入hbase的方式有以下几种:
- 调用Hbase API,使用Table.put方式单条写入
- MapReduce方式,使用TableOutputFormat作为输出
- Bulk Load方式,先将要推入的数据按照格式持久化成HFile文件,然后使用Hbase对该文件进行load
在spark环境下使用bulk load方式写入hbase有以下几点优势:
- spark环境能给无缝的与hive、hdfs等工具结合使用,方便数据加工与HFile文件的格式化
- spark是集群环境可根据数据量的规模自行扩展资源,加快文件格式化
- bulk load方式避免了程序频繁读写API,没有接口频繁读写的消耗。减轻Hbase侧的服务压力
基本步骤
image.png- 从hive读取数据
- 利用spark集群对数据进行加工,并定义rowKey,并对数据进行排序
- 对数据排序后,利用ImmutableBytesWritable组长数据
- 利用HFileOutputFormat2将数据以HFile文件的方式写到HDFS目录上
- 利用LoadIncrementalHFiles工具将HFile 加载到hbase上
代码实现
// 创建sparkSession
val builder = SparkSession.builder()
val spark = builder.appName(s"${this.getClass.getSimpleName}")
.enableHiveSupport()
.getOrCreate()
// 要写入文件的路径
val orcFilePath = args(0)
// 准备数据,将表中数据转换成hbase可以使用数据
val dataRdd = spark.read.format("orc").load(args(0))
// columnFamily
val columnFamily = args(1)
val rdd = dataRdd.rdd.sortBy(line => line.toString()).map(line => {
val rowkey = line.toString()
val kv : KeyValue = new KeyValue(Bytes.toBytes(rowkey), columnFamily.getBytes(), "column1".getBytes(), rowkey.getBytes() )
(new ImmutableBytesWritable(Bytes.toBytes(rowkey)), kv)
})
// 设置hbase配置
val tableName = args(2) // tableName
val erp = args(3)
val instanceName = args(4)
val assessKey = args(5)
val hbaseConfig = HBaseConfiguration.create
hbaseConfig.set("bdp.hbase.erp", erp)
hbaseConfig.set("bdp.hbase.instance.name", instanceName)
hbaseConfig.set("bdp.hbase.accesskey", assessKey)
//
val job = Job.getInstance(hbaseConfig)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
// 获取hbase连接
val connection = ConnectionFactory.createConnection(hbaseConfig)
val admin = connection.getAdmin
val hbTableName =TableName.valueOf(tableName)
// 获取指定表
val realTable = connection.getTable(hbTableName)
// 获取hbase的region
val regionLocator = connection.getRegionLocator(hbTableName)
// 配置HFile输出
HFileOutputFormat2.configureIncrementalLoad (job, realTable, regionLocator)
// 将数据保存成hfile 到指定目录
val timeLong = System.currentTimeMillis()
val hFilePath = s"hdfs://172.21.2.110/user/mart/dev.db/han/hbase/$timeLong"
val networkPath = s"hdfs://172.21.2.110/user/mart/dev.db/han/hbase/$timeLong"
try {
rdd.saveAsNewAPIHadoopFile(hFilePath, classOf[ImmutableBytesWritable],
classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
val bulkloader = new LoadIncrementalHFiles(hbaseConfig)
bulkloader.doBulkLoad(new Path(networkPath), admin, realTable, regionLocator)
val hdfs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
if (hdfs.exists(new Path(hFilePath))) {
hdfs.delete(new Path(hFilePath), true)
}
} catch {
case e: Exception =>{
val error = e.getCause
println(s"写hbase失败 $error")
}
} finally {
val hdfs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
if (hdfs.exists(new Path(hFilePath))) {
hdfs.delete(new Path(hFilePath), true)
}
}
}
网友评论