HBase-Spark-Snapshot-Read-Demo

Author: 步闲 | Published 2019-12-26 10:37
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase._
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    object SparkReadHBaseTest {
    
      // Main entry point
      def main(args: Array[String]) {
    
        // Set up the Spark context
        val conf = new SparkConf().setAppName("SparkReadDwAPPTest")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .setMaster("local") // local master for debugging only
        val sc = new SparkContext(conf)
        // Point the job at snapshot "test1_snap"; TableSnapshotInputFormat restores
        // snapshot references under the given directory, which must be writable and
        // on the same HDFS as hbase.rootdir
        val job = Job.getInstance(getHbaseConf())
        TableSnapshotInputFormat.setInput(job, "test1_snap", new Path("/user/zhou.pengbo"))
    
        val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])
        hbaseRDD.map(_._2).map(getRes(_)).count()
        sc.stop()
      }
    
    
      // Extract the rowkey and the f:name column from a Result, logging both
      def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
        val rowkey = Bytes.toString(result.getRow())
        val name = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
        println(rowkey + "---" + name)
        name
      }
      // Build the HBase configuration
      def getHbaseConf(): Configuration = {
        val conf: Configuration = HBaseConfiguration.create()
        conf.set(TableInputFormat.SCAN, getScanStr())
        conf
      }
    
      // Serialize an empty Scan to the Base64 string expected by TableInputFormat.SCAN
      def getScanStr(): String = {
        val scan = new Scan()
        val proto = ProtobufUtil.toScan(scan)
        Base64.encodeBytes(proto.toByteArray())
      }
    }
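
The demo assumes the snapshot test1_snap already exists. For reference, here is a minimal sketch of taking that snapshot through the HBase client Admin API; the source table name test1 is an assumption, and the quorum address is a placeholder:

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory

    object CreateSnapshotSketch {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "xxxx.com.cn") // placeholder quorum
        val connection = ConnectionFactory.createConnection(conf)
        val admin = connection.getAdmin
        try {
          // Snapshot table "test1" under the name the Spark job reads later
          admin.snapshot("test1_snap", TableName.valueOf("test1"))
        } finally {
          admin.close()
          connection.close()
        }
      }
    }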
    

Note: the code above requires core-site.xml, hdfs-site.xml, and hbase-site.xml to be on the classpath (for example under the resources directory). Alternatively, set the configuration directly in code, as follows:

    package com.xcar.etl
    
    
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase._
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    object SparkReadHBaseTest {
    
      val HBASE_ZOOKEEPER_QUORUM = "xxxx.com.cn"
    
      // Main entry point
      def main(args: Array[String]) {
    
        // Set up the Spark context
        val conf = new SparkConf().setAppName("SparkReadDwAPPTest")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .setMaster("local") // local master for debugging only
        val sc = new SparkContext(conf)
        // Point the job at snapshot "test1_snap"; TableSnapshotInputFormat restores
        // snapshot references under the given directory, which must be writable and
        // on the same HDFS as hbase.rootdir
        val job = Job.getInstance(getHbaseConf())
        TableSnapshotInputFormat.setInput(job, "test1_snap", new Path("/user/zhou.pengbo"))
    
        val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])
        hbaseRDD.map(_._2).map(getRes(_)).count()
        sc.stop()
      }
    
      // Extract the rowkey and the f:name column from a Result, logging both
      def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
        val rowkey = Bytes.toString(result.getRow())
        val name = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("name")))
        println(rowkey + "---" + name)
        name
      }
    
      // Build the HBase configuration
      def getHbaseConf(): Configuration = {
        val conf: Configuration = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("zookeeper.znode.parent", "/hbase-unsecure")
        conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM)
        conf.set("hbase.rootdir", "/apps/hbase/data")
        // Table underlying the snapshot (a TableInputFormat setting; the snapshot
        // itself is selected via TableSnapshotInputFormat.setInput in main)
        conf.set(TableInputFormat.INPUT_TABLE, "test_snap")
        // conf.set("hbase.TableSnapshotInputFormat.snapshot.name", "test1_snap")
        conf.set("fs.defaultFS", "hdfs://xxxxxx:8020")
        conf.set(TableInputFormat.SCAN, getScanStr())
        conf
      }
    
      // Serialize an empty Scan to the Base64 string expected by TableInputFormat.SCAN
      def getScanStr(): String = {
        val scan = new Scan()
        val proto = ProtobufUtil.toScan(scan)
        Base64.encodeBytes(proto.toByteArray())
      }
    }
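
The Scan serialized above is empty, so every row and column in the snapshot is read. When only part of the data is needed, a narrower Scan can be serialized instead; a minimal sketch (the column family and row range below are illustrative, not from the original demo):

    // Drop-in variant of getScanStr that restricts what the snapshot read returns
    def getScanStr(): String = {
      val scan = new Scan()
      scan.addFamily(Bytes.toBytes("f"))          // read only column family "f"
      scan.setStartRow(Bytes.toBytes("row-0100")) // illustrative row range
      scan.setStopRow(Bytes.toBytes("row-0200"))
      val proto = ProtobufUtil.toScan(scan)
      Base64.encodeBytes(proto.toByteArray())
    }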
    
