import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}
object SparkReadHBaseTest {
// Main entry point
def main(args: Array[String]) {
// Set up the Spark context
val conf = new SparkConf().setAppName("SparkReadDwAPPTest")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setMaster("local")//调试
val sc = new SparkContext(conf)
// Build the HBase RDD from the snapshot
val job = Job.getInstance(getHbaseConf())
TableSnapshotInputFormat.setInput(job, "test1_snap", new Path("/user/zhou.pengbo"))
val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hbaseRDD.map(_._2).map(getRes(_)).count()
}
def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
val rowkey = Bytes.toString(result.getRow())
// Read the value of column f:name for this row
val name = Bytes.toString(result.getValue("f".getBytes, "name".getBytes))
println(rowkey+"---"+name)
name
}
// Build the HBase configuration
def getHbaseConf(): Configuration = {
val conf: Configuration = HBaseConfiguration.create()
conf.set(TableInputFormat.SCAN, getScanStr())
conf
}
// Serialize the Scan for the job configuration
def getScanStr(): String = {
val scan = new Scan()
val proto = ProtobufUtil.toScan(scan)
Base64.encodeBytes(proto.toByteArray())
}
}
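TableSnapshotInputFormat.setInput restores the snapshot's file references under the given restore directory (/user/zhou.pengbo here), so the running user needs write access to that path on the same HDFS that holds hbase.rootdir, and the snapshot test1_snap must already exist. If it still has to be created, a minimal sketch using the HBase 1.x Admin API (the table name test1 is an assumption inferred from the snapshot name):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateSnapshot {
  def main(args: Array[String]) {
    val conf = HBaseConfiguration.create()
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    try {
      // Take a snapshot named test1_snap of the (assumed) table test1
      admin.snapshot("test1_snap", TableName.valueOf("test1"))
    } finally {
      admin.close()
      connection.close()
    }
  }
}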
Note: the code above requires core-site.xml, hdfs-site.xml, and hbase-site.xml to be placed in the resources directory. Alternatively, the same settings can be supplied on the Configuration in code, as follows:
package com.xcar.etl
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.{SparkConf, SparkContext}
object SparkReadHBaseTest {
val HBASE_ZOOKEEPER_QUORUM = "xxxx.com.cn"
// Main entry point
def main(args: Array[String]) {
// Set up the Spark context
val conf = new SparkConf().setAppName("SparkReadDwAPPTest")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.setMaster("local")//调试
val sc = new SparkContext(conf)
// Build the HBase RDD from the snapshot
val job = Job.getInstance(getHbaseConf())
TableSnapshotInputFormat.setInput(job, "test1_snap", new Path("/user/zhou.pengbo"))
val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
hbaseRDD.map(_._2).map(getRes(_)).count()
}
def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
val rowkey = Bytes.toString(result.getRow())
val name = Bytes.toString(result.getValue("f".getBytes, "name".getBytes))
println(rowkey+"---"+name)
name
}
// Build the HBase configuration
def getHbaseConf(): Configuration = {
val conf: Configuration = HBaseConfiguration.create()
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("zookeeper.znode.parent", "/hbase-unsecure")
conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM)
conf.set("hbase.rootdir", "/apps/hbase/data")
// Set the name of the table to query
conf.set(TableInputFormat.INPUT_TABLE, "test_snap")
// conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "test1_snap")
conf.set("fs.defaultFS","hdfs://xxxxxx:8020")
conf.set(TableInputFormat.SCAN, getScanStr())
conf
}
// Serialize the Scan for the job configuration
def getScanStr(): String = {
val scan = new Scan()
val proto = ProtobufUtil.toScan(scan)
Base64.encodeBytes(proto.toByteArray())
}
}
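Both listings serialize an empty Scan, so every column of every row is read. Because TableSnapshotInputFormat reads the snapshot's HFiles directly from HDFS rather than going through the region servers, the serialized Scan is mainly what controls which columns and row ranges end up in the RDD. A sketch of a narrower scan (f:name matches what getRes reads; the row-key bounds are purely hypothetical):

def getScanStr(): String = {
  val scan = new Scan()
  // Only fetch the f:name column that getRes actually reads
  scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"))
  // Optional row-key range (hypothetical keys shown for illustration)
  scan.setStartRow(Bytes.toBytes("row-0000"))
  scan.setStopRow(Bytes.toBytes("row-9999"))
  val proto = ProtobufUtil.toScan(scan)
  Base64.encodeBytes(proto.toByteArray())
}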