
HBase in Practice: Spark-Read-HBase-Snapshot

Author: 步闲 | Published 2020-04-17 06:54

    Preface: I previously shared a small demo of Spark reading HBase directly through its API: HBase-Spark-Read-Demo. When the data volume is very large, however, having Spark scan the HBase table directly inevitably puts considerable pressure on the HBase cluster. With that in mind, this post shows how Spark can instead read HBase HFiles directly through a snapshot.

    First, create an HBase table named test and insert a few rows:

    hbase(main):003:0> scan 'test'
    ROW                                              COLUMN+CELL                                                                                                                                                                                                                       
     r1                                              column=f:name, timestamp=1583318512414, value=zpb                                                                                               
     r2                                              column=f:name, timestamp=1583318517079, value=lisi                                                                                               
     r3                                              column=f:name, timestamp=1583318520839, value=wang                                                                                               
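
    For reference, the same table and rows can also be created programmatically with the HBase Java client instead of the shell. A minimal sketch (assuming hbase-site.xml is on the classpath; the object name is made up for illustration):

    import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes

    object CreateTestTable {
      def main(args: Array[String]) {
        val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
        try {
          // Create table 'test' with a single column family 'f'
          val desc = new HTableDescriptor(TableName.valueOf("test"))
          desc.addFamily(new HColumnDescriptor("f"))
          conn.getAdmin.createTable(desc)

          // Insert the three sample rows shown in the scan above
          val table = conn.getTable(TableName.valueOf("test"))
          Seq("r1" -> "zpb", "r2" -> "lisi", "r3" -> "wang").foreach { case (rk, v) =>
            val put = new Put(Bytes.toBytes(rk))
            put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes(v))
            table.put(put)
          }
        } finally {
          conn.close()
        }
      }
    }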
    

    Next, take a snapshot of the table; its location on HDFS is shown below:

    hbase(main):005:0> snapshot 'test', 'test-snapshot'
    0 row(s) in 0.3690 seconds
    
    $ hdfs dfs -ls /apps/hbase/data/.hbase-snapshot
    Found 1 items
    drwxr-xr-x   - hbase hdfs          0 2020-03-21 21:24 /apps/hbase/data/.hbase-snapshot/test-snapshot
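
    The snapshot can also be taken from code through the Admin API rather than the shell. A minimal sketch (again assuming the cluster configuration is on the classpath):

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.ConnectionFactory

    object TakeSnapshot {
      def main(args: Array[String]) {
        val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
        try {
          // Equivalent to the shell command: snapshot 'test', 'test-snapshot'
          conn.getAdmin.snapshot("test-snapshot", TableName.valueOf("test"))
        } finally {
          conn.close()
        }
      }
    }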
    

    The code is as follows:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase._
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    object SparkReadHBaseSnapshotDemo {
    
      // Main entry point
      def main(args: Array[String]) {
    
        // Set up the Spark context
        val conf = new SparkConf().setAppName("SparkReadHBaseSnapshotDemo")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .setMaster("local") // local mode, for debugging
    
        val sc = new SparkContext(conf)
        // Build an RDD over the HBase snapshot
        val job = Job.getInstance(getHbaseConf())
        TableSnapshotInputFormat.setInput(job, "test-snapshot", new Path("/user/tmp"))
    
        val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])
        hbaseRDD.map(_._2).map(getRes(_)).count()
      }
    
    
      def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
        val rowkey = Bytes.toString(result.getRow())
        val name = Bytes.toString(result.getValue("f".getBytes, "name".getBytes))
        println(rowkey+"---"+name)
        name
      }
      // Build the HBase configuration
      def getHbaseConf(): Configuration = {
        val conf: Configuration = HBaseConfiguration.create()
        conf.set(TableInputFormat.SCAN, getScanStr())
        conf
      }
    
      // Serialize a Scan into the Base64 string expected by TableInputFormat.SCAN
      def getScanStr(): String = {
        val scan = new Scan()
        // scan.set...  add filters / column restrictions here as needed (see the sketch after this block)
        val proto = ProtobufUtil.toScan(scan)
        Base64.encodeBytes(proto.toByteArray())
      }
    }
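
    In getScanStr above, an empty Scan is serialized, so the whole snapshot is read. If only part of the data is needed, the Scan can be narrowed before it is serialized. A sketch of what the "// scan.set..." placeholder could look like (the column and row-prefix restrictions below are illustrative only):

      // Example: read only column f:name, and only rowkeys starting with "r"
      def getScanStr(): String = {
        val scan = new Scan()
        scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"))  // restrict to f:name
        scan.setRowPrefixFilter(Bytes.toBytes("r"))                // restrict the rowkey range
        val proto = ProtobufUtil.toScan(scan)
        Base64.encodeBytes(proto.toByteArray())
      }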
    

    Note: the code above requires core-site.xml, hdfs-site.xml, and hbase-site.xml to be placed under the resources directory. Otherwise, the same settings must be supplied in code, as follows:

    package com.xcar.etl
    
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase._
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.hbase.client.Scan
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
    import org.apache.hadoop.hbase.protobuf.ProtobufUtil
    import org.apache.hadoop.hbase.util.{Base64, Bytes}
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    object SparkReadHBaseSnapshotDemo2 {
    
      val HBASE_ZOOKEEPER_QUORUM = "xxxx.com.cn"
    
      // Main entry point
      def main(args: Array[String]) {
    
        // Set up the Spark context
        val conf = new SparkConf().setAppName("SparkReadHBaseSnapshotDemo2")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
          .setMaster("local") // local mode, for debugging
    
        val sc = new SparkContext(conf)
        // Build an RDD over the HBase snapshot
        val job = Job.getInstance(getHbaseConf())
        TableSnapshotInputFormat.setInput(job, "test-snapshot", new Path("/user/tmp"))
    
        val hbaseRDD = sc.newAPIHadoopRDD(job.getConfiguration, classOf[TableSnapshotInputFormat],
          classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
          classOf[org.apache.hadoop.hbase.client.Result])
        hbaseRDD.map(_._2).map(getRes(_)).count()
      }
    
      def getRes(result: org.apache.hadoop.hbase.client.Result): String = {
        val rowkey = Bytes.toString(result.getRow())
        val name = Bytes.toString(result.getValue("f".getBytes, "name".getBytes))
        println(rowkey+"---"+name)
        name
      }
    
      // Build the HBase configuration
      def getHbaseConf(): Configuration = {
        val conf: Configuration = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("zookeeper.znode.parent", "/hbase")
        conf.set("hbase.zookeeper.quorum", HBASE_ZOOKEEPER_QUORUM)
        conf.set("hbase.rootdir", "/apps/hbase")
        // Name of the table to query
        conf.set(TableInputFormat.INPUT_TABLE, "test")
        conf.set("fs.defaultFS","hdfs://xxxxxx:8020") 
        conf.set(TableInputFormat.SCAN, getScanStr())
        conf
      }
    
      // Serialize a Scan into the Base64 string expected by TableInputFormat.SCAN
      def getScanStr(): String = {
        val scan = new Scan()
        // scan.set...  add filters / column restrictions here as needed
        val proto = ProtobufUtil.toScan(scan)
        Base64.encodeBytes(proto.toByteArray())
      }
    }
    

    Parameters of the TableSnapshotInputFormat.setInput method:

    public static void setInput(org.apache.hadoop.mapreduce.Job job,
                                String snapshotName,
                                org.apache.hadoop.fs.Path restoreDir)
                                throws IOException
    Parameters:
    job - the job to configure
    snapshotName - the name of the snapshot to read from
    restoreDir - a temporary directory to restore the snapshot into. 
    Current user should have write permissions to this directory, and this should not be a subdirectory of rootdir. 
    After the job is finished, restoreDir can be deleted.
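
    Since restoreDir is only a temporary restore area, it can be deleted once the job has finished. A small cleanup sketch (reusing the /user/tmp path and the sc SparkContext from the demo above):

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Remove the temporary snapshot restore directory after the Spark job completes
    val fs = FileSystem.get(sc.hadoopConfiguration)
    fs.delete(new Path("/user/tmp"), true)  // recursive delete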
    

    The pom.xml used by the project:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.zpb.test</groupId>
        <artifactId>spark-read-hbase-snapshot-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
        <packaging>jar</packaging>
    
        <name>spark-read-hbase-snapshot-demo</name>
        <url>http://maven.apache.org</url>
    
        <repositories>
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
            </repository>
        </repositories>
    
        <properties>
            <cdh.hbase.version>1.2.0-cdh5.7.0</cdh.hbase.version>
            <cdh.spark.version>1.6.0-cdh5.7.0</cdh.spark.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>3.8.1</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.62</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.10</artifactId>
                <version>${cdh.spark.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>${cdh.hbase.version}</version>
            </dependency>
        </dependencies>
    </project>
    

    Please credit the source when reposting! You are welcome to follow my WeChat official account 【HBase工作笔记】.
