Operating HBase Tables with Spark

Author: Tim在路上 | Published 2019-03-11 10:08

    1. Create the conf and table

    var tableName = "httpsystem_dev" 
    val conf= HBaseConfiguration.create()
    //设置要查询的表
    conf.set(TableInputFormat.INPUT_TABLE,tableName)
    val table = new HTable(conf,tableName)
    

    2. Read the data through the Spark API

    // Read the table as an RDD of (ImmutableBytesWritable, Result) pairs,
    // using the conf created in step 1
    val hbaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
            classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
            classOf[org.apache.hadoop.hbase.client.Result])
    

    Each record that comes back is a tuple of an ImmutableBytesWritable (the rowkey) and a Result; the Result holds the list of cells returned for that row.
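    A minimal sketch of consuming these pairs directly (the "0" column family and the ML_juge_type qualifier come from the scan set up in step 3):

    import org.apache.hadoop.hbase.util.Bytes

    println(s"total rows: ${hbaseRDD.count()}")
    hbaseRDD.take(5).foreach { case (_, result) =>
      val rowkey = Bytes.toString(result.getRow)
      val mlType = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("ML_juge_type")))
      println(s"$rowkey -> $mlType")
    }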

    3. Restrict the query with a Scan

    val scan = new Scan()
    scan.addFamily(Bytes.toBytes("0"))
    scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("ML_rule_juge_id"))
    scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("ML_juge_mal"))
    scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("ML_juge_type"))
    scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("DLCNN_juge_mal"))
    scan.addColumn(Bytes.toBytes("0"), Bytes.toBytes("DLCNN_juge_type"))

    // Serialize the Scan and hand it to TableInputFormat so Spark only reads these columns
    val proto = ProtobufUtil.toScan(scan)
    val scanToString = Base64.encodeBytes(proto.toByteArray)
    conf.set(TableInputFormat.SCAN, scanToString)
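    The scan only applies to RDDs created after TableInputFormat.SCAN has been set, so the read from step 2 has to be repeated against this configuration; a brief sketch:

    // Re-create the RDD so the restricted scan takes effect
    val scannedRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    println(s"rows matched by the scan: ${scannedRDD.count()}")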
    

    4. Convert the RDD to a DataFrame

    
    // Map each (rowkey, Result) pair to a Row so it can back a DataFrame
    val rowRDD = hbaseRDD.map { case (_, result) =>
      val rowkey: String = Bytes.toString(result.getRow)
      val ML_juge_type: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("ML_juge_type")))
      val ML_rule_juge_id: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("ML_rule_juge_id")))
      val ML_juge_mal: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("ML_juge_mal")))
      val DLCNN_juge_type: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("DLCNN_juge_type")))
      val DLCNN_juge_mal: String = Bytes.toString(result.getValue(Bytes.toBytes("0"), Bytes.toBytes("DLCNN_juge_mal")))

      RowFactory.create(rowkey, ML_rule_juge_id, ML_juge_mal, ML_juge_type, DLCNN_juge_mal, DLCNN_juge_type)
    }

    // Build the DataFrame with the predefined schema
    val df = sparkSession.createDataFrame(rowRDD, HttpParingSchema.struct)
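    With the DataFrame in place it can be queried through Spark SQL; a quick usage sketch (the temp view name below is made up, and the columns are whatever HttpParingSchema.struct defines):

    df.printSchema()
    df.createOrReplaceTempView("http_result")  // hypothetical view name
    sparkSession.sql("SELECT * FROM http_result LIMIT 10").show()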
    

    5. Writing data

    val put = new Put(Bytes.toBytes("rowKey"))
    put.add("cf","q","value")
    

    Batch writes

    val rdd = sc.textFile("/data/produce/2015/2015-03-01.log") v
    al data = rdd.map(_.split("\t")).map{x=>(x(0)+x(1),x(2))} 
    val result = data.foreachPartition{x => {
    val conf= HBaseConfiguration.create();
    conf.set(TableInputFormat.INPUT_TABLE,"data");
    conf.set("hbase.zookeeper.quorum","slave5,slave6,slave7");
    conf.set("hbase.zookeeper.property.clientPort","2181");
    conf.addResource("/home/hadoop/data/lib/hbase-site.xml");
    val table = new HTable(conf,"data");
    table.setAutoFlush(false,false);
    table.setWriteBufferSize(3*1024*1024);
    x.foreach{y => { var put= new Put(Bytes.toBytes(y._1));
    put.add(Bytes.toBytes("v"),Bytes.toBytes("value"),Bytes.toBytes(y._2));table.put(put)
    };
    table.flushCommits}}}
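    An alternative, not used in the original code, is to let Spark commit the Puts through TableOutputFormat with saveAsNewAPIHadoopDataset, which avoids managing HTable instances by hand; a sketch under that assumption, reusing the data RDD from above:

    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapreduce.Job

    val writeConf = HBaseConfiguration.create()
    writeConf.set("hbase.zookeeper.quorum", "slave5,slave6,slave7")
    writeConf.set(TableOutputFormat.OUTPUT_TABLE, "data")
    val writeJob = Job.getInstance(writeConf)
    writeJob.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    // Turn each (rowkey, value) pair into a Put and let Spark write them
    val puts = data.map { case (key, value) =>
      val put = new Put(Bytes.toBytes(key))
      put.add(Bytes.toBytes("v"), Bytes.toBytes("value"), Bytes.toBytes(value))
      (new ImmutableBytesWritable(Bytes.toBytes(key)), put)
    }
    puts.saveAsNewAPIHadoopDataset(writeJob.getConfiguration)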
    

    6. Inserting data with Bulkload

    val conf = HBaseConfiguration.create()
    val tableName = "data1"
    val table = new HTable(conf, tableName)
    conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
    // Configure the job to write HFiles for an incremental load
    lazy val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])
    HFileOutputFormat.configureIncrementalLoad(job, table)
    // Prefix the rowkey with an MD5 hash and sort by key: HFiles must be written in sorted order
    val rdd = sc.textFile("/data/produce/2015/2015-03-01.log")
      .map(_.split("@"))
      .map { x => (DigestUtils.md5Hex(x(0) + x(1)).substring(0, 3) + x(0) + x(1), x(2)) }
      .sortBy(_._1)
      .map { x =>
        val kv = new KeyValue(Bytes.toBytes(x._1), Bytes.toBytes("v"), Bytes.toBytes("value"), Bytes.toBytes(x._2))
        (new ImmutableBytesWritable(kv.getKey), kv)
      }
    // Write the HFiles to HDFS, then hand them over to HBase
    rdd.saveAsNewAPIHadoopFile("/tmp/data1", classOf[ImmutableBytesWritable], classOf[KeyValue],
      classOf[HFileOutputFormat], job.getConfiguration())
    val bulkLoader = new LoadIncrementalHFiles(conf)
    bulkLoader.doBulkLoad(new Path("/tmp/data1"), table)
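    Bulkload is fast because the HFiles are written directly and then moved into the table's regions, bypassing the normal write path (WAL and MemStore). A quick way to sanity-check the load is to re-read the table with the pattern from step 2:

    val verifyConf = HBaseConfiguration.create()
    verifyConf.set(TableInputFormat.INPUT_TABLE, tableName)
    val loaded = sc.newAPIHadoopRDD(verifyConf, classOf[TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result])
    println(s"rows after bulk load: ${loaded.count()}")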
    
