美文网首页
Flink SQL 入门指北

Flink SQL 入门指北

作者: xiaoc024 | 来源:发表于2020-11-02 20:29 被阅读0次

    1. Overview

    本文主要来自官网,旨在整理处 Flink SQL 的基本语法和使用,基础向。

    2. API 调用

    2.1 Old Planner VS Blink Planner

    • Blink Planner 对代码生成机制做了改进、对部分算子进行了优化,提供了丰富实用的新功能,如维表 join、Top N、MiniBatch、流式去重、聚合场景的数据倾斜优化等新功能。

    • Blink Planner 的优化策略是基于公共子图的优化算法,包含了基于成本的优化(CBO)和基于规则的优化(CRO)两种策略,优化更为全面。同时,Blink Planner 支持从 catalog 中获取数据源的统计信息,这对CBO优化非常重要。

    • Blink Planner 提供了更多的内置函数,更标准的 SQL 支持,在 Flink 1.9 版本中已经完整支持 TPC-H ,对高阶的 TPC-DS 支持也计划在下一个版本实现。

    Flink 1.11 已经默认使用 Blink Planner。

    2.2 基本程序结构

    1.创建 TableEnvironment ( old/blink planner + stream/batch )
    2.创建表( tableEnv.connect 外部数据源 或者 tableEnv.fromDataStream )
    3.查询表( Table API 或者 SQL )
    4.输出表( table.insertInto("xxtable") 或者 table.toRetractStream[T]/toAppendStream[T])

    2.3 创建 TableEnvironment

    // **********************
    // FLINK STREAMING QUERY
    // **********************
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
    StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
    // or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
    
    // ******************
    // FLINK BATCH QUERY
    // ******************
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
    
    ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
    
    // **********************
    // BLINK STREAMING QUERY
    // **********************
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
    // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
    
    // ******************
    // BLINK BATCH QUERY
    // ******************
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    
    EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
    TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
    

    2.4 创建表

    // 1.通过外部数据源创建
    //数据格式:sensor_1,1547718225,22.8
    tableEnv.connect(new Kafka()
          .version("0.11")
          .topic("sensor")
          .property("zookeeper.connect", "localhost:2181")
          .property("bootstrap.servers", "localhost:9092")
        )
          .withFormat(new Csv())
          .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("timestamp", DataTypes.BIGINT())
            .field("temperature", DataTypes.DOUBLE())
          )
          .createTemporaryTable("kafkaInputTable")
    
    
    // 2.通过 datastream 转换
    val table1: Table = tableEnv.fromDataStream(stream)
    

    2.5 查询表

    //table api
    val sensorTable = tableEnv.from("inputTable")
    val resultTable = sensorTable
    .select('id, 'temperature)
    .filter('id === "sensor_1")
    
    // SQL
    val resultSqlTable = tableEnv.sqlQuery(
    """
    |select id, temperature
    |from inputTable
    |where id = 'sensor_1'
    """.stripMargin)
    

    2.6 表转流的三种输出模式

    • 追加( Append )模式

      • 只做插入操作,和外部连接起只交换插入( insert )消息
    • 撤回 ( Retract )模式

      • 表和外部连接起交换添加( Add )和撤回( Retract )消息
      • 插入操作编码为 Add 消息,删除编码为 Retract 消息,更新编码为上一条的 Retract 和下一条的 Add 消息
      • 不能定义 Key
    • 更新( Upsert )模式

      • 更新和插入都被编码为 Upsert 消息,删除编码为 Delete 消息
      • 需要定义 Key

    DataStream 只支持 Append 和 Retract 模式。(toRetractStream[T] & toAppendStream[T] )
    外部文件系统的流支持哪种模式取决于具体实现,比如 Kakfa 只支持 Append 模式。

    2.7 输出表

    tableEnv.connect(new FileSystem().path(filePath))
          .withFormat(new Csv())
          .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("timestamp", DataTypes.BIGINT())
            .field("temp", DataTypes.DOUBLE())
          )
          .createTemporaryTable("inputTable")
    
        // 转换操作
        val sensorTable: Table = tableEnv.from("inputTable")
        // 简单转换
        val resultTable: Table = sensorTable
          .select('id, 'temp)
          .filter('id === "sensor_1")
    
        //  聚合转换
        val aggTable: Table = sensorTable
          .groupBy('id)    
          .select('id, 'id.count as 'count)
    
        // 输出到外部文件系统或者 DataStream
        val outputPath = "..."
    
        // 注册输出表
        tableEnv.connect(new FileSystem().path(outputPath))
          .withFormat(new Csv())
          .withSchema(new Schema()
            .field("id", DataTypes.STRING())
            .field("temperature", DataTypes.DOUBLE())
          )
          .createTemporaryTable("outputTable")
    
        //aggTable.insertInto("outputTable")  aggTable 因为有修改操作,CsvTableSink 只实现了 AppendStreamTableSink,所以无法输出到文件。
        resultTable.insertInto("outputTable")
    
        resultTable.toAppendStream[(String, Double)].print("result")
        // aggTable 因为有修改操作不能使用 append,需要使用 Retract
        aggTable.toRetractStream[Row].print("agg")
    

    3 动态表

    3.1 DataStream 上的关系查询

    关系代数 / SQL 流处理
    关系(或表)是有界(多)元组集合。 流是一个无限元组序列。
    对批数据(例如关系数据库中的表)执行的查询可以访问完整的输入数据。 流式查询在启动时不能访问所有数据,必须“等待”数据流入。
    批处理查询在产生固定大小的结果后终止。 流查询不断地根据接收到的记录更新其结果,并且始终不会结束。

    尽管存在这些差异,但是使用关系查询和 SQL 处理流并不是不可能的。高级关系数据库系统提供了一个称为 物化视图(Materialized Views) 的特性。物化视图被定义为一条 SQL 查询,缓存查询的结果。缓存的一个常见难题是防止缓存结果过期。当其定义查询的基表被修改时,物化视图将过期。 即时视图维护(Eager View Maintenance) 是一种一旦更新了物化视图的基表就立即更新视图的技术。

    3.2 动态表 & 连续查询( Continuous Query )

    动态表查询流程
    1. 将流转换为动态表。
    2. 在动态表上计算一个连续查询,生成一个新的动态表。
    3. 生成的动态表被转换回流。
    流转为动态表
    连续查询并生成新动态表
    动态表转换回流(Retract模式)

    4. 窗口和时间语义

    关于窗口和时间语义的介绍可以参考这篇文章。之前是在流上进行讨论的。Flink 在表上同样支持相应的逻辑。

    4.1 时间语义

    可以通过 DDL 方式创建两种时间语义,但是比较晦涩,这里不做举例,感兴趣可以到官网查看。

    4.1.1 processing time

    注意处理时间属性一定不能定义在一个已有字段上

    • 流转表时:

      // 声明一个额外的字段作为时间属性字段
      val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"data", $"user_action_time".proctime)
      
    • 定义 tableSchema 时:

        .withSchema(new Schema()
          .field("id",DataTypes.STRING())
          .field("timestamp",DataTypes.BIGINT())
          .field("temperature",DataTypes.DOUBLE())
          .field("pt",DataTypes.TIMESTAMP(3)).proctime()    //将该字段定义为 processing time
        )
      

    4.1.2 event time

    • 流转表时:
      // 基于 stream 中的事件产生时间戳和 watermark
      val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)
      
      // 声明一个额外的逻辑字段作为事件时间属性(数据来源于上面datastream定义好的字段),必须放在 schema 最后
      val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime)
      
      
      // Option 2:
      
      // 从第一个字段获取事件时间,并且产生 watermark
      val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
      
      // 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
      val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")
      
    • 定义 tableSchema 时:
          //需要注意这种方式的 source 必须实现 DefinedRowtimeAttributes 接口。如 KafkaTableSource 实现了该接口。CsvTableSource 则没有。 
      .withSchema(new Schema()
          .field("id",DataTypes.STRING())
          .field("timestamp",DataTypes.BIGINT())
          .rowtime(
              new Rowtime()
                  .timestampsFromFiled("timestamp")
                  .watermarksPeriodicBounded(1000)
          )
          .field("temperature",DataTypes.DOUBLE())
        )
      

    4.2 窗口操作

    窗口操作相当于对数据进行分组时,除了按照字段以外,增加了新的维度进行分组,一般是时间或者数据数量。

    4.2.1 Group Windows

    根据时间或者行数间隔,将行聚集在有限的组中,并对每个组的数据执行一次聚合函数。最终每个组得出一个结果,类似于传统对 group by 操作

    // 基本使用结构
    val table = input
            .window([w:GroupWindow] as "w") //定义窗口和别名 w
            .groupBy($"w",$"a")  //以属性 a 和窗口 w 作为分组的key
            .select($"a",$"b".sum)  //聚合字段b的值,求和
    
    tumbling window
        - .window( Tumble over 10.minutes on $"a_rowtime"/$"a_proctime" as "w")
        - .window( Tumble over 10.rows on $"a_proctime" as "w")
        - sql: tumble(ts, interval '10' second)
    
    sliding windows 
        - .window( Slide over 10.minutes every 5.minutes on $"a_rowtime"/$"a_proctime as "w")
        - .window( Slide over 10.rows every 5.rows on $"a_proctime" as "w")
        - sql: hop(ts,interval '10' second,interval '10' second) p.s. 第二个是步长,第三个是窗口长度
    
    session windows
        - .window( Session withGap 10.minutes on $"a_rowtime"/$"a_proctime" as "w")
        - sql: session(ts,interval '10' second)
    
    sql 辅助函数,xx = {tumble,hop,session}:
        - xx_start(ts, interval '10' second)
        - xx_end(ts, interval '10' second)
        - xx_rowtime(ts, interval '10' second)
        - xx_proctime(ts, interval '10' second)
    

    4.2.2 Over Windows

    针对每个输入行,进行开窗,增加一列表示结果,每个行都有自己所在窗口的结果。类似于传统的 over 操作

    // 基本使用结构
    val table = input
            .window([w:OverWindow] as "w") 
            .select($"a",$"b".sum over $"w", $"c".min over $"w")  
    
    无界 over window
        - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding UNBOUNDEN_RANGE as "w")
        - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding UNBOUNDEN_ROW as "w")
    
    有界 over window
        - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding 1.minutes as "w")
        - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding 10.rows as "w")
    

    相关文章

      网友评论

          本文标题:Flink SQL 入门指北

          本文链接:https://www.haomeiwen.com/subject/rtkxvktx.html