美文网首页spark生态系统大数据玩转大数据
Spark整合HBase(自定义HBase DataSource

Spark整合HBase(自定义HBase DataSource

作者: BIGUFO | 来源:发表于2018-02-02 22:05 被阅读199次

    背景

    Spark支持多种数据源,但是Spark对HBase 的读写都没有相对优雅的api,但spark和HBase整合的场景又比较多,故通过spark的DataSource API自己实现了一套比较方便操作HBase的API。

    写 HBase

    写HBase会根据Dataframe的schema写入对应数据类型的数据到Hbase,先上使用示例:

    import spark.implicits._
    import org.apache.hack.spark._
    val df = spark.createDataset(Seq(("ufo",  "play"), ("yy",  ""))).toDF("name", "like")
    // 方式一
    val options = Map(
                "hbase.table.rowkey.field" -> "name",
                "hbase.table.numReg" -> "12",
                "hbase.table.rowkey.prefix" -> "00",
                "bulkload.enable" -> "false"
            )
    df.saveToHbase("hbase_table", Some("XXX:2181"), options)
    // 方式二
    df1.write.format("org.apache.spark.sql.execution.datasources.hbase")
                .options(Map(
                    "hbase.table.rowkey.field" -> "name",
                    "hbase.table.name" -> "hbase_table",
                    "hbase.zookeeper.quorum" -> "XXX:2181",
                    "hbase.table.rowkey.prefix" -> "00",
                    "hbase.table.numReg" -> "12",
                    "bulkload.enable" -> "false"
                )).save()
    

    上面两种方式实现的效果是一样的,下面解释一下每个参数的含义:

    • hbase.zookeeper.quorum:zookeeper地址
    • hbase.table.rowkey.field:spark临时表的哪个字段作为hbase的rowkey,默认第一个字段
    • bulkload.enable:是否启动bulkload,默认不启动,当要插入的hbase表只有一列rowkey时,必需启动
    • hbase.table.name:Hbase表名
    • hbase.table.family:列族名,默认info
    • hbase.table.startKey:预分区开始key,当hbase表不存在时,会自动创建Hbase表,不带一下三个参数则只有一个分区
    • hbase.table.endKey:预分区开始key
    • hbase.table.numReg:分区个数
    • hbase.table.rowkey.prefix: 当rowkey是数字开头,预分区需要指明前缀的formate形式,如 00
    • hbase.check_table: 写入hbase表时,是否需要检查表是否存在,默认 false

    读 HBase

    示例代码如下:

    // 方式一
    import org.apache.hack.spark._
     val options = Map(
        "spark.table.schema" -> "appid:String,appstoreid:int,firm:String",
        "hbase.table.schema" -> ":rowkey,info:appStoreId,info:firm"
    )
    spark.hbaseTableAsDataFrame("hbase_table", Some("XXX:2181")).show(false)
    // 方式二
    spark.read.format("org.apache.spark.sql.execution.datasources.hbase").
                options(Map(
                "spark.table.schema" -> "appid:String,appstoreid:int,firm:String",
                "hbase.table.schema" -> ":rowkey,info:appStoreId,info:firm",
                "hbase.zookeeper.quorum" -> "XXX:2181",
                "hbase.table.name" -> "hbase_table"
            )).load.show(false)  
    

    spark和hbase表的schema映射关系指定不是必须的,默认会生成rowkey和content两个字段,content是由所有字段组成的json字符串,可通过field.type.fieldname对单个字段设置数据类型,默认都是StringType。这样映射出来还得通过spark程序转一下才是你想要的样子,而且所有字段都会去扫描,相对来说不是特别高效。

    故我们可自定义schema映射来获取数据:

    • hbase.zookeeper.quorum:zookeeper地址
    • spark.table.schema:Spark临时表对应的schema eg: "ID:String,appname:String,age:Int"
    • hbase.table.schema:Hbase表对应schema eg: ":rowkey,info:appname,info:age"
    • hbase.table.name:Hbase表名
    • spark.rowkey.view.name:rowkey对应的dataframe创建的tempview名(设置了该值后,只获取rowkey对应的数据)

    注意这两个schema是一一对应的,Hbase只会扫描hbase.table.schema对应的列。

    源码在我的 GitHub,欢迎star

    相关文章

      网友评论

        本文标题:Spark整合HBase(自定义HBase DataSource

        本文链接:https://www.haomeiwen.com/subject/blyezxtx.html