SQL Client is a Flink SQL tool that makes it easy to learn Flink SQL and write demos. The basic flow of this article is based on ververica's sql-training and the community public course by Yunxie.
Prerequisites
Install Docker (omitted here).
Clone ververica/sql-training from GitHub; it provides the Docker images used in this tutorial.
The repository address is https://github.com/ververica/sql-training, and it includes the following images:
Run docker-compose up -d to bring up the images.
Run docker-compose exec sql-client ./sql-client.sh to start the SQL client. Once you see the squirrel, it is up and running!
Run docker-compose down to shut everything down.
Hands-on Practice
Run show tables;
Let's first look at the tables that come built in.
If you want to define your own tables, edit sql-training/build-image/training-config.yaml to define the table schema; the official documentation describes the details. This article mainly uses the Rides table.
Let's go through the structure of training-config.yaml. The SQL client currently lets you define table schemas via configuration, as follows:
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# This file defines the default environment for Flink's SQL Client.
# Defaults might be overwritten by a session specific environment.

#==============================================================================
# Table Sources
#==============================================================================

# Define table sources here. See the Table API & SQL documentation for details.

tables:
  - name: Rides
    type: source
    update-mode: append
    schema:
    - name: rideId
      type: LONG
    - name: taxiId
      type: LONG
    - name: isStart
      type: BOOLEAN
    - name: lon
      type: FLOAT
    - name: lat
      type: FLOAT
    - name: rideTime
      type: TIMESTAMP
      rowtime:
        timestamps:
          type: "from-field"
          from: "eventTime"
        watermarks:
          type: "periodic-bounded"
          delay: "60000"
    - name: psgCnt
      type: INT
    connector:
      property-version: 1
      type: kafka
      version: 0.11
      topic: Rides
      startup-mode: earliest-offset
      properties:
      - key: zookeeper.connect
        value: ${ZOOKEEPER}:2181
      - key: bootstrap.servers
        value: ${KAFKA}:9092
      - key: group.id
        value: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW(rideId LONG, isStart BOOLEAN, eventTime TIMESTAMP, lon FLOAT, lat FLOAT, psgCnt INT, taxiId LONG)"
  - name: Fares
    type: source
    update-mode: append
    schema:
    - name: rideId
      type: LONG
    - name: payTime
      type: TIMESTAMP
      rowtime:
        timestamps:
          type: "from-field"
          from: "eventTime"
        watermarks:
          type: "periodic-bounded"
          delay: "60000"
    - name: payMethod
      type: STRING
    - name: tip
      type: FLOAT
    - name: toll
      type: FLOAT
    - name: fare
      type: FLOAT
    connector:
      property-version: 1
      type: kafka
      version: 0.11
      topic: Fares
      startup-mode: earliest-offset
      properties:
      - key: zookeeper.connect
        value: ${ZOOKEEPER}:2181
      - key: bootstrap.servers
        value: ${KAFKA}:9092
      - key: group.id
        value: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW(rideId LONG, eventTime TIMESTAMP, payMethod STRING, tip FLOAT, toll FLOAT, fare FLOAT)"
  - name: DriverChanges
    type: source
    update-mode: append
    schema:
    - name: taxiId
      type: LONG
    - name: driverId
      type: LONG
    - name: usageStartTime
      type: TIMESTAMP
      rowtime:
        timestamps:
          type: "from-field"
          from: "eventTime"
        watermarks:
          type: "periodic-bounded"
          delay: "60000"
    connector:
      property-version: 1
      type: kafka
      version: 0.11
      topic: DriverChanges
      startup-mode: earliest-offset
      properties:
      - key: zookeeper.connect
        value: ${ZOOKEEPER}:2181
      - key: bootstrap.servers
        value: ${KAFKA}:9092
      - key: group.id
        value: testGroup
    format:
      property-version: 1
      type: json
      schema: "ROW(eventTime TIMESTAMP, taxiId LONG, driverId LONG)"
  - name: Drivers
    type: temporal-table
    history-table: DriverChanges
    primary-key: taxiId
    time-attribute: usageStartTime
  - name: Sink_TenMinPsgCnts
    type: sink
    update-mode: append
    schema:
    - name: cntStart
      type: TIMESTAMP
    - name: cntEnd
      type: TIMESTAMP
    - name: cnt
      type: LONG
    connector:
      property-version: 1
      type: kafka
      version: 0.11
      topic: TenMinPsgCnts
      startup-mode: earliest-offset
      properties:
      - key: zookeeper.connect
        value: zookeeper:2181
      - key: bootstrap.servers
        value: kafka:9092
      - key: group.id
        value: trainingGroup
    format:
      property-version: 1
      type: json
      schema: "ROW(cntStart TIMESTAMP, cntEnd TIMESTAMP, cnt LONG)"
  - name: Sink_AreaCnts
    type: sink
    update-mode: upsert
    schema:
    - name: areaId
      type: INT
    - name: cnt
      type: LONG
    connector:
      type: elasticsearch
      version: 6
      hosts:
        - hostname: "elasticsearch"
          port: 9200
          protocol: "http"
      index: "area-cnts"
      document-type: "areacnt"
      key-delimiter: "$"
    format:
      property-version: 1
      type: json
      schema: "ROW(areaId INT, cnt LONG)"

functions:
- name: timeDiff
  from: class
  class: com.dataartisans.udfs.TimeDiff
- name: isInNYC
  from: class
  class: com.dataartisans.udfs.IsInNYC
- name: toAreaId
  from: class
  class: com.dataartisans.udfs.ToAreaId
- name: toCoords
  from: class
  class: com.dataartisans.udfs.ToCoords

#==============================================================================
# Execution properties
#==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
  type: streaming                      # 'batch' or 'streaming' execution
  result-mode: table                   # 'changelog' or 'table' presentation of results
  parallelism: 1                       # parallelism of the program
  max-parallelism: 128                 # maximum parallelism
  min-idle-state-retention: 0          # minimum idle state retention in ms
  max-idle-state-retention: 0          # maximum idle state retention in ms

#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties allow for describing the cluster to which table
# programs are submitted to.

deployment:
  type: standalone                     # only the 'standalone' deployment is supported
  response-timeout: 5000               # general cluster communication timeout in ms
  gateway-address: ""                  # (optional) address from cluster to gateway
  gateway-port: 0                      # (optional) port from cluster to gateway
type: declares whether the table is a source or a sink.
update-mode: how data is written into the table, e.g. append mode or update mode.
schema: the definition of the table's fields.
rideTime deserves a closer look: it is the time at which the event occurred, and the window operations later in this article are based on it, so the field is declared as a rowtime attribute and a watermark is added. Watermarks are Flink's event-time mechanism and will be covered in detail in a later article (a minimal sketch of what this configuration means follows after this list).
connector: defines the connection configuration, e.g. Kafka, Elasticsearch, etc.
format: defines how the data is parsed, e.g. the JSON format used here.
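To make the rowtime/watermark settings more concrete, here is a minimal sketch (not part of the training project) of the equivalent logic expressed with Flink's DataStream API: the event time is read from the record itself, like from-field: "eventTime", and a periodic watermark trails the largest seen timestamp by 60 seconds, like delay: "60000". The Ride POJO below is a hypothetical stand-in for the real schema.

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

public class RideTimestampExtractor
        extends BoundedOutOfOrdernessTimestampExtractor<RideTimestampExtractor.Ride> {

    public RideTimestampExtractor() {
        // watermark lags the largest observed event time by 60 s, like delay: "60000"
        super(Time.seconds(60));
    }

    @Override
    public long extractTimestamp(Ride ride) {
        // event time comes from the record, like from-field: "eventTime"
        return ride.eventTime;
    }

    /** Hypothetical ride record; the real schema is defined in training-config.yaml. */
    public static class Ride {
        public long eventTime; // epoch milliseconds
    }
}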
Run select * from Rides; to take a look at the data in the Rides table.
Now open http://localhost:8081/ (the Flink web UI) and you can see that the SQL job we just submitted is running.
Requirement 1 (filter)
Count the ride records that occur in New York. This calls for a filter, and we need a custom UDF. The idea: the table has longitude and latitude fields, and from those we can build a UDF that decides whether a point is in New York. (The official demo has already written it for us.) The code is under sql-training/build-image/sql-udfs as IsInNYC, which extends the ScalarFunction class and implements an eval method.
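For reference, a minimal sketch of such a UDF looks like the following. The real implementation ships with the training project; the bounding-box coordinates here are illustrative assumptions only.

import org.apache.flink.table.functions.ScalarFunction;

public class IsInNYC extends ScalarFunction {

    // rough bounding box around New York City (illustrative values, not the official ones)
    private static final float LON_MIN = -74.05f;
    private static final float LON_MAX = -73.70f;
    private static final float LAT_MIN = 40.55f;
    private static final float LAT_MAX = 40.92f;

    // eval is picked up by the Table API via reflection: isInNYC(lon, lat)
    public boolean eval(float lon, float lat) {
        return lon >= LON_MIN && lon <= LON_MAX && lat >= LAT_MIN && lat <= LAT_MAX;
    }
}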
Then run mvn clean package to build the UDF jar, drop it into the SQL client's lib directory, and register the function under functions in training-config.yaml.
The official demo has already done all of this for us, so let's check in the SQL client.
The isInNYC UDF is already there, so we can use it to filter out the ride records in New York. Run select * from Rides where isInNYC(lon, lat);
The result is as follows:
All of the returned rows are now ride records in New York.
Requirement 2 (Group Agg)
Count the number of ride events for each passenger count, i.e. an aggregation over an unbounded stream: the number of rides carrying 1 passenger, the number carrying 2 passengers, and so on.
SELECT psgCnt, COUNT(*) AS cnt FROM Rides GROUP BY psgCnt;
The result is as follows:
Requirement 3 (Window Agg)
To continuously monitor the city's traffic, compute the number of vehicles entering each area every 5 minutes. We only care about the traffic situation in New York, so here we also need the area-related UDF ToAreaId.
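Like IsInNYC, ToAreaId ships with the training project. Purely as a sketch of how such a function can work, one option is to snap coordinates onto a fixed grid and derive an integer cell id; the grid origin, cell size, and row width below are illustrative assumptions, not the real values.

import org.apache.flink.table.functions.ScalarFunction;

public class ToAreaId extends ScalarFunction {

    private static final float ORIGIN_LON = -74.05f; // assumed grid origin
    private static final float ORIGIN_LAT = 40.55f;
    private static final float CELL_SIZE = 0.005f;   // assumed cell size in degrees
    private static final int GRID_WIDTH = 100;       // assumed cells per grid row

    // maps a coordinate to one integer id per grid cell: toAreaId(lon, lat)
    public int eval(float lon, float lat) {
        int x = (int) Math.floor((lon - ORIGIN_LON) / CELL_SIZE);
        int y = (int) Math.floor((lat - ORIGIN_LAT) / CELL_SIZE);
        return y * GRID_WIDTH + x;
    }
}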
SELECT
  toAreaId(lon, lat) AS area,
  TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end,
  COUNT(*) AS cnt
FROM Rides
WHERE isInNYC(lon, lat)
GROUP BY
  toAreaId(lon, lat),
  isStart,
  TUMBLE(rideTime, INTERVAL '5' MINUTE)
HAVING COUNT(*) >= 5;
The result is as follows:
Requirement 4 (write to Kafka)
Write the number of passengers picked up per 10 minutes to Kafka.
The result table is Sink_TenMinPsgCnts; as defined in training-config.yaml above, its schema has three columns: cntStart TIMESTAMP, cntEnd TIMESTAMP, and cnt LONG.
The SQL is as follows (since the cnt column of Sink_TenMinPsgCnts is of type LONG, the summed cnt needs to be cast accordingly):
INSERT INTO Sink_TenMinPsgCnts
SELECT
  TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,
  TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
  CAST(SUM(psgCnt) AS BIGINT) AS cnt
FROM Rides
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);
Run
docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning
to consume the data being written to Kafka; the result is as follows:
Requirement 5 (write to ES)
Write the number of rides departing from each area to Elasticsearch.
The result table is Sink_AreaCnts; as defined in training-config.yaml above, its schema has two columns: areaId INT and cnt LONG.
The SQL is as follows:
INSERT INTO Sink_AreaCnts
SELECT
  toAreaId(lon, lat) AS areaId,
  COUNT(*) AS cnt
FROM Rides
GROUP BY toAreaId(lon, lat);
Afterwards, visit http://localhost:9200/area-cnts and you can see that the index has been created.
You can also run a simple query against ES: http://localhost:9200/area-cnts/_search?q=areaId:49791