Saving Records to HBase in Bulk from Spark


Author: DeepMine | Published 2017-09-13 15:28

Spark's PairRDDFunctions provides two API methods, saveAsHadoopDataset and saveAsNewAPIHadoopDataset, for writing an RDD out to any Hadoop-supported storage system.

PairRDDFunctions

Method descriptions

def saveAsHadoopDataset(conf: JobConf): Unit
    Output the RDD to any Hadoop-supported storage system, 
    using a Hadoop JobConf object for that storage system. 
    The JobConf should set an OutputFormat and any output 
    paths required (e.g. a table name to write to) in the 
    same way as it would be configured for a Hadoop MapReduce job.
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit
    Output the RDD to any Hadoop-supported storage system with new Hadoop API, 
    using a Hadoop Configuration object for that storage system. 
    The Conf should set an OutputFormat and any output paths required 
    (e.g. a table name to write to) in the same way as it would be 
    configured for a Hadoop MapReduce job.
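
For comparison, here is a minimal sketch of the older saveAsHadoopDataset path, which uses the old mapred API (TableOutputFormat from org.apache.hadoop.hbase.mapred) and a JobConf instead of a Configuration. It assumes the same ZooKeeper quorum and 'wetag' table as the full example below:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{ SparkConf, SparkContext }

object SaveHbaseOldApi {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("SaveHbaseOldApi"))

    // The JobConf carries the HBase connection info, the output format and the target table
    val jobConf = new JobConf(HBaseConfiguration.create())
    jobConf.set("hbase.zookeeper.property.clientPort", "2181")
    jobConf.set("hbase.zookeeper.quorum", "sparkmaster1")
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "wetag")

    sc.parallelize(Seq((1, "Jazz", 14), (2, "Andy", 18)))
      .map { case (key, name, age) =>
        val p = new Put(Bytes.toBytes(key))
        p.addColumn(Bytes.toBytes("attr"), Bytes.toBytes("name"), Bytes.toBytes(name))
        p.addColumn(Bytes.toBytes("attr"), Bytes.toBytes("age"), Bytes.toBytes(age))
        (new ImmutableBytesWritable, p)
      }
      .saveAsHadoopDataset(jobConf)
  }
}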

Example

package com.welab.wetag.mine

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{ Put, Durability }
import org.apache.hadoop.hbase.util.Bytes

object SaveHbase {
  // Convert one record into an (ImmutableBytesWritable, Put) pair
  def convert(triple: (Int, String, Int)) = {
    val (key, name, age) = triple
    val p = new Put(Bytes.toBytes(key))
    p.setDurability(Durability.SKIP_WAL)
    p.addColumn(Bytes.toBytes("attr"), Bytes.toBytes("name"), Bytes.toBytes(name))
    p.addColumn(Bytes.toBytes("attr"), Bytes.toBytes("age"), Bytes.toBytes(age))
    (new ImmutableBytesWritable, p) // the key is ignored by TableOutputFormat; only the Put matters
  }

  def main(args: Array[String]) {
    val appName: String = this.getClass.getSimpleName.split("\\$").last
    val sc = new SparkContext(new SparkConf().setAppName(appName))

    // HBase configuration; make sure the table 'wetag' already exists
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "sparkmaster1")
    conf.set(TableOutputFormat.OUTPUT_TABLE, "wetag")

    val job = Job.getInstance(conf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val rawData = Array((1, "Jazz", 14), (2, "Andy", 18), (3, "Vincent", 38))
    /* Alternatively, the conversion logic can be written inline in the map:
    val xdata = sc.parallelize(rawData).map {
      case (key, name, age) => {
        val p = new Put(Bytes.toBytes(key))
        p.setDurability(Durability.SKIP_WAL)
        p.addColumn(Bytes.toBytes("attr"), Bytes.toBytes("name"), Bytes.toBytes(name))
        p.addColumn(Bytes.toBytes("attr"), Bytes.toBytes("age"), Bytes.toBytes(age))
        (new ImmutableBytesWritable, p)
      }
    }
    */
    val xdata = sc.parallelize(rawData).map(convert)
    xdata.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}

Package the code above into a jar and run it with spark-submit.
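
For example (the jar name is a placeholder; adjust the master and other options to your cluster):

spark-submit --class com.welab.wetag.mine.SaveHbase --master yarn save-hbase.jar

Depending on how the jar is built, the HBase client jars may also need to be supplied with --jars or bundled into an assembly jar so that the executors can load TableOutputFormat.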
