I installed HBase with Docker inside a virtual machine, and the mapped ports and hostnames were hard to get right: the read job failed, and the write job I tried afterwards failed too. Recording both here anyway~
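For the record, the usual culprit in this kind of setup: the client configuration only points at ZooKeeper, and ZooKeeper then hands back the region server's address under the hostname registered inside the container. Unless the machine running the Flink job can resolve that hostname (for example, an /etc/hosts entry mapping the container hostname to the VM's IP) and can reach the region server port through the Docker port mapping, the connection hangs even though ZooKeeper itself answers on 2181.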
1. Read code
package org.bbk.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.java.tuple
import org.apache.flink.configuration.Configuration
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{Cell, HBaseConfiguration, HConstants, TableName}
import org.apache.flink.addons.hbase.TableInputFormat

object Demo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val hbaseData: DataSet[tuple.Tuple2[String, String]] = env
      .createInput(new TableInputFormat[tuple.Tuple2[String, String]] {
        // Build the HBase connection ourselves and hand the table and scan
        // over to the parent TableInputFormat.
        override def configure(parameters: Configuration): Unit = {
          val conf = HBaseConfiguration.create()
          conf.set(HConstants.ZOOKEEPER_QUORUM, "myhbase")
          conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
          val conn: Connection = ConnectionFactory.createConnection(conf)
          table = classOf[HTable].cast(conn.getTable(TableName.valueOf("hbasesource")))
          scan = new Scan() {
            addFamily(Bytes.toBytes("f1"))
          }
        }
        override def getScanner: Scan = scan
        override def getTableName: String = "hbasesource"
        // Turn each HBase Result into a (rowkey, comma-joined values) tuple.
        override def mapResultToTuple(result: Result): tuple.Tuple2[String, String] = {
          val rowkey: String = Bytes.toString(result.getRow)
          val sb = new StringBuilder()
          for (cell: Cell <- result.rawCells()) {
            val value = Bytes.toString(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
            sb.append(value).append(",")
          }
          // stripSuffix drops the trailing comma and, unlike replace on the
          // last index, does not throw on a row with no cells.
          val valueString = sb.toString.stripSuffix(",")
          val tuple2 = new tuple.Tuple2[String, String]
          tuple2.setField(rowkey, 0)
          tuple2.setField(valueString, 1)
          tuple2
        }
      })
    // print() is itself a sink and triggers execution in the batch API;
    // a following env.execute() would fail with "No new data sinks have
    // been defined since the last execution", so it is omitted here.
    hbaseData.print()
  }
}
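Before blaming the Flink code, it can help to rule the networking out with the plain HBase client alone. Below is a minimal standalone connectivity check; the object name HBaseSmokeTest is made up here, and the quorum, port, table, and column family are the same assumptions as above. If this scan already hangs or times out, the Flink job cannot work either, and the problem is the Docker hostname/port mapping rather than the Flink code.

package org.bbk.flink

import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}

object HBaseSmokeTest {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set(HConstants.ZOOKEEPER_QUORUM, "myhbase")
    conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181")
    val conn = ConnectionFactory.createConnection(conf)
    try {
      val table = conn.getTable(TableName.valueOf("hbasesource"))
      val scanner = table.getScanner(new Scan().addFamily(Bytes.toBytes("f1")))
      try {
        var count = 0
        val it = scanner.iterator()
        while (it.hasNext) {
          // Print each rowkey so partial progress is visible before a timeout.
          println(Bytes.toString(it.next().getRow))
          count += 1
        }
        println(s"scanned $count rows")
      } finally scanner.close()
      table.close()
    } finally conn.close()
  }
}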
2. Write code
package org.bbk.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.common.io.OutputFormat
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants, TableName}

object Demo {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val sourceDataSet: DataSet[String] = env.fromElements("01, zhangsan, 28", "02, lisi, 30")
    sourceDataSet.output(new HBaseOutputFormat)
    env.execute()
  }
}

class HBaseOutputFormat extends OutputFormat[String] {
  val zkServer = "myhbase"
  val port = "2181"
  var conn: Connection = null
  var mutator: BufferedMutator = null

  override def configure(configuration: Configuration): Unit = {
  }

  // Open one connection and one BufferedMutator per task, instead of
  // creating (and leaking) a new mutator for every record in writeRecord.
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    val config: org.apache.hadoop.conf.Configuration = HBaseConfiguration.create()
    config.set(HConstants.ZOOKEEPER_QUORUM, zkServer)
    config.set(HConstants.ZOOKEEPER_CLIENT_PORT, port)
    config.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 30000)
    config.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30000)
    conn = ConnectionFactory.createConnection(config)
    val params: BufferedMutatorParams = new BufferedMutatorParams(TableName.valueOf("hbasesource"))
    params.writeBufferSize(1024 * 1024)
    mutator = conn.getBufferedMutator(params)
  }

  override def writeRecord(it: String): Unit = {
    val cf1 = "f1"
    // Trim each field so "01, zhangsan, 28" does not store leading spaces.
    val array: Array[String] = it.split(",").map(_.trim)
    val put: Put = new Put(Bytes.toBytes(array(0)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
    put.addColumn(Bytes.toBytes(cf1), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
    mutator.mutate(put)
  }

  override def close(): Unit = {
    // Flush any buffered puts, then release the mutator and connection.
    if (null != mutator) {
      mutator.flush()
      mutator.close()
    }
    if (null != conn) {
      conn.close()
    }
  }
}
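If the buffered write also fails, a simpler writeRecord variant that goes through Table.put synchronously can narrow things down, since any region server error then surfaces on the very first record instead of inside the mutator's buffer. A sketch, meant to drop into the HBaseOutputFormat above and reusing its conn field and imports:

// Hypothetical debugging variant of writeRecord: one synchronous put per
// record, no buffering, so failures are reported immediately.
override def writeRecord(it: String): Unit = {
  val array = it.split(",").map(_.trim)
  val put = new Put(Bytes.toBytes(array(0)))
  put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("name"), Bytes.toBytes(array(1)))
  put.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("age"), Bytes.toBytes(array(2)))
  val table = conn.getTable(TableName.valueOf("hbasesource"))
  try table.put(put) finally table.close()
}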