第一坑:使用Stream 转table api 中的fromDataStream的java和scala写法(需要引入flink的$符号)
1.下面是官网的scala和java写法
官网的scala写法.png 官网的java写法.png
2、我引入了import org.apache.flink.table.api.Expressions.$ ,上面是1.10写法,使用官网1.11版本的scala编程还是报错:
采用官网scala写法报错
3、最终scala的编程引用java的方式可以了!
import org.apache.flink.table.api.Expressions.$
val dspbidTable: Table = tEnv.fromDataStream(dspbidStream, $("bversionv"), $("bplanid"), $("clickPrice"), $("impressPrice"),
$("planCostType"), $("sspSettle"), $("bgid"), $("sspUid"), $("ip"), $("ua"), $("androidid"),
$("imei"), $("budid"), $("bvid"), $("aps"), $("btimestamp").rowtime())
第二坑:
1、table创建视图方式
过期:
tEnv.registerTable("dspClick", dspClickTable)
现在:
tEnv.createTemporaryView("dspClick", dspClickTable)
2、sql转Stream的方式
过期:
tEnv.registerTable("TemporalJoin", tEnv.sqlQuery(sqlJoin))
val joinSqlStream: DataStream[Row] = tEnv.scan("TemporalJoin").toAppendStream[Row]
现在:(更简洁)
val table: Table = tEnv.sqlQuery(sqlJoin)
val joinSqlStream: DataStream[Row] = tEnv.toAppendStream[Row](table)
网友评论