Spark Example: Reading an HBase Table and Converting It to a DataFrame

Author: __元昊__ | Published 2019-05-05 19:01

    The data in the HBase table is structured as follows:


    [Screenshot: the HBase table "user", whose rows have an "info" column family with "name" and "age" columns]
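
    For reference, here is a minimal sketch of how such a table could be created and populated with the HBase client API. The object name SeedUserTable, the row key "0001", and the sample cell values are assumptions for illustration; only the table name "user" and the info:name / info:age columns come from the code below.

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
    import org.apache.hadoop.hbase.util.Bytes

    object SeedUserTable {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.quorum", "192.168.199.128,192.168.199.131,192.168.199.132")
        // Assumes the table already exists, e.g. created in the HBase shell with:
        //   create 'user', 'info'
        val conn = ConnectionFactory.createConnection(conf)
        val table = conn.getTable(TableName.valueOf("user"))

        // One sample row: rowkey "0001" with info:name and info:age cells
        val put = new Put(Bytes.toBytes("0001"))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("zhangsan"))
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes("20"))
        table.put(put)

        table.close()
        conn.close()
      }
    }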

    Code:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.sql.SparkSession
    
    object readHbase_dataframe {
      val zookeeper_ip="192.168.199.128,192.168.199.131,192.168.199.132"
      val tableName="user"
    
      def main(args: Array[String]): Unit = {
        val spark= SparkSession.builder()
          .appName("readHbase_dataframe")
          .master("local")
          .getOrCreate()
    
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", zookeeper_ip)
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
        hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
    
        val sqlContext = spark.sqlContext
        val sc=spark.sparkContext
        import sqlContext.implicits._ // without this import, .toDF cannot be called on an RDD
        // Read from HBase as an RDD of (ImmutableBytesWritable, Result) pairs
        val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
          classOf[ImmutableBytesWritable], classOf[Result])
    
        // Map each Result to a (rowkey, name, age) tuple, i.e. convert the RDD into a DataFrame with a schema
        val hbaseTable=hbaseRDD.map(r=>{
          val hbase_result=r._2
          val rowkey=Bytes.toString(hbase_result.getRow)
          val name=Bytes.toString(hbase_result.getValue(Bytes.toBytes("info"),Bytes.toBytes("name")))
          val age=Bytes.toString(hbase_result.getValue(Bytes.toBytes("info"),Bytes.toBytes("age")))
    
          (rowkey,name,age)
        }).toDF("id","name","age")
    
        hbaseTable.createOrReplaceTempView("emp")
        val hbase_result=spark.sql("select * from emp")
        hbase_result.show()
      }
    }
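
    One optional refinement, sketched below with the standard TableInputFormat.SCAN_COLUMNS key (it does not change the output shown next): since only info:name and info:age are read, the scan can be restricted to those columns so HBase does not ship entire rows to Spark.

    // Add this after the other hbaseConf.set(...) calls above.
    // SCAN_COLUMNS takes a space-separated list of family:qualifier pairs.
    hbaseConf.set(TableInputFormat.SCAN_COLUMNS, "info:name info:age")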
    

    Output:


    [Screenshot: hbase_result.show() output with columns id, name, age]

    The code above produces a DataFrame in standard relational form, but every column name has to be written out by hand, which doesn't scale well. Alternatively, you can pass just the HBase table name as a single parameter and convert the table into an HBase-shaped DataFrame, with one row per cell: rowkey, family, column, value.

    Code:

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{DataTypes,StructField, StructType}
    import test1.hdfsUtil
    import test_TBDS.HbaseUtils
    
    object ReadHbase_Dataframe {
      val zookeeper_ip="172.24.112.13,172.24.112.14,172.24.112.15"
      val tableName="testTable"
    
      def main(args: Array[String]): Unit = {
        System.setProperty("HADOOP_USER_NAME", "hdfs")
    
        val spark= SparkSession.builder()
          .appName("ReadHbase_Dataframe")
          .master("local")
          .getOrCreate()
    
        val hbaseConf = HBaseConfiguration.create()
        hbaseConf.set("hbase.zookeeper.quorum", zookeeper_ip)
        hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
        hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName)
    
        val hbase_list=HbaseUtils.getAllData(tableName) // simple helper classes for HBase and HDFS (not shown here; see the sketch after this code block)
        if(hdfsUtil.isExists("hdfs://172.24.112.11:8020/input/hbase.csv")){
          hdfsUtil.deleteHDFSFile("hdfs://172.24.112.11:8020/input/hbase.csv")
        }
        for(context<-hbase_list){
          println(context)
          hdfsUtil.append("hdfs://172.24.112.11:8020/input/hbase.csv",context+"\n")
        }
    
        val myschema = StructType(List(StructField("rowkey", DataTypes.StringType)
          , StructField("family", DataTypes.StringType)
          ,StructField("column", DataTypes.StringType)
          ,StructField("value", DataTypes.StringType)
          ))
        val df=spark.read.schema(myschema).csv("hdfs://172.24.112.11:8020/input/hbase.csv")
        df.createOrReplaceTempView("emp")
        val hbase_result=spark.sql("select * from emp")
        hbase_result.show()
      }
    }
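
    The article does not show HbaseUtils or hdfsUtil. Judging from the four-column CSV schema above, HbaseUtils.getAllData presumably scans the whole table and returns one "rowkey,family,column,value" line per cell. A minimal sketch of what it might look like (an assumption, not the article's actual code; note that cell values containing commas would break this CSV format):

    import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
    import org.apache.hadoop.hbase.util.Bytes
    import scala.collection.JavaConverters._
    import scala.collection.mutable.ListBuffer

    object HbaseUtilsSketch {
      def getAllData(tableName: String): List[String] = {
        val conf = HBaseConfiguration.create()
        val conn = ConnectionFactory.createConnection(conf)
        val table = conn.getTable(TableName.valueOf(tableName))
        val scanner = table.getScanner(new Scan())
        val lines = ListBuffer[String]()
        // One output line per cell, matching the (rowkey, family, column, value) schema
        for (result <- scanner.asScala; cell <- result.rawCells()) {
          lines += Seq(
            Bytes.toString(CellUtil.cloneRow(cell)),
            Bytes.toString(CellUtil.cloneFamily(cell)),
            Bytes.toString(CellUtil.cloneQualifier(cell)),
            Bytes.toString(CellUtil.cloneValue(cell))).mkString(",")
        }
        scanner.close(); table.close(); conn.close()
        lines.toList
      }
    }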
    

    Result:


    [Screenshot: show() output of the four-column (rowkey, family, column, value) DataFrame]
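
    As a side note, the HDFS CSV round trip can be skipped entirely: the same four-column DataFrame can be built straight from the scan RDD of the first example, reusing the hbaseConf and myschema values defined above. A sketch, not the article's code:

    import org.apache.hadoop.hbase.CellUtil
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.spark.sql.Row

    // (ImmutableBytesWritable, Result) pairs, as in the first example
    val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(hbaseConf,
      classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

    // One Row per cell: (rowkey, family, column, value)
    val cellRDD = hbaseRDD.flatMap { case (_, result) =>
      result.rawCells().map { cell =>
        Row(
          Bytes.toString(CellUtil.cloneRow(cell)),
          Bytes.toString(CellUtil.cloneFamily(cell)),
          Bytes.toString(CellUtil.cloneQualifier(cell)),
          Bytes.toString(CellUtil.cloneValue(cell)))
      }
    }
    val df = spark.createDataFrame(cellRDD, myschema)
    df.show()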
