回顾
在之前的学习中我们了解到,flink 作为低延时的流式数据处理框架,本身是有状态的。状态 state 是为了保存一些操作符 operator 的中间结果,同时,通过状态可以保证精确一致语义。
State 分类
State 从其实现方式可分为:Keyed State 和 Operator State,从管理方式可分为:Raw State 和 Managed State。
Keyed State
- Keyed State 通常是与 keys 相关的,其函数和操作只能在 KeyedStream 中使用。事实上,keyed 与 hive 中的分区极其类似,每一个 key 只能属于某一个 keyed state。
DataStream<Tuple2<String, Integer>> counts =
text.flatMap(new Tokenizer())
.keyBy(0) // 使用 keyby 方法进行划分,不同的 task 之间不会出现相同的 key
.sum(1);
keyby.png
- 如上,有3个并行度的 WordCount 任务,在 keyby 之后,相同的 key 会被划分到相同的 task 中进行处理。
Operator State
- non-keyed state,每一个 operator state 都仅与一个 operator 的实例绑定。
- 常见的 operator state 是 source state,例如记录当前 source 的 offset
Managed State
Managed State 是由 Flink Runtime 中管理的 State ,并将状态数据转换为 hashtable 或者 RocksDB 的对象进行存储。
mysu_bj.png- ValueState:与 key 对应单个的值,在我们统计流式数据中的单词个数时,事实上,状态就是以 ValueState 存在,每次在状态值上进行更新。在其内部,调用 update(T value) 方法进行状态值的更新。
public interface ValueState<T> extends State {
/**
* Returns the current value for the state. When the state is not
* partitioned the returned value is the same for all inputs in a given
* operator instance. If state partitioning is applied, the value returned
* depends on the current operator input, as the operator maintains an
* independent state for each partition.
*
* <p>If you didn't specify a default value when creating the {@link ValueStateDescriptor}
* this will return {@code null} when to value was previously set using {@link #update(Object)}.
*
* @return The state value corresponding to the current input.
*
* @throws IOException Thrown if the system cannot access the state.
*/
T value() throws IOException;
/**
* Updates the operator state accessible by {@link #value()} to the given
* value. The next time {@link #value()} is called (for the same state
* partition) the returned state will represent the updated value. When a
* partitioned state is updated with null, the state for the current key
* will be removed and the default value is returned on the next access.
*
* @param value The new value for the state.
*
* @throws IOException Thrown if the system cannot access the state.
*/
void update(T value) throws IOException;
}
- ListState:与 key 对应的元素的列表的状态 list,内部定义 update(T)和 addAll(T)两个方法
public interface ListState<T> extends MergingState<T, Iterable<T>> {
/**
* Updates the operator state accessible by {@link #get()} by updating existing values to
* to the given list of values. The next time {@link #get()} is called (for the same state
* partition) the returned state will represent the updated list.
*
* <p>If null or an empty list is passed in, the state value will be null.
*
* @param values The new values for the state.
*
* @throws Exception The method may forward exception thrown internally (by I/O or functions).
*/
void update(List<T> values) throws Exception;
/**
* Updates the operator state accessible by {@link #get()} by adding the given values
* to existing list of values. The next time {@link #get()} is called (for the same state
* partition) the returned state will represent the updated list.
*
* <p>If null or an empty list is passed in, the state value remains unchanged.
*
* @param values The new values to be added to the state.
*
* @throws Exception The method may forward exception thrown internally (by I/O or functions).
*/
void addAll(List<T> values) throws Exception;
}
- ReducingState:定义与 key 相关的数据元素单个聚合的状态值。
- AggregatingState:定义与 key 相关的数据元素单个聚合的状态值。
Raw State
Raw State 是由算子本身进行管理的 State ,此时状态都是以字节数组的形式保存到 Checkpoint 中,Flink 并不清楚状态数据的内部结构,每次状态的写入和读取都需要算子进行序列化和反序列化。
状态管理
Flink 中状态管理有三种方案:MemoryStateBackend、FSStateBackend、RocksDBStateBackend。
MemoryStateBackend
- MemoryStateBackend 基于内存的状态管理器,它通常将状态数据存储在 JVM 内存中,包括 Key/State 及窗口中缓存的数据。但是由于内存本身的限制,基于内存的状态管理会造成内存溢出。因此,这种状态管理机制通常在本地测试中使用,生产中禁止使用内存状态管理器。
env.setStateBackend(new MemoryStateBackend());
FSStateBackend
- FSStateBackend 基于文件的状态管理,和内存管理机制不同,FSStateBackend 通常把状态数据保存在本地文件系统,或者HDFS文件系统中。在初始化时,需要传入文件路径。基于文件的状态管理机制,适用于状态数据很大的数据,此时,如果使用内存状态管理器,很容易就把内存撑爆。通常情况下,为了保证文件状态安全性,会把文件状态保存在 HDFS 中,此时,借助 HDFS 的多副本的策略,保证文件状态不丢失。
env.setStateBackend(new FsStateBackend(""));
// 源码
public FsStateBackend(Path checkpointDataUri) {
this(checkpointDataUri.toUri());
}
RocksDBStateBackend
- RocksDBStateBackend 基于内存和文件系统的状态管理器,这是基于三方的状态管理器。通常,先把状态放在内存中,等到到达一定的大小时,会将状态数据刷到文件中。
env.setStateBackend(new RocksDBStateBackend(""));
总结
状态是 Flink 容错机制的基石,了解 State 的机制,可以更好的管理 Checkpoint,更好的进行失败任务的恢复。
网友评论