美文网首页玩转大数据
Flink SQL 使用入门

Flink SQL 使用入门

作者: AlienPaul | 来源:发表于2020-04-03 11:11 被阅读0次

简介

比起繁杂的流处理和批处理Java API,Flink支持统一的SQL方式来操作流或者批数据,甚至非开发人员都可以编写业务逻辑,极大的简化了开发过程。

引入依赖

注意:需要提供变量scala.binary.versionflink.version的值。本文中代码以Flink 1.10为准。

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

如果我们需要用到Kafka数据源,需要增加:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

如果我们使用json格式接收和发送数据,需要添加以下依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>

创建tableEnv

TableEnv是Flink所有Table操作的基础。TableEnv主要的功能有:

  • 注册数据源和输出端
  • 创建table
  • 执行SQL查询
  • 创建user defined functions(UDF)
  • 在Table和DataStream,DataSet之间转换

TableEnv根据流处理和批处理这两个场景,分为StreamExecutionEnvironmentBatchTableEnvironment。本文主要以流处理场景为中心。

创建StreamExecutionEnvironment的方法:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env)

使用代码形式从Kafka数据源端或输出端创建table

Flink SQL将各种数据源统一为table的概念。同样,数据输出端也被认为是table。接下来用户可以通过select语句对数据源table进行数据处理操作,最后使用insert语句将处理后的数据加入到输出端table。

使用代码从数据源端或输出端创建table的方式如下:

tableEnv
  .connect(...)
  .withFormat(...)
  .withSchema(...).createTemporaryTable(...)

其中:

  • connect:指定数据源端或输出端的具体配置,比如Kafka或者FileSystem的配置
  • withFormat:指定数据的格式,可以使用Json,Csv或Avro等
  • withSchema:指定数据的字段信息,比如每个字段的名称和数据类型
  • createTemporaryTable:创建临时表,需要指定一个临时表名

创建外部数据源Flink官网的参考链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html

下面以最常用的Kafka数据源为例说明下它的创建方法。

注意:必须在maven中引入flink-connector-kafka,否则会提示Kafka类找不到。

相关代码和解释如下:

val kafka = new Kafka()
  .version("universal") // 指定Kafka的版本,可选参数包含"0.8", "0.9", "0.10", "0.11", 和 "universal","universal"为通用版本
  .property("zookeeper.connect", "192.168.100.1:2181") // 指定Kafka集群关联的zookeeper集群的地址
  .property("bootstrap.servers", "192.168.100.1:9092") // 指定Kafka broker的地址
  .property("group.id", "testGroup") // 指定Kafka消费者的group id
  .topic("history") // 指定消费的topic的名字

指定数据结构(schema)的方式:

val schema = new Schema()
  .field("username", "string")
  .field("password", "string")

通过field方法指定各个字段的名称和数据类型。

可以通过Schemarowtime()或者proctime()方法来指定使用event time或者是processing time。
其中rowtime方法接收一个Rowtime类型数据。Rowtime的方法主要分为两类:指定timestamp字段和watermark的生成方式。

Rowtime的方法如下:

  • timestampsFromField:指定某个字段为数据的event time
  • timestampsFromSource:使用数据源的自带的timestamp提取器产生的event time
  • timestampsFromExtractor:使用自定义的timestamp提取器生成event time
  • watermarksPeriodicAscending:指定watermark生成策略为递增
  • watermarksPeriodicBounded:指定watermark允许一定程度的数据乱序,最大容忍时间为该方法的参数delay
  • watermarksFromSource:使用数据源生成的watermark
  • watermarksFromStrategy:使用自定义的watermark生成策略

一个完整的配置实例:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val tableEnv = StreamTableEnvironment.create(env)

val kafka = new Kafka()
  .version("universal")
  .property("zookeeper.connect", "192.168.100.128:2181")
  .property("bootstrap.servers", "192.168.100.128:9092")
  .property("group.id", "testGroup")
  .topic("history")

val schema = new Schema()
  .field("username", "string")
  .field("password", "string")
  .field("user_id", "string")

tableEnv
  .connect(kafka)
  .withFormat(new Json().failOnMissingField(false))
  .withSchema(schema).createTemporaryTable("user_table")

使用DDL语句的方式从数据源创建table

在这一部分Flink通过SQLcreate table语句的方式,创建出对应的table。

以Kafka数据源端或输出端为例,Flink官网对它详细配置的解释如下:

CREATE TABLE MyUserTable (
  ...
) WITH (
  'connector.type' = 'kafka',       

  'connector.version' = '0.11',     -- required: valid connector versions are
                                    -- "0.8", "0.9", "0.10", "0.11", and "universal"

  'connector.topic' = 'topic_name', -- required: topic name from which the table is read

  'connector.properties.zookeeper.connect' = 'localhost:2181', -- required: specify the ZooKeeper connection string
  'connector.properties.bootstrap.servers' = 'localhost:9092', -- required: specify the Kafka server connection string
  'connector.properties.group.id' = 'testGroup', --optional: required in Kafka consumer, specify consumer group
  'connector.startup-mode' = 'earliest-offset',    -- optional: valid modes are "earliest-offset", 
                                                   -- "latest-offset", "group-offsets", 
                                                   -- or "specific-offsets"

  -- optional: used in case of startup mode with specific offsets
  'connector.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',

  'connector.sink-partitioner' = '...',  -- optional: output partitioning from Flink's partitions 
                                         -- into Kafka's partitions valid are "fixed" 
                                         -- (each Flink partition ends up in at most one Kafka partition),
                                         -- "round-robin" (a Flink partition is distributed to 
                                         -- Kafka partitions round-robin)
                                         -- "custom" (use a custom FlinkKafkaPartitioner subclass)
  -- optional: used in case of sink partitioner custom
  'connector.sink-partitioner-class' = 'org.mycompany.MyPartitioner',
  
  'format.type' = '...',                 -- required: Kafka connector requires to specify a format,
  ...                                    -- the supported formats are 'csv', 'json' and 'avro'.
                                         -- Please refer to Table Formats section for more details.
)

下面以一个实际的例子说明下具体的用法:

tableEnv.sqlUpdate(
  """
    |create table user_table (
    |`username` string,
    |`password` string,
    |`user_id` string
    |) with (
    |'connector.type' = 'kafka',
    |'connector.version' = 'universal',
    |'connector.topic' = 'history',
    |'connector.properties.zookeeper.connect' = '192.168.100.128:2181',
    |'connector.properties.bootstrap.servers' = '192.168.100.128:9092',
    |'format.type' = 'json'
    |)
    |""".stripMargin)

with子句内容为连接器的配置信息。解释如下:

  • connector.type:指定连接器的类型为kafka
  • connector.version:指定连接器的版本。其中universal为通用版本
  • connector.topic:指定连接kafka的topic名称
  • connector.properties.zookeeper.connect:指定Kafka集群zookeeper的地址
  • connector.properties.bootstrap.servers:执行Kafka bootstrap server的地址
  • format.type:指定数据的格式

综上所述,通过这个语句我们把Kafka映射成为user_table这个表。

使用SQL操作数据并输出

使用Table API的方式处理数据并输出。

// 执行select语句
val resultTable = tableEnv.sqlQuery(s"select username, id from user_table")
// 将resultTable的数据插入到result_table表,数据会流向result_table对应的外部系统
resultTable.InsertInto("result_table")

除此之外还有一种方式:使用insert语句输出数据。

tableEnv.sqlUpdate("insert into result_table select username, id from user_table")

参考文献

Flink SQL 连接外部系统:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html

Flink SQL create 语句:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html

相关文章

网友评论

    本文标题:Flink SQL 使用入门

    本文链接:https://www.haomeiwen.com/subject/lhthphtx.html