无状态算子:map/flatmap/filter等简单算子
有状态算子:window,reduce等操作,需要依赖之前的计算结果:newState = oldState + anEvent
State与特定算子关联在一起,算子需要注册状态
State分类:
1.Operator State:状态作用于当前算子任务,并行子任务之间state独立。
非keyed的state snapshots很少用到,如需使用,需要在算子实现类上实现ListCheckpointed(旧版本)或者CheckpointedFunction(新版本)
2.Keyed State:即分区状态,根据数据中的key来维护访问,相同的key会到同一个tasks子任务分区中,每个key一个状态。
(ValueState/ListState/MapState/ReducingState AggState)
Keyed State 只能在 RichFuction 中使用,RichFuction 与普通、传统的 Function 相比,最大的不同就是它有自己的生命周期。
- 声明状态:
ValueStateDescriptor<Boolean> flagDescriptor=new ValueStateDescriptor<Boolean>(
"flag",
Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
- 读取/更新:
flagState.value()
flagState.update(true);
State使用示例:
StreamExecutionEnvironment sEnv = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<Tuple2<String, Long>> ds = sEnv.fromCollection(Arrays.asList(
new Tuple2("a", 1623588192345L),
new Tuple2("b", 1623588193345L),
new Tuple2("a", 1623588194345L),
new Tuple2("b", 1623588195345L),
new Tuple2("a", 1623588196345L),
new Tuple2("b", 1623588197345L),
new Tuple2("a", 1623588198345L),
new Tuple2("b", 1623588199345L)
));
ds.keyBy(value -> value.f0).map(new RichMapFunction<Tuple2<String, Long>, Long>() {
//1. 声明状态
private transient ValueState<Long> count;
//2.初始化状态
@Override
public void open(Configuration parameters) throws Exception {
count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
}
@Override
public Long map(Tuple2<String, Long> value) throws Exception {
Long currentCount = count.value()==null?0L: count.value();
currentCount++;
//3.赋值
count.update(currentCount);
return currentCount;
}
}).print();
sEnv.execute("1");
状态后端(State Back):
每个任务都会本地维护状态,内存中。StateBack负责本地的状态存储访问和维护,以及快照检查点(checkpoint)状态写入远程存储。
- MemoryStateBackend:本地状存储在TaskManager的JVM堆上,而checkpoint存储在JobManager内存中。快但是不稳定。
- FsStateBackend:本地状态存储在TaskManager的JVM堆上,checkpoint存储在远程文件系统HDFS上。
- RocksDBStateBackend:将所有状态序列化存入本地的RocksDB中存储,本地保留缓存。
网友评论