Flink 1.7
Google翻译
键控状态和操作状态
Flink中有两种基本的状态类型:键控状态和运算符状态
键控状态
键控状态总是与键相关并且只能在键控流的函数和运算符中使用
你可以将Keyed State视为已分区或分片的操作符状态,每个键只有一个状态分区。每个键控状态在逻辑上绑定到<parallel-operator-instance,key>的唯一复合,并且由于每个键“属于”键控运算符的一个并行实例,我们可以将其简单地视为<operator,key >
键控状态进一步组织成所谓的键控组。键控组是Flink可以重新分配密钥状态的原子单元;键控组与定义的最大并行度完全一样多。在执行期间,键控运算符的每个并行实例都使用一个或多个键控组的键。
运算符状态
使用运算符状态(或非键控状态), 每个运算符状态绑定到一个运算符实例。Kafka连接器在Flink中是一个使用运算符状态的好例子。任一个Kafka消费者并行实例都包含一个主题与偏移映射作为运算符状态。
运算符状态接口支持当并行度改变时通过并行操作符重新分配实例。进行重新分配有多种不同的方案
原生及托管状态
键控状态及运算符状态以两种方式存在:托管状态及原生状态
托管状态由Flink运行时控制的数据结构表示,例如内部的hash表,或者RocksDB.例如“ValueState”,"ListState"等。Flink将state编码并写入checkpoints中
原生状态是把运算符保存在自己数据结构中的状态。当状态检查时,它们仅将字节序列写入到checkpoint。Flink不感知状态的数据结构,它只能看到原生的字节数据
所有的datastream函数都可以使用托管状态,但是原生状态仅能在实现运算符时使用。建议使用托管状态而非原生状态,因为在托管状态下,当并行度改变时Flink可以自动重新分配状态,也可以更好的做内存管理。
注意:如果你的托管状态需要自定义序列化逻辑,请查看相关引导以保证未来的兼容性。Flink的默认序列化不需要特殊对待
使用托管监控状态
监控托管转台接口提供不同的状态类型的访问,这些状态都限定为当前输入元素的键。这意味着这些状态类型只能在监控流中使用,监控流使用stream.keyBy()来创建
现在,我们首先来看一下可用的不同状态类型,之后我们可以看到如何在程序中使用它们。可用的原生状态:
- ValueState<T>: 它保留了可以更新和检索的值(如上所述,作用于输入元素的键的范围,因此操作看到的每个键可能有一个值)使用update(T)更新值,使用T value()函数检索
- ListState<T>: 这保留了元素列表。您可以追加元素并在所有当前存储的元素上检索Iterable。使用add(T)或addAll(List <T>)添加元素,可以使用Iterable <T> get()检索Iterable。您还可以使用update覆盖现有列表(List <T>)
- ReducingState<T>: 这保留了一个值,表示添加到状态的所有值的聚合。该接口类似于ListState,但使用add(T)添加的元素使用指定的ReduceFunction缩减为聚合
- AggregatingState<IN, OUT>: 这保留了一个值,表示添加到状态的所有值的聚合。与ReducingState相反,聚合类型可能与添加到状态的元素类型不同。接口与ListState相同,但使用add(IN)添加的元素使用指定的AggregateFunction进行聚合
- FoldingState<T, ACC>: 这保留了一个值,表示添加到状态的所有值的聚合。与ReducingState相反,聚合类型可能与添加到状态的元素类型不同。该接口类似于ListState,但使用add(T)添加的元素使用指定的FoldFunction折叠为聚合
- MapState<UK, UV>: 这保留了映射列表。您可以将键值对放入状态,并在所有当前存储的映射上检索Iterable。使用put(UK,UV)或putAll(Map <UK,UV>)添加映射。可以使用get(UK)检索与用户密钥关联的值。可以分别使用entries(),keys()和values()来检索映射,键和值的可迭代视图
所有类型的状态还有一个方法clear(),它清除当前活动键的状态,即输入元素的键
注意: FoldingState和FoldingStateDescriptor在Flink 1.4中废弃了并且将在未来完全删除。请使用AggregatingState和AggregatingStateDescriptor
重要的是要记住,这些状态对象仅用于与状态接口。状态不一定存储在内部,但可能驻留在磁盘或其他位置。要记住的第二件事是,从状态获得的值取决于input元素的键。因此,如果涉及的键不同,则在一次调用用户函数时获得的值可能与另一次调用中的值不同
要获取状态句柄,您必须创建StateDescriptor。这保存了状态的名称(正如我们稍后将看到的,您可以创建多个状态,并且它们必须具有唯一的名称以便您可以引用它们),状态所持有的值的类型,并且可能是用户 - 指定的函数,例如ReduceFunction。根据要检索的状态类型,创建ValueStateDescriptor,ListStateDescriptor,ReducingStateDescriptor,FoldingStateDescriptor或MapStateDescriptor
使用RuntimeContext访问状态,因此只能在丰富的函数中使用。请参阅此处了解相关信息,但我们很快也会看到一个示例。 RichFunction中可用的RuntimeContext具有这些访问状态的方法:
- ValueState<T> getState(ValueStateDescriptor<T>)
- ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
- ListState<T> getListState(ListStateDescriptor<T>)
- AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
- FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
- MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
这是一个示例FlatMapFunction,显示所有部件如何组合在一起:
public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
/**
* The ValueState handle. The first field is the count, the second field a running sum.
*/
private transient ValueState<Tuple2<Long, Long>> sum;
@Override
public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception {
// access the state value
Tuple2<Long, Long> currentSum = sum.value();
// update the count
currentSum.f0 += 1;
// add the second field of the input value
currentSum.f1 += input.f1;
// update the state
sum.update(currentSum);
// if the count reaches 2, emit the average and clear the state
if (currentSum.f0 >= 2) {
out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0));
sum.clear();
}
}
@Override
public void open(Configuration config) {
ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
new ValueStateDescriptor<>(
"average", // the state name
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), // type information
Tuple2.of(0L, 0L)); // default value of the state, if nothing was set
sum = getRuntimeContext().getState(descriptor);
}
}
// this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L))
.keyBy(0)
.flatMap(new CountWindowAverage())
.print();
// the printed output will be (1,4) and (1,5)
这个例子实现了一个穷人的计数窗口。我们通过第一个字段键入元组(在示例中都具有相同的键1)。该函数将计数和运行总和存储在ValueState中。一旦计数达到2,它将发出平均值并清除状态,以便我们从0开始。注意,如果我们在第一个字段中具有不同值的元组,这将为每个不同的输入键保持不同的状态值
TTL状态
可以将生存时间(TTL)分配给任何类型的键控状态。如果配置了TTL并且状态值已过期,则将尽力清除存储的值,这将在下面更详细地讨论。
所有状态集合类型都支持每个条目的TTL。这意味着列表元素和映射条目将独立过期。
为了使用状态TTL,必须首先构建StateTtlConfig配置对象。然后,可以通过传递配置在任何状态描述符中启用TTL功能
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("text state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
它的配置项有一下几点需要考虑:
第一个参数newBuilder是强制性的,指定value有效期
更新类型配置为TTL刷新的时机(默认是OnCreateAndWrite)
- StateTtlConfig.UpdateType.OnCreateAndWrite - only on creation and write access
- StateTtlConfig.UpdateType.OnReadAndWrite - also on read access
状态可见性配置如果过期值尚未清除,是否在读取访问时返回它(默认情况下NeverReturnExpired)
- StateTtlConfig.StateVisibility.NeverReturnExpired - 从不返回
- StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp - 没有被清除时返回
在NeverReturnExpired的情况下,过期状态表现得好像它不再存在,即使它仍然必须被删除。
该选项对于在TTL之后必须严格用于读取访问的数据的用例是有用的,例如,应用程序使用隐私敏感数据
另一个选项ReturnExpiredIfNotCleanedUp允许返回过期状态当它没有被清除时
注意:
- 状态后端存储上次修改的时间戳以及用户值,这意味着启用此功能会增加状态存储的消耗。
堆状态后端存储一个额外的Java对象,其中包含对用户状态对象的引用和内存中的原始long值。
RocksDB状态后端为每个存储值,列表条目或映射条目添加8个字节。 - 目前仅支持参考处理时间的TTL
- 尝试恢复先前未配置TTL的状态,使用启用TTL的描述符(反之亦然)将导致兼容性失败和StateMigrationException
- TTL配置不是检查点或保存点的一部分,而是Flink如何在当前运行的作业中处理它的方式。
- 仅当用户值序列化程序可以处理空值时,具有TTL的映射状态当前才支持空用户值。如果序列化程序不支持空值,
它可以用NullableSerializer包装,代价是序列化形式的额外字节
清除过期状态
当前,仅在过期值独立读取时才可以移除,比如调用ValueState.value()
注意:这意味着默认情况下,如果未读取过期状态,则不会将其删除,从而可能导致状态不断增长。这可能在将来的版本中发生变化
此外,您可以在获取完整状态快照时激活清理,这将减小其大小。在当前实现下不清除本地状态,但是在从先前快照恢复的情况下它
不会包括已移除的过期状态。它可以在StateTtlConfig中配置
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot()
.build();
该操作不支持在RocksDB状态后端中增量的checkpointing,更多的在后台自动清理过期状态的策略将在未来加入
使用托管运算符状态
要使用托管操作符状态,有状态函数可以实现更通用的CheckpointedFunction接口,或者ListCheckpointed <T extends Serializable>接口
CheckpointedFunction
CheckpointedFunction 接口提供对具有不同重新分发方案的非键控状态的访问,它需要实现以下两个方法
void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;
每当必须执行检查点时,都会调用snapshotState()。每次初始化用户定义的函数时,都会调用对应的initializeState(),即首次初始化函数时,或者当函数实际从早期检查点恢复时。鉴于此,initializeState()不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑的位置
目前,支持列表样式的托管操作符状态。该状态应该是一个可序列化对象的列表,彼此独立,因此有资格在重新缩放时重新分配。换句话说,这些对象是可以重新分配非键控状态的最精细的粒度。根据状态访问方法,定义了以下重新分发方案
- Even-split redistribution: 每个运算符都返回一个状态元素列表。整个状态在逻辑上是所有列表的串联。在恢复/重新分发时,列表被平均分成与并行运算符一样多的子列表。每个运算符都获得一个子列表,该子列表可以为空,或包含一个或多个元素。例如,如果使用并行性1,运算符的检查点状态包含元素element1和element2,当将并行性增加到2时,element1可能最终在运算符实例0中,而element2将转到运算符实例1
- Union redistribution: 每个运算符都返回一个状态元素列表。整个状态在逻辑上是所有列表的串联。在恢复/重新分配时,每个运算符都会获得完整的状态元素列表
下面是一个有状态SinkFunction的示例,它使用CheckpointedFunction缓冲元素,然后再将它们发送到外部世界。它演示了基本的 even-split再分配列表状态
public class BufferingSink
implements SinkFunction<Tuple2<String, Integer>>,
CheckpointedFunction {
private final int threshold;
private transient ListState<Tuple2<String, Integer>> checkpointedState;
private List<Tuple2<String, Integer>> bufferedElements;
public BufferingSink(int threshold) {
this.threshold = threshold;
this.bufferedElements = new ArrayList<>();
}
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
bufferedElements.add(value);
if (bufferedElements.size() == threshold) {
for (Tuple2<String, Integer> element: bufferedElements) {
// send it to the sink
}
bufferedElements.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
checkpointedState.clear();
for (Tuple2<String, Integer> element : bufferedElements) {
checkpointedState.add(element);
}
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
if (context.isRestored()) {
for (Tuple2<String, Integer> element : checkpointedState.get()) {
bufferedElements.add(element);
}
}
}
}
initializeState方法将FunctionInitializationContext作为参数。这用于初始化非键控状态“容器”。
这些是ListState类型的容器,其中非键控状态对象将在检查点存储
注意状态是如何初始化的,类似于键控状态,StateDescriptor包含状态名称和有关状态所持有的值类型的信息
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
状态访问方法的命名约定包含其重新分发模式,后跟其状态结构.例如,要在还原时使用联合重新分发方案的列表状态,请使用getUnionListState(descriptor)访问该状态。如果方法名称不包含重新分发模式,例如getListState(描述符),它只是意味着将使用基本的even-split再分配方案
在初始化容器之后,我们使用上下文的isRestored()方法来检查我们是否在失败后恢复。如果这是真的,即我们正在恢复,则应用恢复逻辑
如修改后的BufferingSink的代码所示,在状态初始化期间恢复的ListState保存在类变量中以供将来在snapshotState()中使用。在那里,ListState被清除前一个检查点包含的所有对象,然后填充我们想要检查点的新对象。
作为旁注,键控状态也可以在initializeState()方法中初始化。这可以使用FunctionInitializationContext提供的方式完成
ListCheckpointed
ListCheckpointed接口是CheckpointedFunction的一个更有限的变体,它仅支持在恢复时具有even-split再分配方案的列表样式状态。
它还需要实现两种方法
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;
在snapshotState()上,运算符应该返回检查点的对象列表,restoreState必须在恢复时处理这样的列表。如果状态不可重新分区,
则始终可以在snapshotState()中返回Collections.singletonList(MY_STATE)
有状态源函数
与其他运算符相比,有状态的源需要更多的关注。为了使状态和输出集合的更新成为原子(在故障/恢复时exactly-once所需),
用户需要从源的上下文获取锁定。
public static class CounterSource
extends RichParallelSourceFunction<Long>
implements ListCheckpointed<Long> {
/** current offset for exactly once semantics */
private Long offset;
/** flag for job cancellation */
private volatile boolean isRunning = true;
@Override
public void run(SourceContext<Long> ctx) {
final Object lock = ctx.getCheckpointLock();
while (isRunning) {
// output and state update are atomic
synchronized (lock) {
ctx.collect(offset);
offset += 1;
}
}
}
@Override
public void cancel() {
isRunning = false;
}
@Override
public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) {
return Collections.singletonList(offset);
}
@Override
public void restoreState(List<Long> state) {
for (Long s : state)
offset = s;
}
}
当Flink完全确认检查点与外界通信时,某些运算符可能需要这些信息,在这种情况下,
请参阅org.apache.flink.runtime.state.CheckpointListener接口
网友评论