1、键值分区有状态算子
- 只能由作用在KeyedStream上面的函数使用
- 状态原语定义单个键值对应的状态结构
ValueState[T]
用于保存类型为T的单个值
ListState[T]
用于保存类型为T的元素列表,可以调用add、addAll更新元素;可以用get访问元素;使用update更新整个列表
MapState[K, V]
用于保存一组键值映射,有get put remove contains等方法
ReducingState[T]
add方法会直接返回一个ReduceFunction聚合后的值
AggregatingState[I, O]
get方法会直接计算最终结果并返回,也会记录聚合值
所有的状态都支持.clear()清空状态
- 比如下面这个例子可以一直保存计算的状态,状态的初始化需要使用富函数,在open里进行初始化
package BaseOperationTest;
import Bean.SensorReading;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class KeyStateTest {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度 = 1
env.setParallelism(1);
// 从本地socket读取数据
DataStream<String> inputStream = env.readTextFile("/Users/kaiker/Documents/projects/flink_study/src/main/resources/sensor.txt");
// 转换成SensorReading类型
DataStream<SensorReading> dataStream = inputStream.map(line -> {
String[] fields = line.split(",");
return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
});
// 使用自定义map方法,里面使用 我们自定义的Keyed State
DataStream<Integer> resultStream = dataStream
.keyBy(SensorReading::getId)
.map(new MyMapper());
resultStream.print("result");
env.execute();
}
// 自定义map富函数,测试 键控状态
public static class MyMapper extends RichMapFunction<SensorReading,Integer> {
// Exception in thread "main" java.lang.IllegalStateException: The runtime context has not been initialized.
// ValueState<Integer> valueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("my-int", Integer.class));
private ValueState<Integer> valueState;
// 其它类型状态的声明
private ListState<String> myListState;
private MapState<String, Double> myMapState;
private ReducingState<SensorReading> myReducingState;
@Override
public void open(Configuration parameters) throws Exception {
valueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("my-int", Integer.class));
myListState = getRuntimeContext().getListState(new ListStateDescriptor<String>("my-list", String.class));
myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<String, Double>("my-map", String.class, Double.class));
// myReducingState = getRuntimeContext().getReducingState(new ReducingStateDescriptor<SensorReading>())
}
// 这里就简单的统计每个 传感器的 信息数量
@Override
public Integer map(SensorReading value) throws Exception {
// 其它状态API调用
// list state
for(String str: myListState.get()){
System.out.println(value.getId() + " " +str);
}
myListState.add("hello");
// map state
// myMapState.get("1");
// myMapState.put("2", 12.3);
// myMapState.remove("2");
// reducing state
// myReducingState.add(value);
// myMapState.clear();
Integer count = valueState.value();
// 第一次获取是null,需要判断
count = count==null?0:count;
++count;
valueState.update(count);
return count;
}
}
}
2、全局的状态
ListCheckpointed
可以实现列表型的算子状态,需要实现ListCheckpointed接口
- snapshotState会在触发有状态函数生成检查点时调用,接受检查点编号和JobManager开始创建检查点的机器时间timestamp
- restoreState()方法会在初始化函数状态调用,接收一个状态对象列表,基于这些对象恢复算子状态进行恢复
public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {
// 定义一个本地变量,作为算子状态
private Integer count = 0;
@Override
public Integer map(SensorReading value) throws Exception {
count++;
return count;
}
@Override
public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
return Collections.singletonList(count);
}
@Override
public void restoreState(List<Integer> state) throws Exception {
for (Integer num : state) {
count += num;
}
}
}
CheckpointedFunction
用于制定有状态函数的最底层借口。
- initializeState() 触发时机是应用启动或由于故障而重启任务,传入一个FunctionInitializationContext对象,它可以访问OperatorStateStore和KeyedStateStore对象,这两个状态存储对象能够使用Flink运行时来注册函数状态并返回状态对象。
- snapshotState() 方法会在生成检查点之前调用,接收FunctionSnapshotContext对象,可以获取检查点编号以及JobManager初始化检查点时间戳。该方法是确保检查点开始之前,所有状态对象都已更新完毕。
3、有状态应用的性能及鲁棒性
选择状态后端
- MemoryStateBackend将状态以常规对象的方式存储在TaskManager进程的JVM堆里,生成检查点时,会将状态发送至JobManager并保存到它的堆内存中。建议近用于开发和测试。
- FsStateBackend也将本地状态保存在TaskManager的JVM堆内。但创建检查点时状态会写入远程持久化文件系统。
- RocksDBStateBackend会把全部状态存到本地RocksDB实例中。RocksDB是一个嵌入式键值存储,可以将数据保存到本地磁盘上。
选择状态原语
状态原语就是ValueState、ListState、MapState之类。不同的状态后端处理状态原语的时候可能会有序列化开销。
可查询式状态
任何键值分区的状态都可以作为可查询式状态暴露给外部应用,QueryableStateServer用于处理客户端代理的请求,运行在TM上,会根据查询的状态返回给客户端代理。QueryableStateClientProxy用于接收并相应客户端请求。
可查询式状态服务架构
- 可以在创建一个状态之后,调用.setQueryable使得这个状态可查询。
- 也可以使用asQueryableState()创建,对于每个收到的记录可查询式状态的数据汇都会用它去更新状态值。
dataStream.map(new MyCountMapper())
.keyBy(.1)
.timeWindow(Time.seconds(10))
.max(1)
.asQueryableState("maxTemperature")
网友评论