1.前言
Flink一直是被成为了有状态的流计算,并且还能够通过对状态的合理的调控与配置,来实现端到端的状态一致性,实现数据的精准一次性传输。这是非常厉害的一个功能,我在前面也说了,Flink是如何完成状态的保存的,也表明了要在这一篇文章当中从头的聊一聊状态编程到底是什么,都有什么类型的状态,在使用时到底该如何去使用。
2.什么是状态
我一直都很不喜欢书本上那种非常教条的表述方式,因为它看起来总让我觉得缺点什么,在我的思维中,如果想要表名一件事情,如果能在说概念之前拿一个现实生活中的例子去带入效果会好很多。
例子:我们可以试想一下,Flink是一个数据计算框架,用来处理流入到它内部的所有数据。那如果把我们自己当作是Flink,而我们每一天的生活当作流入Flink中的数据是不是也可行呢?我们每天经历的不同种生活,可以认为是一条条的数据流,我们需要对它们进行处理才能够完成我们每天的正常生活。接下来我就用这个角色来代入Flink,去讲讲状态对于Flink到底是个什么东西。
比如我今天早晨要去上班,上班的路上需要做地铁。如果我的脑子里面没有到公司的具体路线,我根本不会知道要坐上哪一条现路才能到达公司,这就相当于我无法完成从家到公司的这个程序任务。但是如果我知道这条路线的话,我就可以很顺利的到达我的工位,开始进行当天的工作。这也就表明了我完成了从家到公司的这个程序任务,而我在从家到公司的路上所使用到的这条路线,就是“状态”!!!
所以,在Flink计算的过程中,有一些任务需要依赖一些其他的数据内容才能够计算出正确的结果,这些被依赖的数据实际上就是Flink中的状态。
3.状态的分类
在Flink处理数据的过程中,有的时候需要依赖其他的数据才能够获得结果,有的不需要依赖其他的数据就能获得到计算结果。那这些不需要依赖其他数据就能够得到结果的算子就叫无状态算子,反之则是有状态的算子。
3.1 托管状态和原始状态
上面说的有状态算子和无状态算子只是针对于不同的计算场景而划分的,并没有固定且具体的无状态或有状态算子。但是状态的类型被官方进行了划分,分为托管状态和原始状态,原始状态是需要我们自己定义的,注意这个自己定义相当复杂,无论是状态的存储访问权限、故障恢复之类的东西都需要开发人员自己动手,这需要相当强的功力,我本人肯定是不会的,所以也没有办法讲给大家。
托管状态就是Flink内部为我们实现的、已经给予了一系类完整实现的完整功能。也是我本人经常用到的一种状态类型,在内部还能够按照数据流是否按key进行分区而划分成为了算子状态和键控状态。
3.1.1 算子状态
在Flink计算过程中,会按照设定好的并行度对计算任务进行并行处理,每一个并行度就是一个并行子任务。算子状态所覆盖的范围就是每一个并行子任务,这一个并行度中的所有计算流程都能够访问到相同的状态。
3.1.2 按键分区状态(键控状态)
这种状态类型就是非常常见的一种状态了,当数据被keyby传入的键值选择器处理后,数据会按照不同的键值流入到不同的逻辑分区中,每一个逻辑分区中的键都是一样的,键控状态也是按照这个key来进行维护的,也就是说一个key对应了一组状态,不同key之间的状态是无法读取的。
3.2 键控状态与算子状态
3.2.1 键控状态隔离状态方式
键控状态针对于状态的隔离方式,是通过把当前key与属于这个key中的数据绑定成为一个map形式,这样就能够保证后来的数据可以通过这个map来找到自己所对应的状态内容了。
3.2.2 算子状态隔离状态方式
算子状态与键控状态相比,是一种比较底层的状态,因为所有不需要考虑key的算子一旦使用了状态,都可以被称为算子状态。并且Flink的故障恢复也是靠着状态来完成的。但是由于算子状态应用的场景比较少,所以名气不如键控状态大。在数据处理的过程中,只要是这条数据被发送到了一个并行子任务,被一个算子处理的时候,处理过程中一旦使用状态,那这个状态就会被锁定到这个算子身上。因此,无论key是否一样,只要是被发送到了统一并行子任务,那么所有的数据都能够共享各个算子各自的状态。
4.状态类型
4.1 算子状态类型
算子状态类型支持三种不同的类型结构:ListState、UnionListState、BroadcastState,咱么就按照这三种状态类型分别进行阐述。
4.1.1 ListState
在算子状态当中,会按照不同的并行子任务来维护一个ListState,这个并行任务上的所有状态项都会被放到这个ListState当中,每一个状态项自然就是List中的一个最小元素。如果发生了并行度改变的情况,就会把所有并行度上的每一个状态项都收集起来,形成一个拥有当前任务的所有并行子任务的所有状态的一个大List,然后按照重新规划好的并行度进行轮询分配。
4.1.2 UnionListState
这个状态是ListState的升级版本,它与ListState之间的区别就在于在发生并行度变化的时候,并不是形成一张大List进行轮询分配,而是将这个大List以广播的形式发送给更新并行度之后的所有子任务,让它们按照自己的需要自行拿取和保留。
4.1.3 BroadcastState
广播状态是一个非常有用的状态,它能够将广播流中的数据发送给每一个并行子任务。所以在使用过程中,通过把规则数据按照广播变量的方式提供给每一个并行子任务,来实现符合规则内容的处理。
4.2 键控状态类型
键控状态的类型与算子状态的结构类型要复杂一些,数量上也会多一些,具体可以分为
ValueState、ListState、MapState、reduceingState、AggregateState
4.2.1 ValueState
值状态是算子状态中最基本的一个,根据名称我们能够得知,值状态就是以一个值来设置状态的。举个例子,如果你想得出一个学校每个班级身高超过180cm的学生信息,就可以把这个180cm设置成为值状态,然后通过计算得出想要的结果。
4.2.2 ListState
列表状态实际上就是多个值状态组合而成。举个例子:如果你想得出身高为180com、170cm、160cm这三种身高的学生信息,那就把这三种身高组成列表状态。然后用这个列表状态去处理数据。
4.2.3 MapState
映射状态在我看起来是比较灵活的一种状态,它的状态都是一个个的kv键值对。举个例子:如果你想得到所有身高为180、体重为160的学生信息,就能够通过这个状态类型来计算并得到。
4.2.4 reduceingState
归约状态有点类似于值状态,只不过这种状态类型是对每条数据的归约结果。值得注意的是,归约状态的数据类型要与传入的参数是一致的。
4.2.5 AggregateState
聚合状态和归约状态没差多少,只不过是能够让聚合的状态与传入的数据类型可以不一样。
4.3 广播状态
广播状态在底层是一个以kv形式存在的状态类型,因此如果想创建广播变量,需要传入一个MapState状态描述去完成创建。你只要记住,如果你在做计算的时候,想要用依赖一些少量的数据做规则匹配,那么选择广播状态基本上是不会错的。
4.4 状态生存时间
有的时候,随着计算时间的不断增长,计算时所使用的状态也就会越来越大。大量的状态会占据很大的内存空间,影响集群的性能。因此需要规定出一个状态的生存时间的概念,一旦到达时间就会将过期的状态擦除、或者进行状态无效化。具体的使用方法、步骤如下:
1.创建一个StateTtlConfig配置对象,并传入对应的参数
2.调用状态描述器的enableTimeToLive方法,将第一步骤的对象传入。
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(10)) //传入的是状态的生存时间
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//定义什么时候触发失效时间的判断
//OnCreateAndWrite(默认)创建和写入的时候触发 OnReadAndWrite无论读写都会创建
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //定义失效状态的可见性
//NeverReturnExpired 永远不返回过期状态 ReturnExpireDefNotCleanedUp如果过去状态还没被系统清理,就还能返回
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("mystate", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
5.状态的具体使用
5.1 广播状态具体使用
1.创建状态描述器
2.将数据流声明为广播状态
3.广播流与常规流连接
4.对连接后的流做计算
//1.创建状态描述器
MapStateDescriptor<String,String> resultDescripter = new MapStateDecriptorM<>("ruler-name",String.class,String.class);
//2.将数据流声明为广播状态
BroadcastStream<Event> ruleBroadcastStram = dataStream.broadcast(resultDescripter);
//3.广播流与常规流连接 4.对连接后的流做计算
stream.connect(ruleBroadcastStram).process(new KeyedBroadcastProcessFunction()) --针对监控流
stream.connect(ruleBroadcastStram).process(new BroadcastProcessFunction()) --针对常规流
在连接流里面还有两个要实现的方法,分别为processElement和processBroadcastElement;后者是对广播流中的数据进行处理的,前者则是定义如何根据广播流里面的状态数据做对应的处理计算的。因此processElement无法获得可以修改的状态数据内容。
5.2键控状态
键控状态的集种状态的使用方法基本一样,都是要先传入对应的状态描述器,然后在对状态内容进行获取,然后按照不同类型的状态做对应的计算。这部分内容我会放到后面的实战系列去详细的写,有需求看起来也会更加方便一些。
网友评论