美文网首页
flink的TableAPI和SQL的基本使用

flink的TableAPI和SQL的基本使用

作者: 傻疯子 | 来源:发表于2021-12-07 23:52 被阅读0次

TableAPI和SQL是关系型API,可以让用户很方便的查询结果。

编程模型

获取环境
创建输入表
执行查询
创建输出表
查询输出到输出表
创建Table环境对象

第一种方法

val settings = EnvironmentSettings.newInstance()
    .inStreamingMode()
    //.inBatchMode()
    .build()
val tableEnv = TableEnvironment.create(settings)

第二种方法

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
创建输入表
    tableEnv.executeSql("" +
      "create table myTable(\n" +
      "id int,\n" +
      "name string\n" +
      ") with (\n" +
      "'connector.type' = 'filesystem',\n" +
      "'connector.path' = '/Users/shafengzi/source',\n" +
      "'format.type' = 'csv'\n" +
      ")")

另一种写法

sTableEnv.executeSql(
  """
    |create table myTable(
    |id int,
    |name string
    |) with (
    |'connector.type' = 'filesystem',
    |'connector.path' = '/Users/shafengzi/source',
    |'format.type' = 'csv'
    |)
    |""".stripMargin)
查询和过滤

使用SQL

val result = tableEnv.sqlQuery("select id,name from myTable where id > 1")

使用TableAPI

    import org.apache.flink.table.api._
    val result = tableEnv.from("myTable")
      .select($"id", $"name")
      .filter($"id" > 1)
创建输出表
   tableEnv.executeSql("" +
      "create table newTable(\n" +
      "id int,\n" +
      "name string\n" +
      ") with (\n" +
      "'connector.type' = 'filesystem',\n" +
      "'connector.path' = '/Users/shafengzi/res',\n" +
      "'format.type' = 'csv'\n" +
      ")")
输出结果
    result.executeInsert("newTable")

DataStream、DataSet和Table之间的转换

DataStream转换为view视图
    import org.apache.flink.table.api._
    ssTableEnv.createTemporaryView("myTable",stream,'id,'name)
    ssTableEnv.sqlQuery("select * from myTable where id > 1")
DataStream转换为Table对象
    val table = ssTableEnv.fromDataStream(stream, $"id", $"name")
    table.select($"id",$"name")
      .filter($"id" > 1)
将Table转换成DataStream

只有增量操作

   import org.apache.flink.api.scala._
    val appStream = tableEnv.toAppendStream[Row](table)
    appStream.map(row=>(row.getField(0).toString.toInt,row.getField(1).toString))
    env.execute("TableToDataStream")

有增加和删除操作

    val retStream = tableEnv.toRetractStream[Row](table)
    retStream.map(tup=>{
      val flag = tup._1
      val row = tup._2
      val id = row.getField(0).toString.toInt
      val name = row.getField(1).toString
      (flag,id,name)
    })
    env.execute("TableToDataStream")

相关文章

网友评论

      本文标题:flink的TableAPI和SQL的基本使用

      本文链接:https://www.haomeiwen.com/subject/ugpaxrtx.html