美文网首页flink程序员
Flink详解之七--状态管理

Flink详解之七--状态管理

作者: 王吉吉real | 来源:发表于2021-01-31 00:05 被阅读0次

    一、概念

    实时处理中的很多操作都是一次对单个事件做处理,也有一些操作需要历史事件的信息,这些操作被称为有状态的。也就是说,所谓的状态就是由历史事件处理供后续操作使用的信息。

    二、用途

    状态是的使用场景有很多,主要有以下几种:

    • 窗口计算 比如天/小时/分钟窗口计算,需要保存窗口内计算所需的每个事件的信息或累积信息。
    • 机器学习 可以使用状态保存当天版本模型参数信息。
    • 历史数据对比 需要记录历史上某一段时间的数据。
    • 数据恢复 主要用于Flink的容错机制。

    三、分类

    3.1 keyed state vs operator state

    根据是否处理keyed stream,可以将状态分为两类keyed state和operator state。

    3.1.1 keyed state

    这类状态仅可在 KeyedStream 上使用,可以通过 stream.keyBy(...) 得到 KeyedStream。每个key对应一个state, 一个算子实例会处理多个key, 可以访问多个state。算子实例并发改变时,state会随着key在实例间迁移。所有keyed state都可以设置有效期,所有状态都支持单元素有效期设置,过期的状态会被清除。

    keyed state是最常用的一类状态,主要类型包括

    • ValueState<T> 保存一个可以更新和检索的值。
    • ListState<T> 保存一个元素的列表,可追加,可检索。
    • MapState<UK, UV> 维护了一个映射列表。
    • ReducingState<T> 所有状态聚合为一个单值,输入和输出数据类型一致。
    • AggregatingState<IN, OUT> 所有状态聚合为一个单值,输入和输出数据类型可以不一致。

    以ValueState<T>为例:

    public class TestState extends RichFlatMapFunction<Integer, Integer> {
    
        private transient ValueState<Integer> sum;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Integer> state = new ValueStateDescriptor<>("state", Integer.class);
            sum = getRuntimeContext().getState(state);
        }
    
        @Override
        public void flatMap(Integer input, Collector<Integer> out) throws Exception {
            sum.update(sum.value()+input);
        }
    }
    

    3.1.2 operater state

    Operator State也叫non-keyed state,每个算子状态都绑定一个并行算子实例。Kafka连接器是在Flink中使用算子状态的一个很好的例子。Kafka consumer的每个并行实例都维护主题分区和偏移量的映射作为其算子状态。

    当并发发生变化时,Operator State接口支持在并行算子实例之间重新分配状态。进行这种再分配有多种方式可选。

    3.2 managed state vs raw state

    根据状态是否有Flink运行时托管,将状态分为managed state和raw state。

    Managed State由Flink运行时控制的数据结构表示,如内部哈希表或RocksDB。例如ValueState、ListState等,Flink的运行时对状态进行编码并将它们写入checkpoints。

    Raw State是算子保存在自定义数据结构中的状态。checkpoint时,只向检查点写入一个字节序列。Flink不关注数据结构,只看原始字节。

    所有数据流函数都可以使用managed state, 但是raw state接口只能在实现operators后才能使用。开发中除非必要,否则推荐使用managed state。因为对于managed state, 当并行度改变时,Flink会自动重新分配状态,也可以更好的进行内存管理。

    四、存储方式

    flink内部状态的存储,取决于state backend的选择,目前有3种内置的state backend可供使用:MemoryStateBackend, FsStateBackend, RocksDBStateBackend。我们可以在配置文件 flink-conf.yaml 中配置所有Flink作业的默认StateBackend,也可以在作业中设置StateBackend,这样会覆盖默认值。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(...);
    
    4.1 MemoryStateBackend

    当用户没有设置时,默认使用MemoryStateBackend。运行时将状态保存在TaskManager堆内存中,CheckPoint时,状态快照会发往JobManager, 并存储在JobManager的堆内存中。为防止数据流阻塞,这个状态快照尽量配置成异步快照,默认也是异步的,除非构建时手动将异步参数置成false。

    new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
    

    MemoryStateBackend的一些限制:

    • 独立状态默认最大为5M,上限(MAX_MEM_STATE_SIZE)可调。
    • MAX_MEM_STATE_SIZE最大不能超过akka frame大小
    • 所有状态聚合后,要放在JobManager的内存里,所以最大不能超过JobManager的内存。

    由于存储空间的限制,以及当作业重启时状态会丢失,所以,MemoryStateBackend更适合本地开发与调试,或者状态较小并且作业重启时对状态丢失不敏感的场景,不太适合普遍意义上的生产场景中使用。

    4.2 FsStateBackend

    FsStateBackend将运行时状态保存在TaskManager内存中,CheckPoint时,会将状态快照保存在指定的文件系统目录中,只会将少量元数据保存在JobManager,而高可用模式下,会将元数据保存在CheckPoint元数据文件中。状态快照默认也是异步的,除非构建时手动将异步参数置成false。

    new FsStateBackend(path, false); //path为文件系统路径
    

    状态大小只受限于TaskManager的内存,适用于窗口比较大,状态比较大的场景,支持高可用,适合在生产环境中使用。

    4.3 RocksDBStateBackend

    RocksDBStateBackend将运行时状态保存在RocksDB数据库中,RocksDB默认将文件存储在TaskManager文件目录中,checkpoint 时,整个 RocksDB 数据库被 checkpoint 到配置的文件系统目录中,支持增量checkpoint。只支持异步快照。

    RocksDBStateBackend的一些限制:

    • RocksDB支持Key和Value最大分别为2G
    • 运行时状态存入RocksDB,读写都需要序列化和反序列化,比写入内存效率低,状态较大时,也影响吞吐量。

    由于保存的状态只受磁盘大小限制,可以支持比FsStateBackend更大的状态场景。

    相关文章

      网友评论

        本文标题:Flink详解之七--状态管理

        本文链接:https://www.haomeiwen.com/subject/ikwzzktx.html