美文网首页
flink的TableAPI和SQL的基本使用

flink的TableAPI和SQL的基本使用

作者: 傻疯子 | 来源:发表于2021-12-07 23:52 被阅读0次

    TableAPI和SQL是关系型API,可以让用户很方便的查询结果。

    编程模型

    获取环境
    创建输入表
    执行查询
    创建输出表
    查询输出到输出表
    
    创建Table环境对象

    第一种方法

    val settings = EnvironmentSettings.newInstance()
        .inStreamingMode()
        //.inBatchMode()
        .build()
    val tableEnv = TableEnvironment.create(settings)
    

    第二种方法

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)
    
    创建输入表
        tableEnv.executeSql("" +
          "create table myTable(\n" +
          "id int,\n" +
          "name string\n" +
          ") with (\n" +
          "'connector.type' = 'filesystem',\n" +
          "'connector.path' = '/Users/shafengzi/source',\n" +
          "'format.type' = 'csv'\n" +
          ")")
    

    另一种写法

    sTableEnv.executeSql(
      """
        |create table myTable(
        |id int,
        |name string
        |) with (
        |'connector.type' = 'filesystem',
        |'connector.path' = '/Users/shafengzi/source',
        |'format.type' = 'csv'
        |)
        |""".stripMargin)
    
    查询和过滤

    使用SQL

    val result = tableEnv.sqlQuery("select id,name from myTable where id > 1")
    

    使用TableAPI

        import org.apache.flink.table.api._
        val result = tableEnv.from("myTable")
          .select($"id", $"name")
          .filter($"id" > 1)
    
    创建输出表
       tableEnv.executeSql("" +
          "create table newTable(\n" +
          "id int,\n" +
          "name string\n" +
          ") with (\n" +
          "'connector.type' = 'filesystem',\n" +
          "'connector.path' = '/Users/shafengzi/res',\n" +
          "'format.type' = 'csv'\n" +
          ")")
    
    输出结果
        result.executeInsert("newTable")
    

    DataStream、DataSet和Table之间的转换

    DataStream转换为view视图
        import org.apache.flink.table.api._
        ssTableEnv.createTemporaryView("myTable",stream,'id,'name)
        ssTableEnv.sqlQuery("select * from myTable where id > 1")
    
    DataStream转换为Table对象
        val table = ssTableEnv.fromDataStream(stream, $"id", $"name")
        table.select($"id",$"name")
          .filter($"id" > 1)
    
    将Table转换成DataStream

    只有增量操作

       import org.apache.flink.api.scala._
        val appStream = tableEnv.toAppendStream[Row](table)
        appStream.map(row=>(row.getField(0).toString.toInt,row.getField(1).toString))
        env.execute("TableToDataStream")
    

    有增加和删除操作

        val retStream = tableEnv.toRetractStream[Row](table)
        retStream.map(tup=>{
          val flag = tup._1
          val row = tup._2
          val id = row.getField(0).toString.toInt
          val name = row.getField(1).toString
          (flag,id,name)
        })
        env.execute("TableToDataStream")
    

    相关文章

      网友评论

          本文标题:flink的TableAPI和SQL的基本使用

          本文链接:https://www.haomeiwen.com/subject/ugpaxrtx.html