Apache Flink 具有两个关系型API:Table API 和SQL,用于统一流和批处理。
Table API 是用于 Scala 和 Java 语言的查询API,允许以非常直观的方式组合关系运算符的查询,例如 select,filter 和 join。Flink SQL 的支持是基于实现了SQL标准的 Apache Calcite。无论输入是批输入(DataSet)还是流输入(DataStream),任一接口中指定的查询都具有相同的语义并指定相同的结果。
Table API 和 SQL 还没有完全支持并且正在积极开发中。
要使用 Table API 和SQL,需要将以下依赖引入项目:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.6.1</version>
</dependency>
Table API 和SQL
批处理和流式传输的 Table API 和SQL程序都遵循相同的模式。以下代码示例显示了常见的程序结构:
// 批处理使用 ExecutionEnvironment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 创建 TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 注册 Table
tableEnv.registerTable("table1", ...)
// Table API query
val tapiResult = tableEnv.scan("table1").select(...)
// SQL query
val sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ...")
// Sink query result
tapiResult.writeToSink(...)
// execute
env.execute()
TableEnvironment
TableEnvironment 是 Table API 和SQL集成的核心概念,它负责:
- 在内部目录中注册表
- 注册外部目录
- 执行SQL查询
- 注册用户定义的函数
- DataStream 或 DataSet 转换为 Table
- 持有 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
Table 总是与特定的 TableEnvironment 绑定。不能在同一查询中组合不同 TableEnvironments 的表(例如,union 或 join)。创建 TableEnvironment:
// STREAMING QUERY
val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for streaming queries
val sTableEnv = TableEnvironment.getTableEnvironment(sEnv)
// BATCH QUERY
val bEnv = ExecutionEnvironment.getExecutionEnvironment
// create a TableEnvironment for batch queries
val bTableEnv = TableEnvironment.getTableEnvironment(bEnv)
注册 Table
TableEnvironment 维护一个按名称注册的表的目录。有两种类型的表,输入表(input table)和输出表(output table)。输入表可以在 Table API 和SQL查询中引用,并提供输入数据。输出表可用于将 Table API 或SQL查询的结果发送到外部系统。
输入表的注册源:
- Table API 或SQL查询的结果表
- 访问外部数据的 TableSource,例如文件,数据库或消息系统
- DataStream 或 DataSet。
输出表的注册源:TableSink
代码示例:
val tableEnv = TableEnvironment.getTableEnvironment(env)
// from Table API or SQL
val projTable: Table = tableEnv.scan("X").select(...)
tableEnv.registerTable("projectedTable", projTable)
// from TableSource
val csvSource: TableSource = new CsvTableSource("/path/to/file", ...)
tableEnv.registerTableSource("CsvTable", csvSource)
// from TableSink
val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
// define the field names and types
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)
注册外部目录
外部目录(external catalog)可以提供有关外部数据库和表的信息(如名称,schema,统计信息以及访问信息)。可以通过实现 ExternalCatalog 接口创建外部目录,并在 TableEnvironment 中注册:
// 获取一个 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 创建一个外部目录
val catalog: ExternalCatalog = new InMemoryExternalCatalog
// 注册外部目录
tableEnv.registerExternalCatalog("InMemCatalog", catalog)
查询
Table API
Table API 是一个 Scala 和 Java 的语言集成查询API,是基于 Table类。Table类代表了一个流或者批表,并提供方法来使用关系型操作。这些方法返回一个新的 Table 对象,这个新的 Table 对象代表着输入的 Table 应用关系型操作后的结果。下面的例子展示了一个简单的 Table API 聚合查询:
// 获取一个 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 注册一个名叫 Orders 的表 ...
// 扫描注册的 Orders 表
val orders = tableEnv.scan("Orders")
// 计算所有来自法国的客户的收入
val revenue = orders
.filter('cCountry === "FRANCE")
.groupBy('cID, 'cName')
.select('cID, 'cName, 'revenue.sum AS 'revSum)
// 执行查询
SQL
Flink SQL 集成是基于 Apache Calcite,Apache Calcite 实现了标准的SQL。下面的例子展示了如何指定一个查询并返回结果:
// 获取一个 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 注册一个名叫 Orders 的表
// 计算所有来自法国的客户的收入
val revenue = tableEnv.sqlQuery("""
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// 执行查询
指定将其结果插入已注册表的更新查询:
// 注册一个名叫 RevenueFrance 的输出表
// 计算所有来自法国的客户的收入,并更新到 RevenueFrance 表
tableEnv.sqlUpdate("""
|INSERT INTO RevenueFrance
|SELECT cID, cName, SUM(revenue) AS revSum
|FROM Orders
|WHERE cCountry = 'FRANCE'
|GROUP BY cID, cName
""".stripMargin)
// 执行查询
混合使用 Table API 和SQL,Table API 和SQL查询可以很容易地合并因为它们都返回 Table 对象:
- Table API 查询可以基于SQL查询结果的 Table 来进行
- SQL查询可以基于 Table API 查询的结果来定义
输出表
要输出 Table 可以写入 TableSink。TableSink 是通用接口,支持各种文件格式(如:CSV,Apache Parquet,Apache Avro)、存储系统(如:JDBC,Apache HBase,Apache Cassandra,Elasticsearch)或消息系统(如:Apache Kafka,RabbitMQ)。
批处理 Table 只能写入 BatchTableSink,而流式处理 Table 需要 AppendStreamTableSink,RetractStreamTableSink 或 UpsertStreamTableSink。有关可用接收器的详细信息,请参阅 Sources & Sinks。
有两种方法可以发送表:
-
Table.writeToSink(TableSink sink)
自动匹配 schema -
Table.insertInto(String sinkTable)
使用特定 schema
以下示例显示如何发出Table:
// 获取一个 StreamTableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// 使用Table API和/或SQL查询获取一个 Table
val result: Table = ...
// 创建一个 TableSink
val sink: TableSink = new CsvTableSink("/path/to/file", fieldDelim = "|")
// METHOD 1:
// 将结果表写入 TableSink
result.writeToSink(sink)
// METHOD 2:
// 注册指定 schema 的 TableSink
val fieldNames: Array[String] = Array("a", "b", "c")
val fieldTypes: Array[TypeInformation] = Array(Types.INT, Types.STRING, Types.LONG)
tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, sink)
// 将结果表写入 TableSink
result.insertInto("CsvSinkTable")
// 执行程序
与 DataStream 和 DataSet API 集成
Table API 和SQL查询可以很容易地进行集成并嵌入到 DataStream 和 DataSet 程序中。例如,可以查询一个外部表(来自关系型数据库的表),做一些处理(如过滤、映射、聚合或者关联元数据),然后使用 DataStream 或者 DataSet API(以及在这些API之上构建的任何库,例如CEP或 Gelly) 进行进一步处理。
同样,Table API 或者SQL查询也可以应用于 DataStream 或者 DataSet 程序的结果中。这种交互可以通过将 DataStream 或者 DataSet 转换成一个 Table 及将 Table 转换成 DataStream 或者 DataSet 来实现。
Scala 隐式转换
Scala Table API 支持 DataSet,DataStream 以及 Table 间的隐式转换。需要引入 org.apache.flink.table.api.scala._
和 org.apache.flink.api.scala._
。
DataStream 或 DataSet 转换为 Table
DataStream 或 DataSet 可以在 TableEnvironment 中注册为表,表的 schema 根据注册的 DataStream 或 DataSet 的数据类型来定:
val stream: DataStream[(Long, String)] = ...
// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)
// register the DataStream as table "myTable2" with fields "myLong", "myString"
tableEnv.registerDataStream("myTable2", stream, 'myLong, 'myString)
也可以直接转换为表,而不需要注册:
val stream: DataStream[(Long, String)] = ...
// convert the DataStream into a Table with default fields '_1, '_2
val table1: Table = tableEnv.fromDataStream(stream)
// convert the DataStream into a Table with fields 'myLong, 'myString
val table2: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
Table 转换为 DataStream 或 DataSet
Table 可以转换为 DataStream 或者 DataSet,通过这种方式,DataStream 或者 DataSet 程序就可以基于 Table API 或者SQL查询的结果来执行了。
当将一个 Table 转换为 DataStream 或 DataSet 时,需要指定生成的 DataStream 或 DataSet 的数据类型,即需要转换表的行的数据类型,通常最方便的转换类型是 Row,下面列表描述了不同选项的功能:
- Row:字段按位置、任意数量字段映射,支持 null 值,无类型安全访问
- POJO:字段按名称(POJO 字段命名为 Table 字段)、任意数量字段映射,支持 null 值,类型安全访问
- Case Class:字段按位置映射,不支持 null 值,类型安全访问
- Tuple:字段按位置映射,不得多于22(Scala)或 25(Java)个字段,不支持 null 值,类型安全访问
- Atomic Type:Table 必须有一个字段,不支持 null 值,类型安全访问
Table 转换 DataStream
流式查询的结果表会动态地更新,每个新的记录到达输入流时结果就会发生变化。有两种模式将 Table 转换为 DataStream:
- Append Mode:只适用于当动态表仅由
INSERT
修改时,之前的结果不会被更新。 - Retract Mode:始终都可以使用此模式,使用一个 boolean 标识来编码
INSERT
和DELETE
更改。
// 有两个字段的 Table(String name, Integer age)
val table: Table = ...
// 将 Table 转换为 Row 类型的 Append DataStream
val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
// 将 Table 转换为 Tuple2<String, Integer> 类型的 Append DataStream
val dsTuple: DataStream[(String, Int)] dsTuple =
tableEnv.toAppendStream[(String, Int)](table)
// 将 Table 转换为 Row 类型的 Retact DataStream
// 一个 ReactDataStream 的类型X为表示为 DataStream[(Boolean, X)]
// boolean 字段指定了更改的类型
// True 是 INSERT, false 是 DELETE
val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)
Table 转换 DataSet
// 有两个字段的 Table(String name, Integer age)
val table: Table = ...
// 将 Table 转换为 Row 类型的 DataSet
val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
// 将 Table 转换为 Tuple2<String, Integer> 类型的 DataSet
val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)
将数据类型映射到表模式(Table schema)
DataStream 和 DataSet API 支持多种数据类型,如:Tuple、POJO、case class 及 Row 类型。
原子类型
Flink 将原生类型(Integer、Double、String...)或泛型类型视为原子类型(Atomic type)。一个原子类型的 DataStream 或 DataSet 可以转换为只有一个属性的 Table,属性的类型根据原子类型推算,并且必须指定属性的名称。
val stream: DataStream[Long] = ...
// convert DataStream into Table with default field name "f0"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field name "myLong"
val table: Table = tableEnv.fromDataStream(stream, 'myLong)
Tuple 和 Case Class
Flink 支持 Scala 原生的 Tuple 类型,也为 Java 提供了 Tuple 类。两种类型的 DataStream 和 DataSet 都可以被转换为 Table。通过为所有字段提供名称(基于位置的映射),可以重命名字段。如果未指定字段名,则使用默认字段名。基于名称的映射允许使用别名(as
)重新排序字段。
val stream: DataStream[(Long, String)] = ...
// convert DataStream into Table with renamed default field names '_1, '_2
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with field names "myLong", "myString" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myString)
// convert DataStream into Table with reordered fields "_2", "_1" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2, '_1)
// convert DataStream into Table with projected field "_2" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2)
// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
val table: Table = tableEnv.fromDataStream(stream, '_2 as 'myString, '_1 as 'myLong)
// define case class
case class Person(name: String, age: Int)
val streamCC: DataStream[Person] = ...
// convert DataStream into Table with default field names 'name, 'age
val table = tableEnv.fromDataStream(streamCC)
// convert DataStream into Table with field names 'myName, 'myAge (position-based)
val table = tableEnv.fromDataStream(streamCC, 'myName, 'myAge)
// convert DataStream into Table with reordered and aliased fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)
POJO
Flink 支持使用 POJO 作为复合类型。当将一个 POJO 类型的 DataStream 或 DataSet 转换为 Table 而不指定字段名称时,Table 的字段名称将采用 POJO 原生的字段名称。重命名原始的 POJO 字段需要关键字AS
,因为 POJO 没有固定的顺序,名称映射需要原始名称并且不能通过位置来完成。
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// Person is a POJO with field names "name" and "age"
val stream: DataStream[Person] = ...
// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'age as 'myAge, 'name as 'myName)
// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)
// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)
Row
Row 类型支持任意数量的字段,并且支持 null 值。字段名称可以通过 RowTypeInfo 来指定或者将一个 Row 类型的 DataStream 或 DataSet 转换为 Table 时指定。Row 类型支持按位置和名字映射。可以通过为所有字段提供名称(基于位置)或为 映射/排序/重命名(基于名称)单独选择字段来重命名字段。
// get a TableEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)
// DataStream of Row with two fields "name" and "age" specified in `RowTypeInfo`
val stream: DataStream[Row] = ...
// convert DataStream into Table with default field names "name", "age"
val table: Table = tableEnv.fromDataStream(stream)
// convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
val table: Table = tableEnv.fromDataStream(stream, 'myName, 'myAge)
// convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName, 'age as 'myAge)
// convert DataStream into Table with projected field "name" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name)
// convert DataStream into Table with projected and renamed field "myName" (name-based)
val table: Table = tableEnv.fromDataStream(stream, 'name as 'myName)
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/table/common.html
网友评论