美文网首页
flink 学习笔记 — 状态 State

flink 学习笔记 — 状态 State

作者: 飞不高的老鸟 | 来源:发表于2019-12-18 17:41 被阅读0次

    回顾

        在之前的学习中我们了解到,flink 作为低延时的流式数据处理框架,本身是有状态的。状态 state 是为了保存一些操作符 operator 的中间结果,同时,通过状态可以保证精确一致语义。

    State 分类

        State 从其实现方式可分为:Keyed State 和 Operator State,从管理方式可分为:Raw State 和 Managed State。

    Keyed State
    • Keyed State 通常是与 keys 相关的,其函数和操作只能在 KeyedStream 中使用。事实上,keyed 与 hive 中的分区极其类似,每一个 key 只能属于某一个 keyed state。
    DataStream<Tuple2<String, Integer>> counts =
                text.flatMap(new Tokenizer())
                .keyBy(0)   // 使用 keyby 方法进行划分,不同的 task 之间不会出现相同的 key
                .sum(1);
    
    keyby.png
    • 如上,有3个并行度的 WordCount 任务,在 keyby 之后,相同的 key 会被划分到相同的 task 中进行处理。
    Operator State
    • non-keyed state,每一个 operator state 都仅与一个 operator 的实例绑定。
    • 常见的 operator state 是 source state,例如记录当前 source 的 offset
    Managed State

        Managed State 是由 Flink Runtime 中管理的 State ,并将状态数据转换为 hashtable 或者 RocksDB 的对象进行存储。

    mysu_bj.png
    • ValueState:与 key 对应单个的值,在我们统计流式数据中的单词个数时,事实上,状态就是以 ValueState 存在,每次在状态值上进行更新。在其内部,调用 update(T value) 方法进行状态值的更新。
    public interface ValueState<T> extends State {
    
        /**
         * Returns the current value for the state. When the state is not
         * partitioned the returned value is the same for all inputs in a given
         * operator instance. If state partitioning is applied, the value returned
         * depends on the current operator input, as the operator maintains an
         * independent state for each partition.
         *
         * <p>If you didn't specify a default value when creating the {@link ValueStateDescriptor}
         * this will return {@code null} when to value was previously set using {@link #update(Object)}.
         *
         * @return The state value corresponding to the current input.
         *
         * @throws IOException Thrown if the system cannot access the state.
         */
        T value() throws IOException;
    
        /**
         * Updates the operator state accessible by {@link #value()} to the given
         * value. The next time {@link #value()} is called (for the same state
         * partition) the returned state will represent the updated value. When a
         * partitioned state is updated with null, the state for the current key
         * will be removed and the default value is returned on the next access.
         *
         * @param value The new value for the state.
         *
         * @throws IOException Thrown if the system cannot access the state.
         */
        void update(T value) throws IOException;
    
    }
    
    • ListState:与 key 对应的元素的列表的状态 list,内部定义 update(T)和 addAll(T)两个方法
    public interface ListState<T> extends MergingState<T, Iterable<T>> {
    
        /**
         * Updates the operator state accessible by {@link #get()} by updating existing values to
         * to the given list of values. The next time {@link #get()} is called (for the same state
         * partition) the returned state will represent the updated list.
         *
         * <p>If null or an empty list is passed in, the state value will be null.
         *
         * @param values The new values for the state.
         *
         * @throws Exception The method may forward exception thrown internally (by I/O or functions).
         */
        void update(List<T> values) throws Exception;
    
        /**
         * Updates the operator state accessible by {@link #get()} by adding the given values
         * to existing list of values. The next time {@link #get()} is called (for the same state
         * partition) the returned state will represent the updated list.
         *
         * <p>If null or an empty list is passed in, the state value remains unchanged.
         *
         * @param values The new values to be added to the state.
         *
         * @throws Exception The method may forward exception thrown internally (by I/O or functions).
         */
        void addAll(List<T> values) throws Exception;
    }
    
    • ReducingState:定义与 key 相关的数据元素单个聚合的状态值。
    • AggregatingState:定义与 key 相关的数据元素单个聚合的状态值。
    Raw State

        Raw State 是由算子本身进行管理的 State ,此时状态都是以字节数组的形式保存到 Checkpoint 中,Flink 并不清楚状态数据的内部结构,每次状态的写入和读取都需要算子进行序列化和反序列化。

    状态管理

        Flink 中状态管理有三种方案:MemoryStateBackend、FSStateBackend、RocksDBStateBackend。

    MemoryStateBackend
    • MemoryStateBackend 基于内存的状态管理器,它通常将状态数据存储在 JVM 内存中,包括 Key/State 及窗口中缓存的数据。但是由于内存本身的限制,基于内存的状态管理会造成内存溢出。因此,这种状态管理机制通常在本地测试中使用,生产中禁止使用内存状态管理器。
     env.setStateBackend(new MemoryStateBackend());
    
    FSStateBackend
    • FSStateBackend 基于文件的状态管理,和内存管理机制不同,FSStateBackend 通常把状态数据保存在本地文件系统,或者HDFS文件系统中。在初始化时,需要传入文件路径。基于文件的状态管理机制,适用于状态数据很大的数据,此时,如果使用内存状态管理器,很容易就把内存撑爆。通常情况下,为了保证文件状态安全性,会把文件状态保存在 HDFS 中,此时,借助 HDFS 的多副本的策略,保证文件状态不丢失。
    env.setStateBackend(new FsStateBackend(""));
    
    // 源码
    public FsStateBackend(Path checkpointDataUri) {
            this(checkpointDataUri.toUri());
        }
    
    
    RocksDBStateBackend
    • RocksDBStateBackend 基于内存和文件系统的状态管理器,这是基于三方的状态管理器。通常,先把状态放在内存中,等到到达一定的大小时,会将状态数据刷到文件中。
    env.setStateBackend(new RocksDBStateBackend(""));
    

    总结

        状态是 Flink 容错机制的基石,了解 State 的机制,可以更好的管理 Checkpoint,更好的进行失败任务的恢复。

    相关文章

      网友评论

          本文标题:flink 学习笔记 — 状态 State

          本文链接:https://www.haomeiwen.com/subject/jliunctx.html