Spark RDD (JSONObject): Reading and Writing HBase


Author: xiaogp | Published 2020-07-03 17:21

    Read: using the newAPIHadoopRDD method

    import org.apache.hadoop.hbase.client._
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkContext, SparkConf}
    
    
    object readHbaseTest {
      def main(args: Array[String]): Unit = {

        // Run in local mode for easy testing
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("readHbaseTest")

        // Create the HBase configuration
        val hBaseConf = HBaseConfiguration.create()
        hBaseConf.set("hbase.zookeeper.quorum", "192.168.61.97")  // ZooKeeper quorum; this can also be supplied via hbase-site.xml on the classpath, but setting it in code is recommended here
        hBaseConf.set("hbase.zookeeper.property.clientPort", "2181")  // ZooKeeper client port, default 2181
        hBaseConf.set(TableInputFormat.INPUT_TABLE, "test:gp")

        // Create the Spark context
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        // Read the table as an RDD of (ImmutableBytesWritable, Result)
        val hbaseRDD = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

        // Map each row to a DataFrame record: the rowkey plus the info:name column
        val data = hbaseRDD.map(r => (
          Bytes.toString(r._2.getRow()),
          Bytes.toString(r._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
        )).toDF("rowkey", "category")

        data.show()

      }
    }
    
    +------+--------+
    |rowkey|category|
    +------+--------+
    | rk001|      gp|
    | rk002|      wf|
    | rk003|     lqq|
    +------+--------+
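
    The example above reads one known column. When the qualifiers are not known in advance, the same hbaseRDD can be flattened into a map of all cells per row. A minimal sketch, not from the original article, reusing hbaseRDD from the code above:

    import org.apache.hadoop.hbase.CellUtil

    // Turn each Result into (rowkey, Map(qualifier -> value)) over all of its cells
    val allColumns = hbaseRDD.map { case (_, result) =>
      val rowkey = Bytes.toString(result.getRow)
      val cells = result.rawCells().map { cell =>
        Bytes.toString(CellUtil.cloneQualifier(cell)) -> Bytes.toString(CellUtil.cloneValue(cell))
      }.toMap
      (rowkey, cells)
    }
    allColumns.collect().foreach(println)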
    

    Write: using the saveAsHadoopDataset method (the old mapred API)

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    
    
    object writeHbaseTest {

      def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("writeHbaseTest")
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        val data = Seq((1, 2), (2, 3)).toDF("a", "b")

        val hBaseConf = HBaseConfiguration.create()
        hBaseConf.set("hbase.zookeeper.quorum", "192.168.61.97")
        hBaseConf.set("hbase.zookeeper.property.clientPort", "2181")

        val jobConf = new JobConf(hBaseConf)
        jobConf.setOutputFormat(classOf[TableOutputFormat])
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, "test:gp")

        data.rdd.map(row => {
          val rowkey = row(0).toString  // the rowkey must be converted to a String here
          val name = row(1).toString
          val put = new Put(Bytes.toBytes(rowkey))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))  // column family, qualifier, value
          (new ImmutableBytesWritable, put)
        }).saveAsHadoopDataset(jobConf)
      }
    }
    
    hbase(main):003:0> scan 'test:gp'
    ROW                                 COLUMN+CELL                                                                                         
     1                                  column=info:name, timestamp=1593767839176, value=2                                                  
     2                                  column=info:name, timestamp=1593767839176, value=3                                                  
     rk001                              column=info:name, timestamp=1593763852691, value=gp                                                 
     rk002                              column=info:name, timestamp=1593763858131, value=wf                                                 
     rk003                              column=info:name, timestamp=1593765395612, value=lqq 
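
    A single Put can carry several columns: call addColumn once per qualifier before emitting the pair. A minimal sketch, not from the original article, reusing data and jobConf from the example above; the qualifiers "a" and "b" are picked here just for illustration:

    data.rdd.map(row => {
      val rowkey = row(0).toString
      val put = new Put(Bytes.toBytes(rowkey))
      // one addColumn per qualifier, all in the "info" family
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("a"), Bytes.toBytes(row(0).toString))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("b"), Bytes.toBytes(row(1).toString))
      (new ImmutableBytesWritable, put)
    }).saveAsHadoopDataset(jobConf)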
    

    Write: using the saveAsNewAPIHadoopDataset method (the new mapreduce API)

    Use map to transform each element of the RDD into a (new ImmutableBytesWritable, put) pair, then call saveAsNewAPIHadoopDataset on the resulting RDD to write it to HBase.

    import org.apache.hadoop.hbase.client.{Put, Result}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.hbase.util.Bytes
    
    
    object writeHbaseTest2 {

      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("writeHbaseTest2")
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._

        val data = Seq((1, 2), (2, 3)).toDF("a", "b")

        val tablename = "test:gp"
        sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "192.168.61.97")
        sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
        sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

        val job = Job.getInstance(sc.hadoopConfiguration)
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Result])
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

        val save_rdd = data.rdd.map(x => {
          // the first column becomes the rowkey
          val rowkey = x(0).toString
          // the second column becomes the value of column info:name
          val name = x(1).toString
          val put = new Put(Bytes.toBytes(rowkey))
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
          (new ImmutableBytesWritable, put)
        })

        save_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
      }

    }
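
    The new-API writes repeat the same Job setup, so it can be factored into a small helper. A minimal sketch, not from the original article: the object and method names are hypothetical, it additionally needs Configuration and HBaseConfiguration imported, and it uses Put as the output value class because Put objects are what the RDD actually emits:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.hbase.HBaseConfiguration

    object HBaseWriteConf {
      // Build a Configuration ready for saveAsNewAPIHadoopDataset against one table
      def forTable(quorum: String, port: String, table: String): Configuration = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", quorum)
        conf.set("hbase.zookeeper.property.clientPort", port)
        conf.set(TableOutputFormat.OUTPUT_TABLE, table)

        val job = Job.getInstance(conf)
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Put])
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
        job.getConfiguration
      }
    }

    // usage: save_rdd.saveAsNewAPIHadoopDataset(HBaseWriteConf.forTable("192.168.61.97", "2181", "test:gp"))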
    

    Write: the elements of the RDD are JSON objects; build a Put for each object and call addColumn on every field with forEach and a java.util.function.BiConsumer

    import com.alibaba.fastjson.{JSON, JSONObject}
    import org.apache.hadoop.hbase.client.{Put, Result}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    object JsonSvaeHbase {
      def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("JsonSvaeHbase")
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        val str = "{\"id\": \"101\", \"name\": \"2222\", \"age\": \"3333\"}"
        val jsonObject = JSON.parse(str).asInstanceOf[JSONObject]
        val data = sc.parallelize(Seq(jsonObject))

        val tablename = "test:gp"
        sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "192.168.61.97")
        sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
        sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

        val job = Job.getInstance(sc.hadoopConfiguration)
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Result])
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

        val save_rdd = data.map(x => {
          // the "id" field becomes the rowkey
          val id = x.getString("id")
          val put: Put = new Put(Bytes.toBytes(id))

          // JSONObject implements java.util.Map, so forEach visits every field
          x.forEach(new java.util.function.BiConsumer[String, Object]() {
            def accept(k: String, v: Object): Unit = {
              if (k != null && v != null) {
                put.addColumn(Bytes.toBytes("info"), k.getBytes, v.toString.getBytes)
              }
            }
          })
          (new ImmutableBytesWritable(), put)
        })

        save_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
      }
    }
    
    // hbase(main):023:0> get 'test:gp', '101'
    // COLUMN                                 CELL
    //  info:age                              timestamp=1597288908139, value=3333
    //  info:id                               timestamp=1597288908139, value=101
    //  info:name                             timestamp=1597288908139, value=2222
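
    The anonymous BiConsumer is the Java-style way to walk the fields. Since JSONObject implements java.util.Map, the same loop can also be written with Scala's JavaConverters. A minimal sketch, not from the original article, reusing data from the example above:

    import scala.collection.JavaConverters._

    val save_rdd2 = data.map { x =>
      val put = new Put(Bytes.toBytes(x.getString("id")))
      // asScala exposes the JSONObject as a Scala map view over its fields
      x.asScala.foreach { case (k, v) =>
        if (k != null && v != null)
          put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(k), Bytes.toBytes(v.toString))
      }
      (new ImmutableBytesWritable, put)
    }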
    

    Alternatively, build the Put and add the columns by looping over JSONObject.keySet with an iterator

    import com.alibaba.fastjson.{JSON, JSONObject}
    import org.apache.hadoop.hbase.client.{Put, Result}
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapreduce.Job
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    
    object JsonSvaeHbase {
      def main(args: Array[String]): Unit = {

        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("JsonSvaeHbase")
        val sc = new SparkContext(sparkConf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
        val str = "{\"id\": \"102\", \"name\": \"2222\", \"age\": \"3333\"}"
        val jsonObject = JSON.parse(str).asInstanceOf[JSONObject]
        val data = sc.parallelize(Seq(jsonObject))

        val tablename = "test:gp"
        sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "192.168.61.97")
        sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
        sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)

        val job = Job.getInstance(sc.hadoopConfiguration)
        job.setOutputKeyClass(classOf[ImmutableBytesWritable])
        job.setOutputValueClass(classOf[Result])
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

        val save_rdd = data.map(x => {
          val id_code = x.getString("id")
          val put = new Put(Bytes.toBytes(id_code))
          insert_hbase(x, put)
          (new ImmutableBytesWritable, put)
        })

        save_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)

      }

      // Add every field of the JSONObject to the Put as a column in the "info" family
      def insert_hbase(jsonObject: JSONObject, onePut: Put): Unit = {
        val keys = jsonObject.keySet
        val iterator = keys.iterator
        while (iterator.hasNext) {
          val col = iterator.next()
          val value = jsonObject.get(col).toString
          onePut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(col), Bytes.toBytes(value))
        }
      }
    }
    
    // hbase(main):038:0> get 'test:gp', '102'
    // COLUMN                                 CELL
    //  info:age                              timestamp=1597290047490, value=3333
    //  info:id                               timestamp=1597290047490, value=102
    //  info:name                             timestamp=1597290047490, value=2222
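
    In a real job the JSONObject RDD would usually come from a data source rather than a hard-coded string. A minimal sketch, not from the original article, that parses a JSON-lines file with fastjson (the input path is hypothetical) and reuses sc, job and insert_hbase from the example above:

    val jsonRdd = sc.textFile("/tmp/people.jsonl")
      .map(line => JSON.parseObject(line))  // fastjson: one JSONObject per line

    val puts = jsonRdd.map { obj =>
      val put = new Put(Bytes.toBytes(obj.getString("id")))
      insert_hbase(obj, put)
      (new ImmutableBytesWritable, put)
    }
    puts.saveAsNewAPIHadoopDataset(job.getConfiguration)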
    
