1.flink异步io的定义参考
2.应用场景之流表join维表。
流表是kafka等流式数据。
维表可以是一个mysql或者cassandra,redis等存储,甚至是自己定义的一些api。
根据流表join维表的字段去异步查询维表。
3.举个例子
流表:kafka id1,id2,id3三列
维表:mysql id,age,name
sql:select id1,id2,id3,age,name from kafka join mysql on id1=id;
join的结果就是: id1,id2,id3,age,name 流表的字段加上mysql维表的字段。
流表这边提供id1,给到维表,维表那边执行的sql是select * from mysql where id=id1
4.实战
参考袋鼠云开源的flinkStreamSQL:
https://github.com/DTStack/flinkStreamSQL
基于开源的flink,对其实时sql进行扩展;主要实现了流与维表的join,支持原生flink SQL所有的语法
源表:kafka 0.9,1.x版本
维表:mysql,SQlServer,oracle,hbase,mongo,redis,cassandra
结果表:mysql,SQlServer,oracle,hbase,elasticsearch5.x,mongo,redis,cassandra
核心是
AsyncReqRow asyncDbReq = loadAsyncReq(sideType, sqlRootDir, rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
//TODO How much should be set for the degree of parallelism? Timeout? capacity settings?
if (ORDERED.equals(sideTableInfo.getCacheMode())){
return AsyncDataStream.orderedWait(inputStream, asyncDbReq, sideTableInfo.getAsyncTimeout(), TimeUnit.MILLISECONDS, sideTableInfo.getAsyncCapacity())
.setParallelism(sideTableInfo.getParallelism());
}else {
return AsyncDataStream.unorderedWait(inputStream, asyncDbReq, sideTableInfo.getAsyncTimeout(), TimeUnit.MILLISECONDS, sideTableInfo.getAsyncCapacity())
.setParallelism(sideTableInfo.getParallelism());
}
inputStream 就是我们的流表
loadAsyncReq 就是返回一个RichAsyncFunction,定义是
public abstract class AsyncReqRow extends RichAsyncFunction<Row, Row>
就是从维表中查询数据,目前袋鼠云支持的几种维表有
image.png
5.扩展
流表来源只有kafka,太少,我们可以扩展一下读取mysql作为流。参考这里https://www.jianshu.com/p/5faa7f822d89
网友评论