Flink SQL在线调试功能能够帮助用户数据探索与逻辑校验,调试功能要求如下:
1.调试时的SQL与线上SQL一致
2.不能对线上数据产生影响
3.限制消耗资源
4.方便快捷地获取到预览结果
5.限制运行时长
为了满足上述要求,我们开发了基于redis的调试插件,用该插件替换原sink目标,具体步骤如下:
1.sql翻译时替换sink为专用的flink-connector-redis
2.flink-connector-redis为自研connector,提供限流功能,利用反压机制达到总体限流,并限制执行时间
3.调试使用默认有限资源
4. 调试结果从测试专用的redis中获得
本示例基于flink-connector-redis展示如何使用限流、定时关闭等功能。
1.创建一个限流表
create table limited_sink_table (user_name varchar, passport varchar)
with ( 'connector' = 'redis',
'host' = '10.11.69.176',
'port' = '6379',
'redis-mode' = 'single',
'password' = '***',
'command' = 'LPUSH', --将结果放入到list中
'sink.limit' = 'true', -- 开启限流
'sink.limit.interval'='100', -- 每线程写入间隔
'sink.limit.max-online' = '1500000', -- 每线程最大在线调试时间150秒
'sink.limit.max-num' = '10000'); -- 每线程最大写入量
2.替换原SQL的sink
insert into kafka_table select user_name, passport from source_table;
变为:
insert into limited_sink_table select user_name, passport from source_table;
3. 前端页面展示调试结果
通过rpop读取redis内测试数据。
项目地址:
插件地址:https://github.com/jeff-zou/flink-connector-redis.git
无法翻墙:https://gitee.com/jeff-zou/flink-connector-redis.git
使用方法:
在命令行执行 mvn package -DskipTests打包后,将生成的包flink-connector-redis-1.1.0.jar引入flink lib中即可,无需其它设置。
项目依赖jedis 3.7.1,如flink环境无jedis,则使用flink-connector-redis-1.1.0-jar-with-dependencies.jar。
开发环境工程直接引用:
<dependency>
<groupId>io.github.jeff-zou</groupId>
<artifactId>flink-connector-redis</artifactId>
<version>1.1.0</version>
</dependency>
网友评论