美文网首页Flink源码解析
Flink key state 为何仅与 key 有关的

Flink key state 为何仅与 key 有关的

作者: shengjk1 | 来源:发表于2020-07-03 14:14 被阅读0次

    1. 依赖

    前面两篇我已经讲过 Flink getRuntimeContext().getMapState的时候发生了什么?以及 Flink StateDescriptor Name的作用
    今天我们在这个的基础上一起来看一下,为什么 key state 仅仅与 key 有关,无论我取数据还是修改数据,仅仅只能取到(修改)这个key 对应的那一部分。

    2. 以 RocksDBListState 为例

    2.1 add、get 方法讲解

    我们以 RocksDBListState 为例。分别查看 get 方法 和 add 方法
    get 方法 通过源码我们可以追踪到

        byte[] key = serializeCurrentKeyWithGroupAndNamespace();
                byte[] valueBytes = backend.db.get(columnFamily, key);
                return deserializeList(valueBytes);
    

    同理 add 方法

    backend.db.merge(
                    columnFamily,
                    writeOptions,
                    serializeCurrentKeyWithGroupAndNamespace(),
                    serializeValue(value, elementSerializer)
                );
    

    columnFamily 就不用说了,具体可以参考 Flink StateDescriptor Name的作用,主要就是 ColumnFamily Handle
    writeOptions rockdb 的写控制,比如说是 sync 还是 async等
    serializeValue 就是把 value 序列化成 byte 数组。

    2.2 关键性方法讲解

    关键性的方法来了 serializeCurrentKeyWithGroupAndNamespace,
    就是序列化 key key-group namespace( 当时window 的时候 就是window( 如:TimeWindow{start=1590502000000, end=1590503000000} ) 否则就是 VoidNamespace),
    它的作用就是 ColumnFamily 下的 key

    跟随 serializeCurrentKeyWithGroupAndNamespace 方法

    // the bytes for the serialized composite key of key-group, key, namespace
        // key-group key namespace 序列化为 rockdb 在指定 column family 下的 key,value 就是 value
        byte[] serializeCurrentKeyWithGroupAndNamespace() {
            return sharedKeyNamespaceSerializer.buildCompositeKeyNamespace(currentNamespace, namespaceSerializer);
        }
    

    3. 结论

    像 add clear update 等方法都会用到 serializeCurrentKeyWithGroupAndNamespace 这也就是为什么,key state 只会有 key 有关,因为去取值或者修改的时候需要依赖于 key。

    相关文章

      网友评论

        本文标题:Flink key state 为何仅与 key 有关的

        本文链接:https://www.haomeiwen.com/subject/yzjsqktx.html