Read: using the newAPIHadoopRDD method
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkContext, SparkConf}
object readHbaseTest {
  def main(args: Array[String]): Unit = {
    // Run in local mode to make testing easy
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("readHbaseTest")
    // Create the HBase configuration
    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set("hbase.zookeeper.quorum", "192.168.61.97") // ZooKeeper quorum address; putting hbase-site.xml on the classpath also works, but setting it in code is recommended
    hBaseConf.set("hbase.zookeeper.property.clientPort", "2181") // ZooKeeper client port, default 2181
    hBaseConf.set(TableInputFormat.INPUT_TABLE, "test:gp")
    // Create the Spark context
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    // Pull the data from the source table
    val hbaseRDD = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    // Map each (rowkey, Result) pair into a tuple and give the RDD a schema, i.e. convert it to a DataFrame
    val data = hbaseRDD.map(r => (
      Bytes.toString(r._2.getRow()),
      Bytes.toString(r._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("name")))
    )).toDF("rowkey", "category")
    data.show()
  }
}
+------+--------+
|rowkey|category|
+------+--------+
| rk001| gp|
| rk002| wf|
| rk003| lqq|
+------+--------+
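A minimal sketch, reusing hBaseConf and sc from the example above: TableInputFormat scans the whole table by default, but the scan can be narrowed with the SCAN_COLUMNS, SCAN_ROW_START and SCAN_ROW_STOP keys before the RDD is built (the column and rowkey values here are only illustrative).
// Restrict the scan to one column and a rowkey range before creating the RDD
hBaseConf.set(TableInputFormat.SCAN_COLUMNS, "info:name")  // only fetch info:name
hBaseConf.set(TableInputFormat.SCAN_ROW_START, "rk001")    // start row (inclusive)
hBaseConf.set(TableInputFormat.SCAN_ROW_STOP, "rk003")     // stop row (exclusive)
val narrowedRDD = sc.newAPIHadoopRDD(hBaseConf, classOf[TableInputFormat],
  classOf[ImmutableBytesWritable], classOf[Result])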
Write: using the saveAsHadoopDataset method
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
object writeHbaseTest {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("writeHbaseTest")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val data = Seq((1, 2), (2, 3)).toDF("a", "b")
    val hBaseConf = HBaseConfiguration.create()
    hBaseConf.set("hbase.zookeeper.quorum", "192.168.61.97")
    hBaseConf.set("hbase.zookeeper.property.clientPort", "2181")
    val jobConf = new JobConf(hBaseConf)
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    jobConf.set(TableOutputFormat.OUTPUT_TABLE, "test:gp")
    data.rdd.map(row => {
      val rowkey = row(0).toString // the rowkey must be a String
      val name = row(1).toString
      val put = new Put(Bytes.toBytes(rowkey))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name)) // column family, qualifier, value
      (new ImmutableBytesWritable, put)
    }).saveAsHadoopDataset(jobConf)
  }
}
hbase(main):003:0> scan 'test:gp'
ROW COLUMN+CELL
1 column=info:name, timestamp=1593767839176, value=2
2 column=info:name, timestamp=1593767839176, value=3
rk001 column=info:name, timestamp=1593763852691, value=gp
rk002 column=info:name, timestamp=1593763858131, value=wf
rk003 column=info:name, timestamp=1593765395612, value=lqq
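A single Put can carry several addColumn calls, so one row can receive multiple qualifiers in the same write. A minimal sketch, reusing data and jobConf from the example above; the qualifier names "a" and "b" simply mirror the DataFrame column names.
data.rdd.map(row => {
  val put = new Put(Bytes.toBytes(row(0).toString))
  // write both DataFrame columns into the same row, one qualifier each
  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("a"), Bytes.toBytes(row(0).toString))
  put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("b"), Bytes.toBytes(row(1).toString))
  (new ImmutableBytesWritable, put)
}).saveAsHadoopDataset(jobConf)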
Write: using the saveAsNewAPIHadoopDataset method
Use a map transformation to turn each element of the RDD into a (new ImmutableBytesWritable, put) pair, then call saveAsNewAPIHadoopDataset on the resulting RDD to write it into HBase.
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.util.Bytes
object writeHbaseTest2 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("writeHbaseTest2")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val data = Seq((1, 2), (2, 3)).toDF("a", "b")
    val tablename = "test:gp"
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "192.168.61.97")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put]) // the values written are Put mutations
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val save_rdd = data.rdd.map(x => {
      // the first column becomes the rowkey
      val rowkey = x(0).toString
      // the second column becomes the value of info:name
      val name = x(1).toString
      val put = new Put(Bytes.toBytes(rowkey))
      put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
      (new ImmutableBytesWritable, put)
    })
    save_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
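The same pattern generalizes to a DataFrame with an arbitrary schema. A sketch, reusing data and job from the example above, that takes the qualifier names from each Row's schema instead of hard-coding them; it assumes the first column serves as the rowkey and every value can be stored as a string.
val generic_rdd = data.rdd.map(row => {
  val put = new Put(Bytes.toBytes(row(0).toString))
  // one qualifier per DataFrame column, named after the field
  row.schema.fieldNames.zipWithIndex.foreach { case (name, i) =>
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(name), Bytes.toBytes(row(i).toString))
  }
  (new ImmutableBytesWritable, put)
})
generic_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)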
Write: the RDD contains JSON objects; build a Put from each object and use forEach with a java.util.function.BiConsumer to call addColumn for every field
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object JsonSvaeHbase {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("JsonSvaeHbase")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val str = "{\"id\": \"101\", \"name\": \"2222\", \"age\": \"3333\"}"
    val jsonObject = JSON.parse(str).asInstanceOf[JSONObject]
    val data = sc.parallelize(Seq(jsonObject))
    val tablename = "test:gp"
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "192.168.61.97")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put]) // the values written are Put mutations
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val save_rdd = data.map(x => {
      // use the "id" field as the rowkey
      val id = x.getString("id")
      val put: Put = new Put(Bytes.toBytes(id))
      // JSONObject implements java.util.Map, so forEach walks every key/value pair
      x.forEach(new java.util.function.BiConsumer[String, Object]() {
        override def accept(k: String, v: Object): Unit = {
          if (k != null && v != null) {
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(k), Bytes.toBytes(v.toString))
          }
        }
      })
      (new ImmutableBytesWritable(), put)
    })
    save_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }
}
// hbase(main):023:0> get 'test:gp', '101'
// COLUMN       CELL
//  info:age    timestamp=1597288908139, value=3333
//  info:id     timestamp=1597288908139, value=101
//  info:name   timestamp=1597288908139, value=2222
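In practice the JSON records usually arrive as strings rather than ready-made JSONObject instances. A sketch, reusing sc and job from the example above, that parses the strings with fastjson's JSON.parseObject before building the Puts (the rowkeys 103 and 104 are only illustrative).
val jsonStrings = Seq(
  "{\"id\": \"103\", \"name\": \"aaa\", \"age\": \"18\"}",
  "{\"id\": \"104\", \"name\": \"bbb\", \"age\": \"19\"}"
)
val save_rdd2 = sc.parallelize(jsonStrings).map(s => JSON.parseObject(s)).map(x => {
  val put = new Put(Bytes.toBytes(x.getString("id")))
  // same BiConsumer pattern as above: one qualifier per JSON field
  x.forEach(new java.util.function.BiConsumer[String, Object]() {
    override def accept(k: String, v: Object): Unit = {
      if (k != null && v != null) {
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes(k), Bytes.toBytes(v.toString))
      }
    }
  })
  (new ImmutableBytesWritable, put)
})
save_rdd2.saveAsNewAPIHadoopDataset(job.getConfiguration)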
Alternatively, build the Put and loop over the JSONObject's keySet with an iterator, calling addColumn for each key
import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
object JsonSvaeHbase {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("JsonSvaeHbase")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    val str = "{\"id\": \"102\", \"name\": \"2222\", \"age\": \"3333\"}"
    val jsonObject = JSON.parse(str).asInstanceOf[JSONObject]
    val data = sc.parallelize(Seq(jsonObject))
    val tablename = "test:gp"
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum", "192.168.61.97")
    sc.hadoopConfiguration.set("hbase.zookeeper.property.clientPort", "2181")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE, tablename)
    val job = Job.getInstance(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Put]) // the values written are Put mutations
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    val save_rdd = data.map(x => {
      // use the "id" field as the rowkey
      val id_code = x.getString("id")
      val put = new Put(Bytes.toBytes(id_code))
      insert_hbase(x, put)
      (new ImmutableBytesWritable, put)
    })
    save_rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)
  }

  // Add every key of the JSONObject as a qualifier under the "info" column family
  def insert_hbase(jsonObject: JSONObject, onePut: Put): Unit = {
    val keys = jsonObject.keySet
    val iterator = keys.iterator
    while (iterator.hasNext) {
      val col = iterator.next()
      val value = jsonObject.get(col).toString
      onePut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(col), Bytes.toBytes(value))
    }
  }
}
// hbase(main):038:0> get 'test:gp', '102'
// COLUMN       CELL
//  info:age    timestamp=1597290047490, value=3333
//  info:id     timestamp=1597290047490, value=102
//  info:name   timestamp=1597290047490, value=2222
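The written rows can also be verified from code with the plain HBase client API instead of the hbase shell. A minimal sketch, assuming the same ZooKeeper quorum as above:
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

val conf = HBaseConfiguration.create()
conf.set("hbase.zookeeper.quorum", "192.168.61.97")
conf.set("hbase.zookeeper.property.clientPort", "2181")
val connection = ConnectionFactory.createConnection(conf)
val table = connection.getTable(TableName.valueOf("test:gp"))
// fetch row "102" and print its info:name value
val result = table.get(new Get(Bytes.toBytes("102")))
println(Bytes.toString(result.getValue(Bytes.toBytes("info"), Bytes.toBytes("name"))))
table.close()
connection.close()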