2019-03-09-Flink(6)——flink table

作者: 此星爷非彼星爷 | 来源:发表于2019-03-09 21:27 被阅读3次

本文转自个人微信公众号,原文链接。本博客评论系统需要梯子,大家关注下公众号方便交流。

本文基于 Flink 1.7。

随着 Hadoop 的发展,有了Hive,使用HQL 即可完成原来繁琐的Map Reduce 程序。

随着 Spark的发展,引入了 Spark SQL。

随着 Flink 版本的更迭,Flink 也提供了Flink SQL,以及 Table APIs。

注意:截止 Flink 1.7,Table API 和 SQL 还没有完成,有些操作还不支持。

1. 基本概念

1.1 Why

那么,为什么要推出Table APIs和SQL?

首先,来看下Flink 的编程模型。如下图所示(图片来源于官网),DataStream API 和 DataSet API 是分开的,但是对于应用开发者来说,为什么要关注这一点?对于相同的数据,批处理与流计算居然要写两套代码,简直不可思议。Table APIs和SQL的推出,实现了流处理和批处理的统一。 flink-abstract.png

其次,降低了学习和使用门槛,基于 DataStream/DataSet APIs 的 Scala 或 Java 程序开发,对于BI/分析师来说,还是有一定门槛的,而SQL 则简单太多了。

1.2 Dynamic Tables

Dynamic Tables 是 Flink Table API 和 SQL的核心概念,与大家熟知的Static Tables 相比,Dynamic Tables 随着时间一直在变化。可以查询Dynamic Table,查询Dynamic Table 时会产生一个持续的查询,持续的查询不会终止,产生的结果也是Dynamic Table,根据输入,输出也会不断变化。熟悉关系型数据库的可以将Dynamic Tables的查询跟关系型数据库里查询物化视图对比起来,需要注意的是,Dynamic Tables 是一个逻辑概念,不需要(全部)物化。

另外,需要注意,在Dynamic Table上的持续查询的结果语义上是跟在Dynamic Table的快照上执行查询相同。

flink_dynamic_table.png

如上图所示:

  • Stream 转化为 Dynamic Table
  • 在Dynamic Table 上执行查询,得到的结果是一个新的Dynamic Table
  • 最终的Dynamic Table 结果,被转化为 Stream

1.3 Update Queries VS Append Queries

Append Queries:只会对查询结果进行追加的查询。

Update Queries:会更新查询结果的查询,一般需要维护更多的state。

1.4 查询限制

有些 Stream 上的查询需要花费巨大的代价:

  • 需要维护的state 太大。持续查询可能会运行非常长的时间,处理的数据量会非常大,对于一些需要更新原来结果的查询,需要维护原来的结果,维护的state会很大。
  • 更新计算代价高昂:输入数据的一小点变化,可能有些查询需要重新计算大量的数据,这种计算就不适合做持续查询。

1.5 Table 到 Stream 的转化

就像普通的数据库Table 一样,Dynamic Table也支持 insertupdatedelete等对它的更新。当需要将Dynamic Table 转化为 Stream 或者输出到外部系统时,需要对这些更新进行encode

  • Append-only Stream:仅有Insert 更新的Dynamic Table,可通过emit 插入的数据行转化为stream。
  • Retract Stream:Retract Stream 是支持 add 消息 和 retract 消息两类消息的流。将insert 编码为add 消息、将delete 编码为retract 消息、将update 编码为对之前消息的retarct 消息和对新消息的add消息。
  • Upsert Stream:Upsert Stream 是支持upsert 消息和delete消息两类消息的流。如果一个Dynamic Table需要转化为一个Upsert Stream,这个Table 必须要有unique key,可以将insert/update编码为upsert 消息,将delete 编码为delete消息。Upsert Stream 与 Retract Stream的主要区别是update 操作只需要一条消息,所以会更高效。

Append-only Stream 和 Retarct Stream 支持将Dynamic Table 转化为DataStream。

2. 实战

下面引入一个简单的例子,从stream开始,转化为 Table,然后查询Table,最后将Table 转化为Stream。

从例子可以很容易的看出,Stream 和 Table APIs / SQL 可以很容易的混用,这也给我们带来了极大的便利性。

2.1 引入依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <!-- for batch query -->
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- 上线时用provided,避免build的jar包太大,更避免冲突 -->
            <!--<scope>provided</scope>-->
            <!-- IDEA 里用compile,否则in-ide execution会失败 -->
            <scope>compile</scope>
        </dependency>
        <dependency>
            <!-- for streaming query -->
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <!-- 上线时用provided,避免build的jar包太大,更避免冲突 -->
            <!--<scope>provided</scope>-->
            <!-- IDEA 里用compile,否则in-ide execution会失败 -->
            <scope>compile</scope>
        </dependency>

2.2 隐式转换

Flink 的 Scala Table APIs用了隐式转换,所以,需要import 进来。

    import org.apache.flink.table.api.scala._
    import org.apache.flink.api.scala._

2.3 创建 TableEnvironment

TableEnvironment 是 Table APIs 和 SQL 的核心,可以用于:

  • 注册Table
  • 执行SQL 查询
  • 注册UDF
  • 将DataStream / DataSet 转化为 Table
  • 维护一个到ExecutionEnvironment或StreamExecutionEnvironment的引用。

Table 总是绑定到一个 TableEnvironment的,在使用时,在同一个SQL中不能联合使用不同TableEnvironment的表,比如joinunion

下面创建一个用于Stream的StreamTableEnvironment。另外,创建一个简单的stream。

    // 创建StreamTableEnvironment
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    val stableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(senv)

    // 创建一个用于实验的 Stream[ObjPrice]
    case class ObjPrice (name: String, price: Long)
    val stream: DataStream[ObjPrice] = senv.fromCollection(List(ObjPrice("car", 100000), ObjPrice("house", 2000000), ObjPrice("book", 100), ObjPrice("car", 900210)))

2.4 将Stream 转化为 Table

    val sTable1Rename: Table = stableEnv.fromDataStream(stream, 'myName, 'myPrice)

将上面的stream 转化为 Table,同时对字段进行重命名。

2.5 查询 Table

    // 采用Table API 的方式进行查询
    val sTableResult: Table = sTable1Rename
      .filter('myPrice > 1000)
      .groupBy('myName)
      .select('myName, 'myPrice.sum as 'mySumPrice)

2.6 将 Table 转化为 Stream

    val sResultDataStream: DataStream[(Boolean, ObjPrice)] = stableEnv.toRetractStream[ObjPrice](sTableResult)

3. 总结

本文仅涉及一些基础知识和最常见的使用,其他的比如注册 Table / TableSink / TableSource / External Catalog 、数据类型与Table Schema的映射、查询优化等并不涉及,可以参考官网 进行查阅。

为了方便交流,请扫描下方二维码关注我。


wxqr.jpg

相关文章

网友评论

    本文标题:2019-03-09-Flink(6)——flink table

    本文链接:https://www.haomeiwen.com/subject/pogepqtx.html