flink 写入数据到hbase
- 连接数等于并行度,用法同RichFunction
- 可以实现查询hbase,存入hbase
欢迎批评指正
hbase 依赖
<dependency>
<groupId>org.apache.hbase</groupId>
<!--shaded主要是解决jar包冲突-->
<artifactId>hbase-shaded-client</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.3.1</version>
</dependency>
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val list = ArrayBuffer[String]()
list.append("my-key-1")
val text = env.fromCollection(list)
text.map((_, 1)).process(new ProcessFunction[(String, Int), String] {
var table: Table = _
override def open(parameters: Configuration): Unit = {
table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
}
override def processElement(value: (String, Int), ctx: ProcessFunction[(String, Int), String]#Context, out: Collector[String]): Unit = {
//读取
val get = new Get(Bytes.toBytes(value._1))
get.addColumn(Bytes.toBytes("a"), Bytes.toBytes("key11"))
val result = table.get(get)
val v = Bytes.toString(result.getValue(Bytes.toBytes("a"), Bytes.toBytes("key11")))
println(v)
//写入
val put = new Put(Bytes.toBytes("shx_" + value._2))
put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes("lisi"))
table.put(put)
}
override def close(): Unit = {
table.close()
}
})
网友评论