美文网首页
译:Flink---状态

译:Flink---状态

作者: 雪味伦调 | 来源:发表于2019-02-13 01:01 被阅读0次

    Flink 1.7 Google翻译

    键控状态和操作状态


    Flink中有两种基本的状态类型:键控状态和运算符状态

    键控状态

    键控状态总是与键相关并且只能在键控流的函数和运算符中使用

    你可以将Keyed State视为已分区或分片的操作符状态,每个键只有一个状态分区。每个键控状态在逻辑上绑定到<parallel-operator-instance,key>的唯一复合,并且由于每个键“属于”键控运算符的一个并行实例,我们可以将其简单地视为<operator,key >

    键控状态进一步组织成所谓的键控组。键控组是Flink可以重新分配密钥状态的原子单元;键控组与定义的最大并行度完全一样多。在执行期间,键控运算符的每个并行实例都使用一个或多个键控组的键。

    运算符状态

    使用运算符状态(或非键控状态), 每个运算符状态绑定到一个运算符实例。Kafka连接器在Flink中是一个使用运算符状态的好例子。任一个Kafka消费者并行实例都包含一个主题与偏移映射作为运算符状态。

    运算符状态接口支持当并行度改变时通过并行操作符重新分配实例。进行重新分配有多种不同的方案

    原生及托管状态


    键控状态及运算符状态以两种方式存在:托管状态及原生状态

    托管状态由Flink运行时控制的数据结构表示,例如内部的hash表,或者RocksDB.例如“ValueState”,"ListState"等。Flink将state编码并写入checkpoints中

    原生状态是把运算符保存在自己数据结构中的状态。当状态检查时,它们仅将字节序列写入到checkpoint。Flink不感知状态的数据结构,它只能看到原生的字节数据

    所有的datastream函数都可以使用托管状态,但是原生状态仅能在实现运算符时使用。建议使用托管状态而非原生状态,因为在托管状态下,当并行度改变时Flink可以自动重新分配状态,也可以更好的做内存管理。

    注意:如果你的托管状态需要自定义序列化逻辑,请查看相关引导以保证未来的兼容性。Flink的默认序列化不需要特殊对待

    使用托管监控状态


    监控托管转台接口提供不同的状态类型的访问,这些状态都限定为当前输入元素的键。这意味着这些状态类型只能在监控流中使用,监控流使用stream.keyBy()来创建
    现在,我们首先来看一下可用的不同状态类型,之后我们可以看到如何在程序中使用它们。可用的原生状态:

    • ValueState<T>: 它保留了可以更新和检索的值(如上所述,作用于输入元素的键的范围,因此操作看到的每个键可能有一个值)使用update(T)更新值,使用T value()函数检索
    • ListState<T>: 这保留了元素列表。您可以追加元素并在所有当前存储的元素上检索Iterable。使用add(T)或addAll(List <T>)添加元素,可以使用Iterable <T> get()检索Iterable。您还可以使用update覆盖现有列表(List <T>)
    • ReducingState<T>: 这保留了一个值,表示添加到状态的所有值的聚合。该接口类似于ListState,但使用add(T)添加的元素使用指定的ReduceFunction缩减为聚合
    • AggregatingState<IN, OUT>: 这保留了一个值,表示添加到状态的所有值的聚合。与ReducingState相反,聚合类型可能与添加到状态的元素类型不同。接口与ListState相同,但使用add(IN)添加的元素使用指定的AggregateFunction进行聚合
    • FoldingState<T, ACC>: 这保留了一个值,表示添加到状态的所有值的聚合。与ReducingState相反,聚合类型可能与添加到状态的元素类型不同。该接口类似于ListState,但使用add(T)添加的元素使用指定的FoldFunction折叠为聚合
    • MapState<UK, UV>: 这保留了映射列表。您可以将键值对放入状态,并在所有当前存储的映射上检索Iterable。使用put(UK,UV)或putAll(Map <UK,UV>)添加映射。可以使用get(UK)检索与用户密钥关联的值。可以分别使用entries(),keys()和values()来检索映射,键和值的可迭代视图

    所有类型的状态还有一个方法clear(),它清除当前活动键的状态,即输入元素的键

    注意: FoldingState和FoldingStateDescriptor在Flink 1.4中废弃了并且将在未来完全删除。请使用AggregatingState和AggregatingStateDescriptor

    重要的是要记住,这些状态对象仅用于与状态接口。状态不一定存储在内部,但可能驻留在磁盘或其他位置。要记住的第二件事是,从状态获得的值取决于input元素的键。因此,如果涉及的键不同,则在一次调用用户函数时获得的值可能与另一次调用中的值不同

    要获取状态句柄,您必须创建StateDescriptor。这保存了状态的名称(正如我们稍后将看到的,您可以创建多个状态,并且它们必须具有唯一的名称以便您可以引用它们),状态所持有的值的类型,并且可能是用户 - 指定的函数,例如ReduceFunction。根据要检索的状态类型,创建ValueStateDescriptor,ListStateDescriptor,ReducingStateDescriptor,FoldingStateDescriptor或MapStateDescriptor

    使用RuntimeContext访问状态,因此只能在丰富的函数中使用。请参阅此处了解相关信息,但我们很快也会看到一个示例。 RichFunction中可用的RuntimeContext具有这些访问状态的方法:

    • ValueState<T> getState(ValueStateDescriptor<T>)
    • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
    • ListState<T> getListState(ListStateDescriptor<T>)
    • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
    • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
    • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

    这是一个示例FlatMapFunction,显示所有部件如何组合在一起:

    public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
    
        /**
         * The ValueState handle. The first field is the count, the second field a running sum.
         */
        private transient ValueState<Tuple2<Long, Long>> sum;
    
        @Override
        public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
    
            // access the state value
            Tuple2<Long, Long> currentSum = sum.value();
    
            // update the count
            currentSum.f0 += 1;
    
            // add the second field of the input value
            currentSum.f1 += input.f1;
    
            // update the state
            sum.update(currentSum);
    
            // if the count reaches 2, emit the average and clear the state
            if (currentSum.f0 >= 2) {
                out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
                sum.clear();
            }
        }
    
        @Override
        public void open(Configuration config) {
            ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                    new ValueStateDescriptor<>(
                            "average", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
                            Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
            sum = getRuntimeContext().getState(descriptor);
        }
    }
    
    // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
    env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
            .keyBy(0)
            .flatMap(new CountWindowAverage())
            .print();
    
    // the printed output will be (1,4) and (1,5)
    

    这个例子实现了一个穷人的计数窗口。我们通过第一个字段键入元组(在示例中都具有相同的键1)。该函数将计数和运行总和存储在ValueState中。一旦计数达到2,它将发出平均值并清除状态,以便我们从0开始。注意,如果我们在第一个字段中具有不同值的元组,这将为每个不同的输入键保持不同的状态值

    TTL状态


    可以将生存时间(TTL)分配给任何类型的键控状态。如果配置了TTL并且状态值已过期,则将尽力清除存储的值,这将在下面更详细地讨论。

    所有状态集合类型都支持每个条目的TTL。这意味着列表元素和映射条目将独立过期。

    为了使用状态TTL,必须首先构建StateTtlConfig配置对象。然后,可以通过传递配置在任何状态描述符中启用TTL功能

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;
    
    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
    ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
    stateDescriptor.enableTimeToLive(ttlConfig);
    

    它的配置项有一下几点需要考虑:

    第一个参数newBuilder是强制性的,指定value有效期

    更新类型配置为TTL刷新的时机(默认是OnCreateAndWrite)

    • StateTtlConfig.UpdateType.OnCreateAndWrite - only on creation and write access
    • StateTtlConfig.UpdateType.OnReadAndWrite - also on read access

    状态可见性配置如果过期值尚未清除,是否在读取访问时返回它(默认情况下NeverReturnExpired)

    • StateTtlConfig.StateVisibility.NeverReturnExpired - 从不返回
    • StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 没有被清除时返回

    在NeverReturnExpired的情况下,过期状态表现得好像它不再存在,即使它仍然必须被删除。
    该选项对于在TTL之后必须严格用于读取访问的数据的用例是有用的,例如,应用程序使用隐私敏感数据

    另一个选项ReturnExpiredIfNotCleanedUp允许返回过期状态当它没有被清除时

    注意:

    • 状态后端存储上次修改的时间戳以及用户值,这意味着启用此功能会增加状态存储的消耗。
      堆状态后端存储一个额外的Java对象,其中包含对用户状态对象的引用和内存中的原始long值。
      RocksDB状态后端为每个存储值,列表条目或映射条目添加8个字节。
    • 目前仅支持参考处理时间的TTL
    • 尝试恢复先前未配置TTL的状态,使用启用TTL的描述符(反之亦然)将导致兼容性失败和StateMigrationException
    • TTL配置不是检查点或保存点的一部分,而是Flink如何在当前运行的作业中处理它的方式。
    • 仅当用户值序列化程序可以处理空值时,具有TTL的映射状态当前才支持空用户值。如果序列化程序不支持空值,
      它可以用NullableSerializer包装,代价是序列化形式的额外字节

    清除过期状态

    当前,仅在过期值独立读取时才可以移除,比如调用ValueState.value()

    注意:这意味着默认情况下,如果未读取过期状态,则不会将其删除,从而可能导致状态不断增长。这可能在将来的版本中发生变化

    此外,您可以在获取完整状态快照时激活清理,这将减小其大小。在当前实现下不清除本地状态,但是在从先前快照恢复的情况下它
    不会包括已移除的过期状态。它可以在StateTtlConfig中配置

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;
    
    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(1))
        .cleanupFullSnapshot()
        .build();
    

    该操作不支持在RocksDB状态后端中增量的checkpointing,更多的在后台自动清理过期状态的策略将在未来加入

    使用托管运算符状态

    要使用托管操作符状态,有状态函数可以实现更通用的CheckpointedFunction接口,或者ListCheckpointed <T extends Serializable>接口

    CheckpointedFunction

    CheckpointedFunction 接口提供对具有不同重新分发方案的非键控状态的访问,它需要实现以下两个方法

    void snapshotState(FunctionSnapshotContext context) throws Exception;
    
    void initializeState(FunctionInitializationContext context) throws Exception;
    

    每当必须执行检查点时,都会调用snapshotState()。每次初始化用户定义的函数时,都会调用对应的initializeState(),即首次初始化函数时,或者当函数实际从早期检查点恢复时。鉴于此,initializeState()不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑的位置

    目前,支持列表样式的托管操作符状态。该状态应该是一个可序列化对象的列表,彼此独立,因此有资格在重新缩放时重新分配。换句话说,这些对象是可以重新分配非键控状态的最精细的粒度。根据状态访问方法,定义了以下重新分发方案

    • Even-split redistribution: 每个运算符都返回一个状态元素列表。整个状态在逻辑上是所有列表的串联。在恢复/重新分发时,列表被平均分成与并行运算符一样多的子列表。每个运算符都获得一个子列表,该子列表可以为空,或包含一个或多个元素。例如,如果使用并行性1,运算符的检查点状态包含元素element1和element2,当将并行性增加到2时,element1可能最终在运算符实例0中,而element2将转到运算符实例1
    • Union redistribution: 每个运算符都返回一个状态元素列表。整个状态在逻辑上是所有列表的串联。在恢复/重新分配时,每个运算符都会获得完整的状态元素列表

    下面是一个有状态SinkFunction的示例,它使用CheckpointedFunction缓冲元素,然后再将它们发送到外部世界。它演示了基本的 even-split再分配列表状态

    public class BufferingSink
            implements SinkFunction<Tuple2<String, Integer>>,
                       CheckpointedFunction {
    
        private final int threshold;
    
        private transient ListState<Tuple2<String, Integer>> checkpointedState;
    
        private List<Tuple2<String, Integer>> bufferedElements;
    
        public BufferingSink(int threshold) {
            this.threshold = threshold;
            this.bufferedElements = new ArrayList<>();
        }
    
        @Override
        public void invoke(Tuple2<String, Integer> value) throws Exception {
            bufferedElements.add(value);
            if (bufferedElements.size() == threshold) {
                for (Tuple2<String, Integer> element: bufferedElements) {
                    // send it to the sink
                }
                bufferedElements.clear();
            }
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            checkpointedState.clear();
            for (Tuple2<String, Integer> element : bufferedElements) {
                checkpointedState.add(element);
            }
        }
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Tuple2<String, Integer>> descriptor =
                new ListStateDescriptor<>(
                    "buffered-elements",
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
    
            checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    
            if (context.isRestored()) {
                for (Tuple2<String, Integer> element : checkpointedState.get()) {
                    bufferedElements.add(element);
                }
            }
        }
    }
    

    initializeState方法将FunctionInitializationContext作为参数。这用于初始化非键控状态“容器”。
    这些是ListState类型的容器,其中非键控状态对象将在检查点存储

    注意状态是如何初始化的,类似于键控状态,StateDescriptor包含状态名称和有关状态所持有的值类型的信息

    ListStateDescriptor<Tuple2<String, Integer>> descriptor =
        new ListStateDescriptor<>(
            "buffered-elements",
            TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
    
    checkpointedState = context.getOperatorStateStore().getListState(descriptor);
    

    状态访问方法的命名约定包含其重新分发模式,后跟其状态结构.例如,要在还原时使用联合重新分发方案的列表状态,请使用getUnionListState(descriptor)访问该状态。如果方法名称不包含重新分发模式,例如getListState(描述符),它只是意味着将使用基本的even-split再分配方案

    在初始化容器之后,我们使用上下文的isRestored()方法来检查我们是否在失败后恢复。如果这是真的,即我们正在恢复,则应用恢复逻辑

    如修改后的BufferingSink的代码所示,在状态初始化期间恢复的ListState保存在类变量中以供将来在snapshotState()中使用。在那里,ListState被清除前一个检查点包含的所有对象,然后填充我们想要检查点的新对象。

    作为旁注,键控状态也可以在initializeState()方法中初始化。这可以使用FunctionInitializationContext提供的方式完成

    ListCheckpointed

    ListCheckpointed接口是CheckpointedFunction的一个更有限的变体,它仅支持在恢复时具有even-split再分配方案的列表样式状态。
    它还需要实现两种方法

    List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
    
    void restoreState(List<T> state) throws Exception;
    

    在snapshotState()上,运算符应该返回检查点的对象列表,restoreState必须在恢复时处理这样的列表。如果状态不可重新分区,
    则始终可以在snapshotState()中返回Collections.singletonList(MY_STATE)

    有状态源函数


    与其他运算符相比,有状态的源需要更多的关注。为了使状态和输出集合的更新成为原子(在故障/恢复时exactly-once所需),
    用户需要从源的上下文获取锁定。

    public static class CounterSource
            extends RichParallelSourceFunction<Long>
            implements ListCheckpointed<Long> {
    
        /**  current offset for exactly once semantics */
        private Long offset;
    
        /** flag for job cancellation */
        private volatile boolean isRunning = true;
    
        @Override
        public void run(SourceContext<Long> ctx) {
            final Object lock = ctx.getCheckpointLock();
    
            while (isRunning) {
                // output and state update are atomic
                synchronized (lock) {
                    ctx.collect(offset);
                    offset += 1;
                }
            }
        }
    
        @Override
        public void cancel() {
            isRunning = false;
        }
    
        @Override
        public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
            return Collections.singletonList(offset);
        }
    
        @Override
        public void restoreState(List<Long> state) {
            for (Long s : state)
                offset = s;
        }
    }
    

    当Flink完全确认检查点与外界通信时,某些运算符可能需要这些信息,在这种情况下,
    请参阅org.apache.flink.runtime.state.CheckpointListener接口

    相关文章

      网友评论

          本文标题:译:Flink---状态

          本文链接:https://www.haomeiwen.com/subject/tsaleqtx.html