Hbase端代码示例
- 这里使用线程池模拟异步IO,本人Hbase实验版本为1.3.0,Hbase 2.0才开始提供原生异步客户端(AsyncTable)
import java.util.concurrent.{ExecutorService, Executors}

import org.apache.flink.streaming.api.scala.async.{AsyncFunction, ResultFuture}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{Get, Table}
import org.apache.hadoop.hbase.util.Bytes

import scala.util.control.NonFatal
/**
 * Flink async-I/O function that enriches `(key, count)` records with a value
 * looked up from the HBase table "test" (column family "a", qualifier "key11").
 *
 * Async I/O is simulated with a fixed thread pool because the HBase 1.x client
 * used here is synchronous (a native async client only exists from HBase 2.0).
 */
class AsyncHbaseClient extends AsyncFunction[(String, Int), (String, Int, String)] {
  // lazy: created on first use inside the task, so the non-serializable
  // HBase connection/table is never shipped from the job client.
  lazy val table: Table = HbaseUtil().connect.getTable(TableName.valueOf("test"))
  lazy val executorService: ExecutorService = Executors.newFixedThreadPool(30)

  /**
   * Performs the HBase lookup on the thread pool and completes `resultFuture`
   * with `(key, count, value)`. The future MUST always be completed, otherwise
   * the record hangs until the async-wait timeout fires.
   */
  override def asyncInvoke(input: (String, Int), resultFuture: ResultFuture[(String, Int, String)]): Unit = {
    executorService.submit(new Runnable {
      override def run(): Unit = {
        try {
          val get = new Get(Bytes.toBytes(input._1))
          get.addColumn(Bytes.toBytes("a"), Bytes.toBytes("key11"))
          val result = table.get(get)
          // NOTE(review): if the row/column is absent, getValue returns null and
          // `value` will be null — confirm downstream tolerates that.
          val value = Bytes.toString(result.getValue(Bytes.toBytes("a"), Bytes.toBytes("key11")))
          // Always complete the future, or every record times out.
          resultFuture.complete(Iterable((input._1, input._2, value)))
        } catch {
          case NonFatal(e) =>
            // Without this, a failed lookup would leave the future pending
            // until timeout and silently swallow the error. Propagate it so
            // Flink can fail/restart the job.
            resultFuture.completeExceptionally(e)
        }
      }
    })
  }

  /** Called by Flink when the lookup exceeds the configured timeout; emits a sentinel value. */
  override def timeout(input: (String, Int), resultFuture: ResultFuture[(String, Int, String)]): Unit = {
    resultFuture.complete(Iterable((input._1, input._2, "timeout")))
  }
}
主函数调用示例
/**
 * Driver: streams four sample keys through the async HBase lookup
 * (ordered output, 1 s timeout, at most 10 in-flight requests) and prints
 * the enriched `(key, count, value)` tuples.
 */
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(2)
  import org.apache.flink.api.scala._
  // Immutable List.fill instead of a mutable ArrayBuffer built by repeated append.
  val keys = List.fill(4)("my-key-1")
  val text = env.fromCollection(keys).map((_, 1))
  // Async I/O transformation: results are emitted in input order; records
  // exceeding 1000 ms fall into AsyncHbaseClient.timeout; capacity 10 bounds
  // the number of concurrent in-flight requests.
  val resultStream: DataStream[(String, Int, String)] = AsyncDataStream.orderedWait[(String, Int), (String, Int, String)](
    text, new AsyncHbaseClient(), 1000L, TimeUnit.MILLISECONDS, 10
  )
  resultStream.print()
  env.execute("Streaming WordCount")
}
网友评论