美文网首页
Flink SQL 知其所以然(三)| 自定义 redis 数据

Flink SQL 知其所以然(三)| 自定义 redis 数据

作者: 大数据羊说 | 来源:发表于2022-02-14 20:37 被阅读0次

    感谢您的关注 + 点赞 + 再看,对博主的肯定,会督促博主持续的输出更多的优质实战内容!!!

    1.序篇-本文结构

    1. 背景篇-为啥需要 redis 数据汇表
    2. 目标篇-redis 数据汇表预期效果
    3. 难点剖析篇-此框架建设的难点、目前有哪些实现
    4. 维表实现篇-实现的过程
    5. 总结与展望篇

    本文主要介绍了 flink sql redis 数据汇表的实现过程。

    如果想在本地测试下:

    1. 在后台回复
      • flink sql 知其所以然(三)| sql 自定义 redis 数据汇表获取源码(源码基于 1.13.1 实现)
      • flink sql 知其所以然(三)| sql 自定义 redis 数据汇表获取源码(源码基于 1.13.1 实现)
      • flink sql 知其所以然(三)| sql 自定义 redis 数据汇表获取源码(源码基于 1.13.1 实现)
    2. 在你的本地安装并启动 redis-server。
    3. 执行源码包中的 flink.examples.sql._03.source_sink.RedisSinkTest 测试类,然后使用 redis-cli 执行 get a 就可以看到结果了(目前只支持 kv,即 redis set key value)。

    如果想直接在集群环境使用:

    1. 命令行执行 mvn package -DskipTests=true 打包
    2. 将生成的包 flink-examples-0.0.1-SNAPSHOT.jar 引入 flink lib 中即可,无需其它设置。

    2.背景篇-为啥需要 redis 数据汇表

    目前在实时计算的场景中,熟悉 datastream 的同学在很多场景下都会将结果数据写入到 redis 提供数据服务。

    举个例子:

    1. 外存状态引擎:需要把历史所有的 id 存储下来,但是因为 id 会不断增多,仅仅使用 flink 内部状态引擎的话,状态会越来越大,很难去保障其稳定性。那么这时就会选择外部状态引擎,比如 redis。在我们使用 redis 存储所有设备 id 时,除了使用 redis 作为维表去访问 id 是否出现过,还需要将新增的 id 写入到 redis 中以供后续的去重。这时候就需要使用到 redis sink 表。
    2. 数据服务引擎:在某些大促(双十一)的场景下需要将 flink 计算好的结果直接写入到 redis 中以提供高速数据服务引擎,直接提供给大屏查询使用。

    而官方是没有提供 flink sql api 的 redis sink connector 的。如下图,基于 1.13 版本。

    https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/

    1.png

    阿里云 flink 是提供了这个能力的。

    https://www.alibabacloud.com/help/zh/faq-detail/118038.htm?spm=a2c63.q38357.a3.16.48fa711fo1gVUd

    2.png

    因此本文在介绍怎样自定义一个 sql 数据汇表的同时,实现一个 sql redis sink connector 来给大家使用。

    3.目标篇-redis 数据汇表预期效果

    redis 作为数据汇表在 datastream 中的最常用的数据结构有很多,基本上所有的数据结构都有可能使用到。
    本文实现主要实现 kv 结构,其他结构大家可以拿到源码之后进行自定义实现。也就多加几行代码就完事了。

    预期效果就如阿里云的 flink redis,redis set key value 的预期 flink sql:

    CREATE TABLE redis_sink_table (
        key STRING, -- redis key,第 1 列为 key
        `value` STRING -- redis value,第 2 列为 value
    ) WITH (
      'connector' = 'redis', -- 指定 connector 是 redis 类型的
      'hostname' = '127.0.0.1', -- redis server ip
      'port' = '6379', -- redis server 端口
      'write.mode' = 'string' -- 指定使用 redis `set key value`
    )
    
    INSERT INTO redis_sink_table
    SELECT o.f0 as key, o.f1 as value
    FROM leftTable AS o
    

    下面是我在本地跑的结果:

    3.png

    首先看下我们的测试输入,f0 恒定为 af1 恒定为 b,并且每 10ms 写入一次:

    4.png

    预期结果是 key 为 a,value 会为 b,实际结果也相同,使用 redis-cli 查询下,我删除掉也能在 10ms 后写入,所以查询时可以一直查得到:

    5.gif

    4.难点剖析篇-目前有哪些实现

    目前可以从网上搜到的实现、以及可以参考的实现有以下两个:

    1. https://github.com/jeff-zou/flink-connector-redis。 但使用起来有比较多的限制,包括需要在建表时就指定 key-column,value-column 等,其实博主觉得没必要指定这些字段,这些都可以动态调整。其实现是对 apache-bahir-flink https://github.com/apache/bahir-flink 的二次开发,但与 bahir 原生实现有割裂感,因为这个项目几乎参考 bahir redis connector 重新实现了一遍,接口与 bahir 不太相同。
    2. 阿里云实现 https://www.alibabacloud.com/help/zh/faq-detail/122722.htm?spm=a2c63.q38357.a3.7.a1227a53TBMuSY。 阿里云的实现相对比较动态化,不需要在建表时就指定 hmap 等数据结构的 map key。

    因此博主在实现时,定了一个基调。

    1. 参考阿里云的 DDL 实现
    2. 高度复用性:复用 bahir 提供的 redis connnector
    3. 简洁性:目前只实现 kv 结构,后续扩展可以给用户自己实现,扩展其实是非常简单的

    5.实现篇-实现的过程

    在实现 redis 数据汇表之前,不得不谈谈 flink 数据汇表加载和使用机制。

    5.1.flink 数据汇表原理

    其实上节已经详细描述了 flink sql 对于 source\sink 的加载机制。

    1. 通过 SPI 机制加载所有的 source\sink\format 工厂 Factory
    2. 过滤出 DynamicTableSinkFactory + connector 标识的 sink 工厂类
    3. 通过 sink 工厂类创建出对应的 sink
    7.png 8.png

    如图 source 和 sink 是通过 FactoryUtil.createTableSourceFactoryUtil.createTableSink 创建的

    16.png

    所有通过 SPI 的 source\sink\formt 插件都继承自 Factory

    整体创建 sink 方法的调用链如下图。

    10.png

    5.2.flink 数据汇表实现方案

    先看下博主的最终实现。

    由于高度复用了 bahir redis connector,所以需要重点实现就只有两个类:

    1. RedisDynamicTableFactory
    2. RedisDynamicTableSink
    6.png

    具体流程:

    1. 定义 SPI 的工厂类 RedisDynamicTableFactory implements DynamicTableSinkFactory,并且在 resource\META-INF 下创建 SPI 的插件文件
    2. 实现 factoryIdentifier 标识 redis
    3. 实现 RedisDynamicTableFactory#createDynamicTableSink 来创建对应的 source RedisDynamicTableSink
    4. 定义 RedisDynamicTableSink implements DynamicTableSink
    5. 实现 RedisDynamicTableFactory#getSinkRuntimeProvider 方法,创建具体的维表 UDF RichSinkFunction<T>,这里直接服用了 bahir redis 中的 RedisSink<IN>

    介绍完流程,进入具体实现方案细节:

    RedisDynamicTableFactory 主要创建 sink 的逻辑:

    public class RedisDynamicTableFactory implements DynamicTableSinkFactory {
        ...
    
        @Override
        public String factoryIdentifier() {
            // 标识 redis
            return "redis";
        }
    
        @Override
        public DynamicTableSink createDynamicTableSink(Context context) {
    
            // either implement your custom validation logic here ...
            // or use the provided helper utility
            final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
    
            // validate all options
            // 所有 option 配置的校验,比如 write.mode 类参数
            helper.validate();
    
            // get the validated options
            final ReadableConfig options = helper.getOptions();
    
            final RedisWriteOptions redisWriteOptions = RedisOptions.getRedisWriteOptions(options);
    
            TableSchema schema = context.getCatalogTable().getSchema();
    
            / 创建 RedisDynamicTableSink
            return new RedisDynamicTableSink(schema.toPhysicalRowDataType()
                    , redisWriteOptions);
        }
    }
    

    resources\META-INF 文件:

    11.png

    RedisDynamicTableSource 主要创建 table udf 的逻辑:

    public class RedisDynamicTableSink implements DynamicTableSink {
        ...
    
        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
    
            // 初始化 redis 客户端配置
            FlinkJedisConfigBase flinkJedisConfigBase = new FlinkJedisPoolConfig.Builder()
                    .setHost(this.redisWriteOptions.getHostname())
                    .setPort(this.redisWriteOptions.getPort())
                    .build();
    
            RedisMapper<RowData> redisMapper = null;
    
            switch (this.redisWriteOptions.getWriteMode()) {
                case "string":
                    // redis key,value 序列化器
                    // 从 RowData 转换成 redis 的 key value
                    redisMapper = new SetRedisMapper();
                    break;
                default:
                    throw new RuntimeException("其他类型 write mode 请自定义实现");
            }
    
            // 创建 SinkFunction,注意!!!这里直接复用了 bahir 的实现
            return SinkFunctionProvider.of(new RedisSink<>(
                    flinkJedisConfigBase
                    , redisMapper));
        }
    }
    

    RedisSink 执行写入 redis 的主要流程,这里是 bahir 的实现:

    public class RedisRowDataLookupFunction extends TableFunction<RowData> {
        ...
    
        @Override
        public void invoke(IN input) throws Exception {
            String key = redisSinkMapper.getKeyFromData(input);
            String value = redisSinkMapper.getValueFromData(input);
    
            // 根据具体的命令执行具体写入 redis 的命令
            switch (redisCommand) {
                case RPUSH:
                    this.redisCommandsContainer.rpush(key, value);
                    break;
                case LPUSH:
                    this.redisCommandsContainer.lpush(key, value);
                    break;
                case SADD:
                    this.redisCommandsContainer.sadd(key, value);
                    break;
                case SET:
                    this.redisCommandsContainer.set(key, value);
                    break;
                case PFADD:
                    this.redisCommandsContainer.pfadd(key, value);
                    break;
                case PUBLISH:
                    this.redisCommandsContainer.publish(key, value);
                    break;
                case ZADD:
                    this.redisCommandsContainer.zadd(this.additionalKey, value, key);
                    break;
                case ZREM:
                    this.redisCommandsContainer.zrem(this.additionalKey, key);
                    break;
                case HSET:
                    this.redisCommandsContainer.hset(this.additionalKey, key, value);
                    break;
                default:
                    throw new IllegalArgumentException("Cannot process such data type: " + redisCommand);
            }
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            try {
                // 初始化 redis 执行器
                this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
                this.redisCommandsContainer.open();
            } catch (Exception e) {
                LOG.error("Redis has not been properly initialized: ", e);
                throw e;
            }
        }
    }
    

    5.2.1.复用 bahir connector

    如图是 bahir redis connector 的实现。

    15.png

    博主在实现过程中将能复用的都尽力复用。如图是最终实现目录。

    12.png

    可以看到实现非常简单。

    其中 redis 客户端及其配置redis 命令执行器redis 命令定义器 是直接复用了 bahir redis 的。
    如果你想要在生产环境中进行使用,可以直接将两部分代码合并,成本很低。

    源码后台回复flink sql 知其所以然(三)| sql 自定义 redis 数据汇表获取。

    6.总结与展望篇

    6.1.总结

    本文主要是针对 flink sql redis 数据汇表进行了扩展以及实现,并且复用 bahir redis connector 的配置,具有良好的扩展性。
    如果你正好需要这么一个 connector,后台回复flink sql 知其所以然(三)| sql 自定义 redis 数据汇表获取源码吧。

    6.2.展望

    当然上述只是 redis 数据汇表一个基础的实现,用于生产环境还有很多方面可以去扩展的。

    1. jedis cluster 的扩展:目前 bahir datastream 中已经实现了,可以直接参考,扩展起来非常简单
    2. 异常 AOP,alert 等

    相关文章

      网友评论

          本文标题:Flink SQL 知其所以然(三)| 自定义 redis 数据

          本文链接:https://www.haomeiwen.com/subject/yvvflrtx.html