美文网首页
关于使用Flink RocksDB状态后端时一定要写MapSta

关于使用Flink RocksDB状态后端时一定要写MapSta

作者: LittleMagic | 来源:发表于2023-10-29 18:45 被阅读0次

    前言

    抱歉起这种烂大街的日本轻小说风格标题来吸引注意力。原本我认为这是常识,不需要专门写一篇文章来讲解如此细碎的点。但是在最近工作巡检中发现了越来越多如同ValueState<Map>的状态用法(当然大部分是历史遗留),部分Flink作业深受性能问题困扰,所以还是抽出点时间快速聊一聊,顺便给出不算优雅但还算有效的挽救方案。

    基于RocksDB的状态序列化

    我们已经知道,RocksDB是基于二进制流的内嵌K-V存储,所以Flink任务使用RocksDB状态后端时,写/读操作的状态数据都需要经过序列化和反序列化,从而利用TaskManager本地磁盘实现海量的状态存储。

    举个栗子,RocksDBValueState的取值和更新方法如下:

    class RocksDBValueState<K, N, V> extends AbstractRocksDBState<K, N, V>
           implements InternalValueState<K, N, V> {
       @Override
       public TypeSerializer<K> getKeySerializer() {
           return backend.getKeySerializer();
       }
    
       @Override
       public TypeSerializer<N> getNamespaceSerializer() {
           return namespaceSerializer;
       }
    
       @Override
       public TypeSerializer<V> getValueSerializer() {
           return valueSerializer;
       }
    
       @Override
       public V value() {
           try {
               byte[] valueBytes =
                       backend.db.get(columnFamily, serializeCurrentKeyWithGroupAndNamespace());
    
               if (valueBytes == null) {
                   return getDefaultValue();
               }
               dataInputView.setBuffer(valueBytes);
               return valueSerializer.deserialize(dataInputView);
           } catch (IOException | RocksDBException e) {
               throw new FlinkRuntimeException("Error while retrieving data from RocksDB.", e);
           }
       }
    
       @Override
       public void update(V value) {
           if (value == null) {
               clear();
               return;
           }
    
           try {
               backend.db.put(
                       columnFamily,
                       writeOptions,
                       serializeCurrentKeyWithGroupAndNamespace(),
                       serializeValue(value));
           } catch (Exception e) {
               throw new FlinkRuntimeException("Error while adding data to RocksDB", e);
           }
       }
    }
    

    可见key和value都需要经过对应类型的TypeSerializer的处理,即如果将状态声明为ValueState<Map<K, V>>,那么将由MapSerializer<K, V>负责值的正反序列化。特别注意,serializeCurrentKeyWithGroupAndNamespace()方法中,key需要加上它所对应的KeyGroup编号和对应的Namespace(Namespace是窗口信息),形成一个复合key,即:CompositeKey(KG, K, NS),RocksDB实际存储的状态数据的key都类似如此。具体可参看SerializedCompositeKeyBuilder类,不再赘述。

    接下来再看一下RocksDBMapState的部分实现。

    class RocksDBMapState<K, N, UK, UV> extends AbstractRocksDBState<K, N, Map<UK, UV>>
            implements InternalMapState<K, N, UK, UV> {
        @Override
        public TypeSerializer<K> getKeySerializer() {
            return backend.getKeySerializer();
        }
    
        @Override
        public TypeSerializer<N> getNamespaceSerializer() {
            return namespaceSerializer;
        }
    
        @Override
        public TypeSerializer<Map<UK, UV>> getValueSerializer() {
            return valueSerializer;
        }
    
        @Override
        public UV get(UK userKey) throws IOException, RocksDBException {
            byte[] rawKeyBytes =
                    serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
            byte[] rawValueBytes = backend.db.get(columnFamily, rawKeyBytes);
    
            return (rawValueBytes == null
                    ? null
                    : deserializeUserValue(dataInputView, rawValueBytes, userValueSerializer));
        }
    
        @Override
        public void put(UK userKey, UV userValue) throws IOException, RocksDBException {
    
            byte[] rawKeyBytes =
                    serializeCurrentKeyWithGroupAndNamespacePlusUserKey(userKey, userKeySerializer);
            byte[] rawValueBytes = serializeValueNullSensitive(userValue, userValueSerializer);
    
            backend.db.put(columnFamily, writeOptions, rawKeyBytes, rawValueBytes);
        }
    
        @Override
        public void putAll(Map<UK, UV> map) throws IOException, RocksDBException {
            if (map == null) {
                return;
            }
    
            try (RocksDBWriteBatchWrapper writeBatchWrapper =
                    new RocksDBWriteBatchWrapper(
                            backend.db, writeOptions, backend.getWriteBatchSize())) {
                for (Map.Entry<UK, UV> entry : map.entrySet()) {
                    byte[] rawKeyBytes =
                            serializeCurrentKeyWithGroupAndNamespacePlusUserKey(
                                    entry.getKey(), userKeySerializer);
                    byte[] rawValueBytes =
                            serializeValueNullSensitive(entry.getValue(), userValueSerializer);
                    writeBatchWrapper.put(columnFamily, rawKeyBytes, rawValueBytes);
                }
            }
        }
    
        @Override
        public Iterable<Map.Entry<UK, UV>> entries() {
            return this::iterator;
        }
    
        @Override
        public Iterator<Map.Entry<UK, UV>> iterator() {
            final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();
    
            return new RocksDBMapIterator<Map.Entry<UK, UV>>(
                    backend.db, prefixBytes, userKeySerializer, userValueSerializer, dataInputView) {
                @Override
                public Map.Entry<UK, UV> next() {
                    return nextEntry();
                }
            };
        }
    

    由于MapState的本身有用户定义的key UK,所以RocksDB存储它时,会在上文所述的复合key后面,再加上UK的值,即:CompositeKey(KG, K, NS) :: UK。这样,同属于一个KeyContext的所有用户键值对就存在一个连续的存储空间内,可以通过RocksDB WriteBatch机制攒批,实现批量写(putAll()方法),也可以通过RocksDB Iterator机制做前缀扫描,实现批量读(entries()方法)。

    问题的症结

    代码读完了。假设我们在某个key下有5条数据的状态,若使用ValueState<Map<String, String>>来存储,按照MapSerializer的序列化方式,其存储可以记为:

    (1, k, VoidNamespace) -> [5, k1, false, v1, k2, false, v2, k3, true, k4, false, v4, k5, false, v5]
    

    注意对于无窗口上下文的状态,NS为VoidNamespace。且序列化Map时,会加上Map的大小,以及表示每个value是否为NULL的标记。

    如果使用MapState<String, String>存储,可以记为:

    (1, k, VoidNamespace) :: k1 -> v1
    (1, k, VoidNamespace) :: k2 -> v2
    (1, k, VoidNamespace) :: k3 -> NULL
    (1, k, VoidNamespace) :: k4 -> v4
    (1, k, VoidNamespace) :: k5 -> v5
    

    如果我们获取或修改一条状态数据,前者需要将所有数据做一遍序列化和反序列化,而后者只需要处理一条。在Map比较小的情况下可能没有明显的性能差异,但是如果Map有几十个甚至上百个键值对,或者某些value的长度很长(如各类打标标记串等),ValueState<Map>的性能退化就会非常严重,造成反压。

    有的同学可能会问:我对状态数据的操作基本都是“整存整取”(即读/写整个Map),也不建议使用ValueState<Map>吗?答案仍然是不建议。除了前面提到的WriteBatch和Iterator为MapState带来的优化之外,RocksDB更可以利用多线程进行读写,而单个大value不仅不能享受这个便利,还会挤占Block Cache空间,在出现数据倾斜等场景时,磁盘I/O可能会打到瓶颈。所以,我们在开始编写作业时就应该正确使用MapState

    平滑迁移

    为了消除此类状态误用的影响,常见的重构方式是将ValueState<Map>修改为MapState,重置位点后消费历史数据,积攒状态,并替换掉旧任务。但是对于状态TTL较长、size较大的场景(例如物流监控场景经常有30天TTL、十几TB大的State),这样显然非常不方便,下面提供一种简单的平滑迁移方式。

    假设原本误用的状态为mainState,我们声明两个新的状态,一个是新的MapState newMainState,一个是布尔型ValueState isMigratedState,表示该key对应的状态是否已经迁移成了新的,即:

        private transient ValueState<Map<String, String>> mainState;
    
        private transient ValueState<Boolean> isMigratedState;
        private transient MapState<String, String> newMainState;    
    

    当然,它们的TTL等参数要完全相同。

    写两个新的方法,负责在读写mainState时将其迁移成newMainState,并做上相应的标记。不存在历史状态的,直接以新格式存储。再强调一遍,newMainState.entries()newMainState.putAll()的性能很不错,不必过于担心。

        private Map<String, String> wrapGetMainState() throws Exception {
            Boolean isMigrated = isMigratedState.value();
    
            if (isMigrated == null || !isMigrated) {
                Map<String, String> oldStateData = mainState.value();
                if (oldStateData != null) {
                    newMainState.putAll(mainState.value());
                }
                isMigratedState.update(true);
                mainState.clear();
            }
    
            Map<String, String> result = new HashMap<>();
            for (Entry<String, String> e : newMainState.entries()) {
                result.put(e.getKey(), e.getValue());
            }
            return result;
        }
    
        private void wrapUpdateMainState(Map<String, String> data) throws Exception {
            Boolean isMigrated = isMigratedState.value();
    
            if (isMigrated == null || !isMigrated) {
                Map<String, String> oldStateData = mainState.value();
                if (oldStateData != null) {
                    newMainState.putAll(mainState.value());
                }
                isMigratedState.update(true);
                mainState.clear();
            }
    
            newMainState.putAll(data);
        }
    

    再将历史代码中的状态访问全部替换成wrapGetMainState()wrapUpdateMainState()方法的调用即可。表面上看是由一个状态句柄变成了两个状态句柄,但是标记状态的访问十分轻量级,且随着程序的运行,旧状态的数据渐进式地替换完毕之后,就可以安全地删除mainStateisMigratedState了。当然,托管内存的设置要科学,并添加一些有利于RocksDB状态吞吐量的参数,如:

    state.backend.rocksdb.predefined-options  SPINNING_DISK_OPTIMIZED_HIGH_MEM
    state.backend.rocksdb.memory.partitioned-index-filters  true
    

    基于堆的状态呢?

    与RocksDB相反,基于堆的JobManager和FileSystem状态后端无需序列化和反序列化,当然状态的大小就要受制于TaskManager内存。不过,如果我们采用这两种状态后端,ValueState<Map>MapState也就没有明显的性能差别了,因为HeapValueStateHeapMapState的底层都是相同的,即CopyOnWriteStateTable,本质上是内存中的状态映射表。读者有兴趣可以自行参考对应的Flink源码,这里不再啰嗦了。

    ValueState、ListState、MapState三者在RocksDB状态后端和基于堆的状态后端中的异同点可以概括成下表。

    The End

    相关文章

      网友评论

          本文标题:关于使用Flink RocksDB状态后端时一定要写MapSta

          本文链接:https://www.haomeiwen.com/subject/ofcpidtx.html