美文网首页
spark读写hbase基本操作

spark读写hbase基本操作

作者: 会飞的蜗牛66666 | 来源:发表于2018-12-27 13:51 被阅读0次

依赖包:
<dependencies>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-server -->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.2.1</version>
    </dependency>

</dependencies>

代码实现如下:
package com.ky.service

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.hadoop.mapred.JobConf
import org.apache.log4j.{Level, Logger}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object HbaseSpark {

def main(args: Array[String]): Unit = {

Logger.getLogger("org").setLevel(Level.ERROR)

val spark = SparkSession.builder().master("local[*]").appName(s"${this.getClass.getSimpleName}").getOrCreate()
val sc = spark.sparkContext

val table = "student"
val family = "info"
val column1 = "name"
val column2 = "age"

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.property.clientPort", "2181")
conf.set("spark.executor.memory", "3000m")
conf.set("hbase.zookeeper.quorum", "192.168.1.11:2181,192.168.1.12:2181")
conf.set("zookeeper.znode.parent", "/hbase-unsecure")
conf.set(TableInputFormat.INPUT_TABLE, table)

val startRowkey = "1"
val endRowkey = "4"

val scan = new Scan(Bytes.toBytes(startRowkey), Bytes.toBytes(endRowkey))
scan.setCacheBlocks(false)

val proto = ProtobufUtil.toScan(scan)
val scanToString = Base64.encodeBytes(proto.toByteArray)
conf.set(TableInputFormat.SCAN, scanToString)

val hbaseRdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
val counts = hbaseRdd.count()
println(counts)

//读取hbase信息,扫描信息
hbaseRdd.foreach { case (_, result) => {
  val key: String = Bytes.toString(result.getRow)
  val name = Bytes.toString(result.getValue(family.getBytes(), column1.getBytes()))
  val age = Bytes.toString(result.getValue(family.getBytes(), column2.getBytes()))
  println(s"the name is $name,the age is $age")
}
}

//写入数据到hbase
//初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!初始化jobconf,TableOutputFormat必须是org.apache.hadoop.hbase.mapred包下的!
val jobConf = new JobConf(conf)
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, table)
val dataRdd = sc.makeRDD(Array("111,jack,15", "222,Lily,16", "333,mike,16"))
val rdd = dataRdd.map(_.split(',')).map(arr => {
  val put = new Put(Bytes.toBytes(arr(0)))
  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(2)))
  (new ImmutableBytesWritable, put)
})
rdd.saveAsHadoopDataset(jobConf)

spark.stop()

}

}

相关文章

  • spark读写hbase基本操作

    依赖包:org.apache.spark

  • 像我这样的 jar 包

    Apache HBase Connector 这个 Hbase 连接器可以使用 Spark 读写 Hbase。但...

  • Spark整合HBase(自定义HBase DataSource

    背景 Spark支持多种数据源,但是Spark对HBase 的读写都没有相对优雅的api,但spark和HBase...

  • HBase基本操作以及spark操作HBase

    HBase是建立在HDFS之上,提供高可靠性、高性能、列存储、可伸缩、实时读写的数据库系统。 特点:表可以很大:一...

  • HBase读写操作

    HBase读写操作 读和写是Hbase的两种常见的基本操作,这两种操作都会涉及到Hfile和Meta表,我们依次看...

  • spark on hbase 读写

    本文主要讲述了spark对hbase进行独写的两种方式,这两种方式分别为:1.利用spark提供的 newAPIH...

  • Spark读写HBase实践

    Spark经常会读写一些外部数据源,常见的有HDFS、HBase、JDBC、Redis、Kafka等。这些都是Sp...

  • Spark&Hbase操作

    HBase 新版 API 进行 CRUD 基本操作配置环境 Hbase基本操作新版 API 中加入了 Connec...

  • 【Spark实战】Spark之读写HBase

    1 配置 1.1 开发环境: HBase:hbase-1.0.0-cdh5.4.5.tar.gz Hadoop:h...

  • HBase原理总结

    在总结Spark读写HBase的同时,也顺便学习了一下HBase的原理,同样做个简单的记录。事实上,相关的总结网上...

网友评论

      本文标题:spark读写hbase基本操作

      本文链接:https://www.haomeiwen.com/subject/kyurlqtx.html