美文网首页
Flink SQL 入门指北

Flink SQL 入门指北

作者: xiaoc024 | 来源:发表于2020-11-02 20:29 被阅读0次

1. Overview

本文主要来自官网,旨在整理处 Flink SQL 的基本语法和使用,基础向。

2. API 调用

2.1 Old Planner VS Blink Planner

  • Blink Planner 对代码生成机制做了改进、对部分算子进行了优化,提供了丰富实用的新功能,如维表 join、Top N、MiniBatch、流式去重、聚合场景的数据倾斜优化等新功能。

  • Blink Planner 的优化策略是基于公共子图的优化算法,包含了基于成本的优化(CBO)和基于规则的优化(CRO)两种策略,优化更为全面。同时,Blink Planner 支持从 catalog 中获取数据源的统计信息,这对CBO优化非常重要。

  • Blink Planner 提供了更多的内置函数,更标准的 SQL 支持,在 Flink 1.9 版本中已经完整支持 TPC-H ,对高阶的 TPC-DS 支持也计划在下一个版本实现。

Flink 1.11 已经默认使用 Blink Planner。

2.2 基本程序结构

1.创建 TableEnvironment ( old/blink planner + stream/batch )
2.创建表( tableEnv.connect 外部数据源 或者 tableEnv.fromDataStream )
3.查询表( Table API 或者 SQL )
4.输出表( table.insertInto("xxtable") 或者 table.toRetractStream[T]/toAppendStream[T])

2.3 创建 TableEnvironment

// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

2.4 创建表

// 1.通过外部数据源创建
//数据格式:sensor_1,1547718225,22.8
tableEnv.connect(new Kafka()
      .version("0.11")
      .topic("sensor")
      .property("zookeeper.connect", "localhost:2181")
      .property("bootstrap.servers", "localhost:9092")
    )
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("kafkaInputTable")


// 2.通过 datastream 转换
val table1: Table = tableEnv.fromDataStream(stream)

2.5 查询表

//table api
val sensorTable = tableEnv.from("inputTable")
val resultTable = sensorTable
.select('id, 'temperature)
.filter('id === "sensor_1")

// SQL
val resultSqlTable = tableEnv.sqlQuery(
"""
|select id, temperature
|from inputTable
|where id = 'sensor_1'
""".stripMargin)

2.6 表转流的三种输出模式

  • 追加( Append )模式

    • 只做插入操作,和外部连接起只交换插入( insert )消息
  • 撤回 ( Retract )模式

    • 表和外部连接起交换添加( Add )和撤回( Retract )消息
    • 插入操作编码为 Add 消息,删除编码为 Retract 消息,更新编码为上一条的 Retract 和下一条的 Add 消息
    • 不能定义 Key
  • 更新( Upsert )模式

    • 更新和插入都被编码为 Upsert 消息,删除编码为 Delete 消息
    • 需要定义 Key

DataStream 只支持 Append 和 Retract 模式。(toRetractStream[T] & toAppendStream[T] )
外部文件系统的流支持哪种模式取决于具体实现,比如 Kakfa 只支持 Append 模式。

2.7 输出表

tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temp", DataTypes.DOUBLE())
      )
      .createTemporaryTable("inputTable")

    // 转换操作
    val sensorTable: Table = tableEnv.from("inputTable")
    // 简单转换
    val resultTable: Table = sensorTable
      .select('id, 'temp)
      .filter('id === "sensor_1")

    //  聚合转换
    val aggTable: Table = sensorTable
      .groupBy('id)    
      .select('id, 'id.count as 'count)

    // 输出到外部文件系统或者 DataStream
    val outputPath = "..."

    // 注册输出表
    tableEnv.connect(new FileSystem().path(outputPath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("temperature", DataTypes.DOUBLE())
      )
      .createTemporaryTable("outputTable")

    //aggTable.insertInto("outputTable")  aggTable 因为有修改操作,CsvTableSink 只实现了 AppendStreamTableSink,所以无法输出到文件。
    resultTable.insertInto("outputTable")

    resultTable.toAppendStream[(String, Double)].print("result")
    // aggTable 因为有修改操作不能使用 append,需要使用 Retract
    aggTable.toRetractStream[Row].print("agg")

3 动态表

3.1 DataStream 上的关系查询

关系代数 / SQL 流处理
关系(或表)是有界(多)元组集合。 流是一个无限元组序列。
对批数据(例如关系数据库中的表)执行的查询可以访问完整的输入数据。 流式查询在启动时不能访问所有数据,必须“等待”数据流入。
批处理查询在产生固定大小的结果后终止。 流查询不断地根据接收到的记录更新其结果,并且始终不会结束。

尽管存在这些差异,但是使用关系查询和 SQL 处理流并不是不可能的。高级关系数据库系统提供了一个称为 物化视图(Materialized Views) 的特性。物化视图被定义为一条 SQL 查询,缓存查询的结果。缓存的一个常见难题是防止缓存结果过期。当其定义查询的基表被修改时,物化视图将过期。 即时视图维护(Eager View Maintenance) 是一种一旦更新了物化视图的基表就立即更新视图的技术。

3.2 动态表 & 连续查询( Continuous Query )

动态表查询流程
  1. 将流转换为动态表。
  2. 在动态表上计算一个连续查询,生成一个新的动态表。
  3. 生成的动态表被转换回流。
流转为动态表
连续查询并生成新动态表
动态表转换回流(Retract模式)

4. 窗口和时间语义

关于窗口和时间语义的介绍可以参考这篇文章。之前是在流上进行讨论的。Flink 在表上同样支持相应的逻辑。

4.1 时间语义

可以通过 DDL 方式创建两种时间语义,但是比较晦涩,这里不做举例,感兴趣可以到官网查看。

4.1.1 processing time

注意处理时间属性一定不能定义在一个已有字段上

  • 流转表时:

    // 声明一个额外的字段作为时间属性字段
    val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name", $"data", $"user_action_time".proctime)
    
  • 定义 tableSchema 时:

      .withSchema(new Schema()
        .field("id",DataTypes.STRING())
        .field("timestamp",DataTypes.BIGINT())
        .field("temperature",DataTypes.DOUBLE())
        .field("pt",DataTypes.TIMESTAMP(3)).proctime()    //将该字段定义为 processing time
      )
    

4.1.2 event time

  • 流转表时:
    // 基于 stream 中的事件产生时间戳和 watermark
    val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)
    
    // 声明一个额外的逻辑字段作为事件时间属性(数据来源于上面datastream定义好的字段),必须放在 schema 最后
    val table = tEnv.fromDataStream(stream, $"user_name", $"data", $"user_action_time".rowtime)
    
    
    // Option 2:
    
    // 从第一个字段获取事件时间,并且产生 watermark
    val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)
    
    // 第一个字段已经用作事件时间抽取了,不用再用一个新字段来表示事件时间了
    val table = tEnv.fromDataStream(stream, $"user_action_time".rowtime, $"user_name", $"data")
    
  • 定义 tableSchema 时:
        //需要注意这种方式的 source 必须实现 DefinedRowtimeAttributes 接口。如 KafkaTableSource 实现了该接口。CsvTableSource 则没有。 
    .withSchema(new Schema()
        .field("id",DataTypes.STRING())
        .field("timestamp",DataTypes.BIGINT())
        .rowtime(
            new Rowtime()
                .timestampsFromFiled("timestamp")
                .watermarksPeriodicBounded(1000)
        )
        .field("temperature",DataTypes.DOUBLE())
      )
    

4.2 窗口操作

窗口操作相当于对数据进行分组时,除了按照字段以外,增加了新的维度进行分组,一般是时间或者数据数量。

4.2.1 Group Windows

根据时间或者行数间隔,将行聚集在有限的组中,并对每个组的数据执行一次聚合函数。最终每个组得出一个结果,类似于传统对 group by 操作

// 基本使用结构
val table = input
        .window([w:GroupWindow] as "w") //定义窗口和别名 w
        .groupBy($"w",$"a")  //以属性 a 和窗口 w 作为分组的key
        .select($"a",$"b".sum)  //聚合字段b的值,求和

tumbling window
    - .window( Tumble over 10.minutes on $"a_rowtime"/$"a_proctime" as "w")
    - .window( Tumble over 10.rows on $"a_proctime" as "w")
    - sql: tumble(ts, interval '10' second)

sliding windows 
    - .window( Slide over 10.minutes every 5.minutes on $"a_rowtime"/$"a_proctime as "w")
    - .window( Slide over 10.rows every 5.rows on $"a_proctime" as "w")
    - sql: hop(ts,interval '10' second,interval '10' second) p.s. 第二个是步长,第三个是窗口长度

session windows
    - .window( Session withGap 10.minutes on $"a_rowtime"/$"a_proctime" as "w")
    - sql: session(ts,interval '10' second)

sql 辅助函数,xx = {tumble,hop,session}:
    - xx_start(ts, interval '10' second)
    - xx_end(ts, interval '10' second)
    - xx_rowtime(ts, interval '10' second)
    - xx_proctime(ts, interval '10' second)

4.2.2 Over Windows

针对每个输入行,进行开窗,增加一列表示结果,每个行都有自己所在窗口的结果。类似于传统的 over 操作

// 基本使用结构
val table = input
        .window([w:OverWindow] as "w") 
        .select($"a",$"b".sum over $"w", $"c".min over $"w")  

无界 over window
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding UNBOUNDEN_RANGE as "w")
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding UNBOUNDEN_ROW as "w")

有界 over window
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding 1.minutes as "w")
    - .window(Over partitionBy $"a" orderBy $"rowtime/proctime" preceding 10.rows as "w")

相关文章

网友评论

      本文标题:Flink SQL 入门指北

      本文链接:https://www.haomeiwen.com/subject/rtkxvktx.html