美文网首页
FLINK基于SQL向redis更新多维度状态(多字段)

FLINK基于SQL向redis更新多维度状态(多字段)

作者: 山里小龙 | 来源:发表于2020-11-03 17:25 被阅读0次

        在实时计算平台接入业务过程中,经常遇到这样一个问题: 第一个任务获得的最新状态需要提供给后续一个或者多个任务使用。解决这个问题有很多方法,将状态序列化到消息队列中或者持久化到数据库中,但这些方法要么实时性不足要么使用上有oom问题,当这些最新状态数量达到亿级或者更大时,有比较合适一点的方法来解决这个问题吗?

        我的解决方案是将状态存入redis-cluster中,这样在容量上及速度上满足了业务的需求,当然容量大小还是有限制的,不过现有大部分业务需求可以完全满足,当然这个方案还是有几个关键问题的,具体问题的解决方法如下:

    1.使用方便性问题:最新版的FLINK没有提供redis-connector,只能通过udf来连接redis,在纯SQL使用环境中多少有一些不方便与另类。为解决这个问题我在开源项目bahir-flink(https://github.com/apache/bahir-flink.git)的基础上进行了二次开发,使它支持SQL写入redis而无需定义UDF,用户通过SQL可以自由指定自己需要保存的信息。

    二次开发后的项目地址:flink-connector-redis(https://github.com/jeff-zou/flink-connector-redis.git),项目使用方法: mvn package -DskipTests=true打包后,将生成的包flink-connector-redis_2.12-1.11.1.jar引入flink引擎中无需设置即可直接使用,使用了服务扫描所以无需设置。

    2.多维度同时保存问题: redis是key-value存储结构,保存多维度并不方便,所以我利用不可见字符(如:\01)将多个维度值拼接成一个key或者value,这样无需创建多个key来保存不同维度,取值时也方便。

    3.单KEY数据量过大问题:我参考了数据库分区概念,增加了表属性:partition-column, 该属性会将指定的字段内容追加到key中从而形成一个新的key,再利用redis-cluster分区功能保存到不同节点中。

    create table redis_table (appid varchar, accountid varchar, channel varchar, levelvarchar, PRIMARY KEY (appid, accountid) not enforced) with ( 'connector'='redis', 'cluster-nodes'='redis1:6379,redis2:6379,redis3:6379', 'redis-mode'='cluster', 'additional-key'='new_user', 'password'='*****','command'='HSET', 'maxIdle'='10', 'minIdle'='1','partition-column'='appid' );

    insert into redis_table  SELECT t.appid, t.accountid, t.channel, t.server from source_table t where t.is_new_account = 1;

    SQL示例解析: 

    additional-key 指定redis key

    partition-column 指定了分区字段为appid,appid的值将会被追加到additional-key值后(_为分割符)

    PRIMARY KEY 可以指定一个或者多个字段(command需为hset), 不可见字符拼接后的值会被保存成hashmap的hashfield值。定义redis表也可以无主键,command 对应改为set即可

    假设测试数据如下:

    {"accountid":"jeff","appid":"91000285","channel":"git","level ":"10"}

    在redis中保存结果如下:

    key: new_user_91000285  hashfield:91000285\x01jeff  value(按顺序拼接非primary key字段值): git\x0110

    欢迎钉我探讨FLINK实践问题,钉钉号:jeff07

    相关文章

      网友评论

          本文标题:FLINK基于SQL向redis更新多维度状态(多字段)

          本文链接:https://www.haomeiwen.com/subject/npabvktx.html