
Spark: Batch-Saving Records to HBase

Author: DeepMine | Published 2017-09-13 15:28

    Spark's PairRDDFunctions provides two API methods, saveAsHadoopDataset and saveAsNewAPIHadoopDataset, that write an RDD out to any Hadoop-supported storage system.

    PairRDDFunctions

    Method documentation

    def saveAsHadoopDataset(conf: JobConf): Unit
        Output the RDD to any Hadoop-supported storage system, 
        using a Hadoop JobConf object for that storage system. 
        The JobConf should set an OutputFormat and any output 
        paths required (e.g. a table name to write to) in the 
        same way as it would be configured for a Hadoop MapReduce job.
    
    def saveAsNewAPIHadoopDataset(conf: Configuration): Unit
        Output the RDD to any Hadoop-supported storage system with new Hadoop API, 
        using a Hadoop Configuration object for that storage system. 
        The Conf should set an OutputFormat and any output paths required 
        (e.g. a table name to write to) in the same way as it would be 
        configured for a Hadoop MapReduce job.
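
    The example in the next section uses the new-API method. For comparison, here is a minimal sketch of the old-API path with saveAsHadoopDataset (not from the original article; it assumes the same "wetag" table and the convert function defined below):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.mapred.TableOutputFormat // note: mapred, not mapreduce
    import org.apache.hadoop.mapred.JobConf

    val jobConf = new JobConf(HBaseConfiguration.create())
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "wetag")

    // xdata is an RDD[(ImmutableBytesWritable, Put)], e.g. sc.parallelize(rawData).map(convert)
    xdata.saveAsHadoopDataset(jobConf)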
    

    Example

    package com.welab.wetag.mine
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext._
    import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
    import org.apache.hadoop.mapreduce.Job
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.{ Put, Durability }
    import org.apache.hadoop.hbase.util.Bytes
    
    object SaveHbase {
      // Record-conversion code: turn a (rowkey, name, age) tuple into an HBase Put
      def convert(triple: (Int, String, Int)) = {
        val (key, name, age) = triple
        val p = new Put(Bytes.toBytes(key))
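        // SKIP_WAL bypasses the write-ahead log for faster writes,
        // at the risk of losing data if a region server fails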
        p.setDurability(Durability.SKIP_WAL)
        p.addColumn(Bytes.toBytes("attr"), Bytes.toBytes("name"), Bytes.toBytes(name))
        p.addColumn(Bytes.toBytes("attr"), Bytes.toBytes("age"), Bytes.toBytes(age))
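        // TableOutputFormat only writes the Put value and ignores the key,
        // so an empty ImmutableBytesWritable suffices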
        (new ImmutableBytesWritable, p)
      }
    
      def main(args: Array[String]) {
        val appName: String = this.getClass.getSimpleName.split("\\$").last
        val sc = new SparkContext(new SparkConf().setAppName(appName))
    
        // Configure HBase; make sure the table "wetag" has already been created
        val conf = HBaseConfiguration.create()
        conf.set("hbase.zookeeper.property.clientPort", "2181")
        conf.set("hbase.zookeeper.quorum", "sparkmaster1")
        conf.set(TableOutputFormat.OUTPUT_TABLE, "wetag")
    
        val job = Job.getInstance(conf)
        job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    
        val rawData = Array((1, "Jazz", 14), (2, "Andy", 18), (3, "Vincent", 38))
        /* Alternatively, implement the conversion logic inline in the map:
        val xdata = sc.parallelize(rawData).map {
          case (key, name, age) => {
            val p = new Put(Bytes.toBytes(key))
            p.setDurability(Durability.SKIP_WAL)
            p.addColumn(Bytes.toBytes("attr"), Bytes.toBytes("name"), Bytes.toBytes(name))
            p.addColumn(Bytes.toBytes("attr"), Bytes.toBytes("age"), Bytes.toBytes(age))
            (new ImmutableBytesWritable, p)
          }
        }
        */
        val xdata = sc.parallelize(rawData).map(convert)
        xdata.saveAsNewAPIHadoopDataset(job.getConfiguration)
        sc.stop()
      }
    }
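
    The code assumes the wetag table already exists; if it does not, create it first from the HBase shell (table and column-family names taken from the example above):

    create 'wetag', 'attr'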
    

    Compile the code above into a jar and submit it for execution with spark-submit.
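
    A sketch of the submit command (the jar name and master are placeholders; the HBase client jars must also be on the classpath, e.g. via --jars):

    spark-submit \
      --class com.welab.wetag.mine.SaveHbase \
      --master yarn \
      SaveHbase.jar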
