简介
比起繁杂的流处理和批处理Java API,Flink支持统一的SQL方式来操作流或者批数据,甚至非开发人员都可以编写业务逻辑,极大的简化了开发过程。
引入依赖
注意:需要提供变量scala.binary.version
和flink.version
的值。本文中代码以Flink 1.10为准。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
如果我们需要用到Kafka数据源,需要增加:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
如果我们使用json格式接收和发送数据,需要添加以下依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
创建tableEnv
TableEnv是Flink所有Table操作的基础。TableEnv主要的功能有:
- 注册数据源和输出端
- 创建table
- 执行SQL查询
- 创建user defined functions(UDF)
- 在Table和DataStream,DataSet之间转换
TableEnv根据流处理和批处理这两个场景,分为StreamExecutionEnvironment
和BatchTableEnvironment
。本文主要以流处理场景为中心。
创建StreamExecutionEnvironment
的方法:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
使用代码形式从Kafka数据源端或输出端创建table
Flink SQL将各种数据源统一为table的概念。同样,数据输出端也被认为是table。接下来用户可以通过select语句对数据源table进行数据处理操作,最后使用insert语句将处理后的数据加入到输出端table。
使用代码从数据源端或输出端创建table的方式如下:
tableEnv
.connect(...)
.withFormat(...)
.withSchema(...).createTemporaryTable(...)
其中:
- connect:指定数据源端或输出端的具体配置,比如Kafka或者FileSystem的配置
- withFormat:指定数据的格式,可以使用Json,Csv或Avro等
- withSchema:指定数据的字段信息,比如每个字段的名称和数据类型
- createTemporaryTable:创建临时表,需要指定一个临时表名
创建外部数据源Flink官网的参考链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
下面以最常用的Kafka数据源为例说明下它的创建方法。
注意:必须在maven中引入flink-connector-kafka
,否则会提示Kafka
类找不到。
相关代码和解释如下:
val kafka = new Kafka()
.version("universal") // 指定Kafka的版本,可选参数包含"0.8", "0.9", "0.10", "0.11", 和 "universal","universal"为通用版本
.property("zookeeper.connect", "192.168.100.1:2181") // 指定Kafka集群关联的zookeeper集群的地址
.property("bootstrap.servers", "192.168.100.1:9092") // 指定Kafka broker的地址
.property("group.id", "testGroup") // 指定Kafka消费者的group id
.topic("history") // 指定消费的topic的名字
指定数据结构(schema)的方式:
val schema = new Schema()
.field("username", "string")
.field("password", "string")
通过field方法指定各个字段的名称和数据类型。
可以通过Schema
的rowtime()
或者proctime()
方法来指定使用event time或者是processing time。
其中rowtime方法接收一个Rowtime
类型数据。Rowtime
的方法主要分为两类:指定timestamp字段和watermark的生成方式。
Rowtime
的方法如下:
- timestampsFromField:指定某个字段为数据的event time
- timestampsFromSource:使用数据源的自带的timestamp提取器产生的event time
- timestampsFromExtractor:使用自定义的timestamp提取器生成event time
- watermarksPeriodicAscending:指定watermark生成策略为递增
- watermarksPeriodicBounded:指定watermark允许一定程度的数据乱序,最大容忍时间为该方法的参数delay
- watermarksFromSource:使用数据源生成的watermark
- watermarksFromStrategy:使用自定义的watermark生成策略
一个完整的配置实例:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)
val kafka = new Kafka()
.version("universal")
.property("zookeeper.connect", "192.168.100.128:2181")
.property("bootstrap.servers", "192.168.100.128:9092")
.property("group.id", "testGroup")
.topic("history")
val schema = new Schema()
.field("username", "string")
.field("password", "string")
.field("user_id", "string")
tableEnv
.connect(kafka)
.withFormat(new Json().failOnMissingField(false))
.withSchema(schema).createTemporaryTable("user_table")
使用DDL语句的方式从数据源创建table
在这一部分Flink通过SQLcreate table
语句的方式,创建出对应的table。
以Kafka数据源端或输出端为例,Flink官网对它详细配置的解释如下:
CREATE TABLE MyUserTable (
...
) WITH (
'connector.type' = 'kafka',
'connector.version' = '0.11', -- required: valid connector versions are
-- "0.8", "0.9", "0.10", "0.11", and "universal"
'connector.topic' = 'topic_name', -- required: topic name from which the table is read
'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the ZooKeeper connection string
'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: specify the Kafka server connection string
'connector.properties.group.id' = 'testGroup', --optional: required in Kafka consumer, specify consumer group
'connector.startup-mode' = 'earliest-offset', -- optional: valid modes are "earliest-offset",
-- "latest-offset", "group-offsets",
-- or "specific-offsets"
-- optional: used in case of startup mode with specific offsets
'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
'connector.sink-partitioner' = '...', -- optional: output partitioning from Flink's partitions
-- into Kafka's partitions valid are "fixed"
-- (each Flink partition ends up in at most one Kafka partition),
-- "round-robin" (a Flink partition is distributed to
-- Kafka partitions round-robin)
-- "custom" (use a custom FlinkKafkaPartitioner subclass)
-- optional: used in case of sink partitioner custom
'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
'format.type' = '...', -- required: Kafka connector requires to specify a format,
... -- the supported formats are 'csv', 'json' and 'avro'.
-- Please refer to Table Formats section for more details.
)
下面以一个实际的例子说明下具体的用法:
tableEnv.sqlUpdate(
"""
|create table user_table (
|`username` string,
|`password` string,
|`user_id` string
|) with (
|'connector.type' = 'kafka',
|'connector.version' = 'universal',
|'connector.topic' = 'history',
|'connector.properties.zookeeper.connect' = '192.168.100.128:2181',
|'connector.properties.bootstrap.servers' = '192.168.100.128:9092',
|'format.type' = 'json'
|)
|""".stripMargin)
with子句内容为连接器的配置信息。解释如下:
- connector.type:指定连接器的类型为kafka
- connector.version:指定连接器的版本。其中universal为通用版本
- connector.topic:指定连接kafka的topic名称
- connector.properties.zookeeper.connect:指定Kafka集群zookeeper的地址
- connector.properties.bootstrap.servers:执行Kafka bootstrap server的地址
- format.type:指定数据的格式
综上所述,通过这个语句我们把Kafka映射成为user_table
这个表。
使用SQL操作数据并输出
使用Table API的方式处理数据并输出。
// 执行select语句
val resultTable = tableEnv.sqlQuery(s"select username, id from user_table")
// 将resultTable的数据插入到result_table表,数据会流向result_table对应的外部系统
resultTable.InsertInto("result_table")
除此之外还有一种方式:使用insert语句输出数据。
tableEnv.sqlUpdate("insert into result_table select username, id from user_table")
参考文献
Flink SQL 连接外部系统:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html
Flink SQL create 语句:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
网友评论