Reading and Writing HBase Table Data with Spark

Author: 扎西的德勒 | Published 2021-05-24 22:24

    1. Maven Dependencies

    <repositories>
            <!-- hbase-spark is published by Cloudera, so the CDH repository is added here -->
            <repository>
                <id>cloudera</id>
                <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
            </repository>
    </repositories>
    <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.3.3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.7</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-spark -->
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-spark</artifactId>
                <version>2.1.0-cdh6.2.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.3.3</version>
            </dependency>
    </dependencies>
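
    If you build with sbt rather than Maven, an equivalent definition might look like the sketch below (same coordinates and the same Cloudera resolver; adjust versions to match your cluster):

        // build.sbt -- a minimal sketch, assuming Scala 2.11 to match the _2.11 artifacts
        resolvers += "cloudera" at "https://repository.cloudera.com/artifactory/cloudera-repos"

        libraryDependencies ++= Seq(
          "org.apache.spark" %% "spark-core"      % "2.3.3",
          "org.apache.spark" %% "spark-streaming" % "2.3.3",
          "org.apache.commons" % "commons-lang3"  % "3.7",
          "org.apache.hbase"   % "hbase-spark"    % "2.1.0-cdh6.2.0"
        )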
    

    2. Spark Code

    
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.client.{Put, Result, Scan}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
    import org.apache.hadoop.hbase.spark.HBaseContext
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.rdd.RDD
    
    
    object SparkOnHBase {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("sparkOnHBase").setMaster("local[*]")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
    
        // Create the HBase configuration (ZooKeeper quorum, client port, source table)
        val hbaseConf: Configuration = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum","node01,node02,node03")
        hbaseConf.set("hbase.zookeeper.property.clientPort","2181")
        hbaseConf.set(TableInputFormat.INPUT_TABLE,"spark_hbase")
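        // Note: TableInputFormat.INPUT_TABLE is only consulted by the classic
        // newAPIHadoopRDD read path; HBaseContext.hbaseRDD below takes the table name explicitly.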
    
        val hbaseContext: HBaseContext = new HBaseContext(sc, hbaseConf)
    
        val scan: Scan = new Scan()
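        // Optional scan tuning (hypothetical values, adjust to your table):
        // scan.addFamily(Bytes.toBytes("info"))  // read only the 'info' column family
        // scan.setCaching(100)                   // rows fetched per RPC round trip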
    
        val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = hbaseContext.hbaseRDD(TableName.valueOf("spark_hbase"), scan)
    
        hbaseRDD.map(eachResult => {
          //      val rowkey1: String = Bytes.toString(eachResult._1.get())
          val result: Result = eachResult._2
          val rowKey: String = Bytes.toString(result.getRow)
    
          val name: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
          val age: String = Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("age")))
          //      println(rowKey+":"+name+":"+age)
          rowKey + ":" + name + ":" + age
        }).foreach(println)
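        // Note: foreach(println) runs on the executors; with local[*] that is the driver JVM,
        // but on a real cluster collect small results to the driver before printing.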
    
        // Write to HBase; create the target table in advance: create 'spark_hbase_out','info'
        hbaseConf.set(TableOutputFormat.OUTPUT_TABLE,"spark_hbase_out")
        // Configure the output format and key/value classes through a Hadoop Job
        val job = Job.getInstance(hbaseConf)
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        // TableOutputFormat writes Mutations (Put/Delete), so the value class is Put, not Result
        job.setOutputValueClass(classOf[Put])
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    
        val initialRDD: RDD[(String, String, String)] = sc.parallelize(List(("100", "apple", "11"), ("200", "banana", "12"), ("300", "pear", "13")))
    
        val hbaseRDD2: RDD[(ImmutableBytesWritable, Put)] = initialRDD.map(x => {
          val put: Put = new Put(Bytes.toBytes(x._1))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(x._2))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(x._3))
          (new ImmutableBytesWritable(), put)
        })
    
        hbaseRDD2.saveAsNewAPIHadoopDataset(job.getConfiguration)
    
        sc.stop()
    
      }
    
    }
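
    As an aside, the HBaseContext created above for reading also provides a bulkPut helper in the hbase-spark module, which skips the MapReduce Job setup entirely. A minimal sketch of the same write, reusing initialRDD and the spark_hbase_out table from the listing:

        // Alternative write path via HBaseContext.bulkPut (sketch, same table and RDD as above)
        hbaseContext.bulkPut[(String, String, String)](
          initialRDD,
          TableName.valueOf("spark_hbase_out"),
          x => {
            val put = new Put(Bytes.toBytes(x._1))
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(x._2))
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(x._3))
            put
          })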
    
