有状态计算
无状态计算实现的复杂度相对较低,实现起来较容易,但是无法完成提交的比较复杂的业务场景。
- 用户想要实现cep(复杂事件处理),获取符合某一特定事件规则的事件,状态计算就可以将接入事件进行存储,然后等待符合规则的事件触发;
- 用户想要按分钟,小时,天进行聚合运算,求取当前最大值,均值等聚合指标,这就需要利用状态维护当前计算过程中产生的结果,例如事件总数,总和,以及最大,最小值;
- 用户想在stream上实现机器学习的模型训练,状态计算可以帮助用户维护当前版本模型使用的参数;
- 用户想要使用历史数据进行计算,状态计算可以帮助用户对数据进行缓存,使用户可以直接从状态中获取相应的历史数据。
引入状态计算这一特性能够极大地提升流式计算过程中数据的使用范围以及指标计算的复杂度,而不再借助类似于Redis外部缓存存储中间结果数据,这种方式需要频繁地和外部系统交互,并造成大量系统性能开销。
flink 状态类型及应用
状态类型
- keyed state
表示和key相关的一种state,只能用于keyedstream类型数据集对应的Functions和Operators之上。keyed State是 Operator State的特例,区别在于Keyed State事先按照key对数据集进行分区,每个Key State 仅对应一个Operator和key的组合。keyed state可以通过key groups进行管理,主要用于当前算子并行度发生变化时,自动重新分布keyed state数据。在系统运行过程中,一个keyed算子实例可能运行一个或者多个keygroups的keys - Operator State
与Keyed State 不同的是,Operator State只和并行的算子实例绑定,和数据元素中的key无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator支持当前算子实例并行度发生变化时自动重新分配状态数据。 - 状态数据存储的形式
1.托管状态(managed state),由flink runtime中控制和管理状态数据,并将状态数据转换成为内存hash tables或rocksdb的对象存储,然后将这些状态数据通过内部的接口持久化到checkpoints中。
2.原生状态(Raw State)形式,由算子自己再反序列化出状态的数据结构。
flink推荐用户使用Managed State管理状态数据,主要原因是Managed State能够更好地支持状态数据的重平衡以及更加完善的内存管理。
managed keyed state
- ValueState[T]:与key对应单个值得状态,例如统计user_id对应的交易次数,每次用户交易都会在count状态值上进行更新。ValueState对应的更新方法是update(T),取值方法是T value();
- ListState[T]:与key对应元素列表的状态,状态中存放元素的list列表例如定义listState存储用户经常访问的ip地址。在listState中添加元素使用add(T)或者addAll(List[T])两个方法,获取元素使用Iterable<T> get()方法,更新元素使用update(List[T])方法;
- ReducingState[T]:定义与key相关的数据元素单个聚合值得状态,用于存储经过指定ReduceFunction计算之后的指标,因此,ReducingState需要指定ReduceFunction完成状态数据的聚合。有add(T)和get()方法。
- AggregatingState[IN,OUT]:定义与key对应的元素单个聚合值得状态,用于维护数据元素经过指定AggregateFunction计算之后的指标。和ReducingState相比,Agg输入类型和输出类型不一定是相同的,但ReducingState输入和输出必须是相同类型的。in是add(in)方法,获取元素使用OUT get()方法。
- MapState[UK,UV]:定义与key对应键值对额状态,用于维护具有kv结构的状态数据,mapstate添加元素使用put(uk,uv)或者putALL(Map[uk,uv])方法,获取元素使用get(uk)方法。和hashMap接口类似,MapState也可以通过entries(),keys(),values()获取对应的keys或values的集合
Flink中需要通过创建StateDescriptor来获取相应state的操作类。StateDescriptor主要定义状态的名称,状态中数据的类型参数信息以及状态自定义函数。每种Managed Keyed有相应的StateDescriptor,例如valuestateDesciptor
(1) Stateful Function定义
通过完整的实例来说明如何在RichFlatmapFunction中使用ValueState,完成对接入数据最小值的回去。通过定义leastValueState存储系统中指标的最小值,并在每次计算时和当前接入的数据对比,如果当前元素的最小的数值小于状态中的最小值,则更新状态。然后在输出操作中增加对应指标的最小值作为新的数据集的字段。
val env = StreamExecutionEnvironment.getExecutionEnvironment
//创建元素数据集
val inputStream = env.fromElements((2,21L),(4,1L),(5,4L))
inputStream.keyBy(_._1).flatMap{
//定义和创建RichFlatMapFunction,第一个参数为输入数据类型,第二个参数为输出数据类型
new RichFlatMapFuntion[(Int,Long),(Int,Long,Long)]{
private var leastValueState = _
override def open(parameters:Configuration):Unit = {
//创建ValueStateDescriptor,定义状态名称为leastValue,并指定数据类型
val leastValueStateDescriptor = new ValueStateDescriptor[Long][]("leastValue",classOf[Long])
//通过getRuntionContext.getState获取state
leastValueState = getRuntimeContext.getState(leastValueStateDescriptor)
}
override def flatMap(t:(Int,Long),collector:Collector[(Int,Long,Long)]):Unit = {
//通过value方法从leastValueState中获取最小值
val leastValue = leastValueState.value()
//如果当前指标大于最小值,则直接输出数据元素和最小值
if(t._2>leastValue){
collector.collect((t._1,t._2,leastValue))
}else{
//如果当前指标小于最小值,则更新状态的最小值
leastValueState.update(t._2)
//将当前数据中的指标作为最小值输出
collector.collect((t._1,t._2,t._2))
}}}}
(2) State生命周期
对于任何类型Keyed State都可以设定状态的生命周期(TTL),以确保能够在规定时间内及时地清理状态数据。状态生命周期功能可以通过StateTtlConfig配置,然后将StateTtlConfig配置传入StateDescriptor中的enableTimeToLive方法中即可。
(3)Scala版本直接使用状态
在KeyedStream接口中提供了filterWithState ,mapWithState,flatmapWithState三种方法来定义和操作状态数据。
网友评论