TableAPI和SQL是关系型API,可以让用户很方便的查询结果。
编程模型
获取环境
创建输入表
执行查询
创建输出表
查询输出到输出表
创建Table环境对象
第一种方法
val settings = EnvironmentSettings.newInstance()
.inStreamingMode()
//.inBatchMode()
.build()
val tableEnv = TableEnvironment.create(settings)
第二种方法
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
创建输入表
tableEnv.executeSql("" +
"create table myTable(\n" +
"id int,\n" +
"name string\n" +
") with (\n" +
"'connector.type' = 'filesystem',\n" +
"'connector.path' = '/Users/shafengzi/source',\n" +
"'format.type' = 'csv'\n" +
")")
另一种写法
sTableEnv.executeSql(
"""
|create table myTable(
|id int,
|name string
|) with (
|'connector.type' = 'filesystem',
|'connector.path' = '/Users/shafengzi/source',
|'format.type' = 'csv'
|)
|""".stripMargin)
查询和过滤
使用SQL
val result = tableEnv.sqlQuery("select id,name from myTable where id > 1")
使用TableAPI
import org.apache.flink.table.api._
val result = tableEnv.from("myTable")
.select($"id", $"name")
.filter($"id" > 1)
创建输出表
tableEnv.executeSql("" +
"create table newTable(\n" +
"id int,\n" +
"name string\n" +
") with (\n" +
"'connector.type' = 'filesystem',\n" +
"'connector.path' = '/Users/shafengzi/res',\n" +
"'format.type' = 'csv'\n" +
")")
输出结果
result.executeInsert("newTable")
DataStream、DataSet和Table之间的转换
DataStream转换为view视图
import org.apache.flink.table.api._
ssTableEnv.createTemporaryView("myTable",stream,'id,'name)
ssTableEnv.sqlQuery("select * from myTable where id > 1")
DataStream转换为Table对象
val table = ssTableEnv.fromDataStream(stream, $"id", $"name")
table.select($"id",$"name")
.filter($"id" > 1)
将Table转换成DataStream
只有增量操作
import org.apache.flink.api.scala._
val appStream = tableEnv.toAppendStream[Row](table)
appStream.map(row=>(row.getField(0).toString.toInt,row.getField(1).toString))
env.execute("TableToDataStream")
有增加和删除操作
val retStream = tableEnv.toRetractStream[Row](table)
retStream.map(tup=>{
val flag = tup._1
val row = tup._2
val id = row.getField(0).toString.toInt
val name = row.getField(1).toString
(flag,id,name)
})
env.execute("TableToDataStream")
网友评论