#parser.add_argument('content', type=dict)
args = parser.parse_args()
res = convert_bool(args['only_brief']or False)
print("-----------")
print(res)
print(args['content'])
print(type(args['content']))
content = convert_dict(args['content']or {})
print("-----------")
print(content)
return form_result({})
水位线
是数据流中插入的一个标记,用来表示事件时间的进展,它会随着数据一起在任务间传递。
可以实现WatermarkStrategy接口自定义生成水位线
为数据流中的元素分配时间戳并生成水印以表示事件时间进度。 给定的 WatermarkStrategy 用于创建 TimestampAssigner 和 WatermarkGenerator。对于数据流中的每个事件,调用 TimestampAssigner.extractTimestamp(Object, long) 方法来分配事件时间戳。对于数据流中的每个事件,将调用 WatermarkGenerator.onEvent(Object, long, WatermarkOutput)。定期(由 ExecutionConfig.getAutoWatermarkInterval() 定义)将调用 WatermarkGenerator.onPeriodicEmit(WatermarkOutput) 方法来发射水位线。也可以不在onPeriodicEmit里面,在onEvent中判断满足某一条件才发射。
常见的水印生成模式可以作为 WatermarkStrategy 类中的静态方法找到。
flink内置:
forMonotonousTimestamps有序流的水位线
forBoundedOutOfOrderness乱序流的水位线
WatermarkStrategy.forMonotonousTimestamps() ==WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))
自定义时还可以指定周期性生成或者是断点式生成
周期性生成比如200ms。断点式生成在onevent中满足一定条件才发射水位线
多个并行子任务时,下游可能会收到多个上游发来的水位线,木桶原理,小的为准。因为水位线的本质是当前时间之前的数据,都已经到齐了。
窗口
按键分区和非按键分区数据流
区别在于调用窗口算子之前是否有keyby操作。
经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),就是 KeyedStream。基于KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算。
如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll 本身就是一个非并行的操作
达到窗口关闭时间已经触发计算然后销毁了(窗口默认被销毁),所以无法再进入到窗口中,自然也就无法更新计算结果了
增量聚合函数
ReduceFunction和AggregateFunction
.keyBy(r -> r.f0)
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1,
Tuple2<String, Long> value2) throws Exception {
// 定义累加规则,窗口闭合时,向下游发送累加结果
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
})
ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就迫使我们必须在聚合前,先将数据转换(map)成预期结果类型;
AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型。
createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
⚫ add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法。
⚫ getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用。
⚫ merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)。
全窗口函数
窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算;
ProcessWindowFunction
增量聚合函数处理计算会更高效。举一个最简单的例子,对一组数据求和。大量的数据连续不断到来,全窗口函数只是把它们收集缓存起来,并没有处理;到了窗口要关闭、输出结果的时候,再遍历所有数据依次叠加,得到最终结果。而如果我们采用增量聚合的方式,那么只需要保存一个当前和的状态,每个数据到来时就会做一次加法,更新状态;到了要输出结果的时候,只要将当前状态直接拿出来就可以了。增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。它只负责收集数据、提供上下文相关信息,把所有的原材料都准备好,至于拿来做什么我们完全可以任意发挥。这就使得窗口计算更加灵活,功能更加强大。
也可以增量聚合和全窗口函数的结合起来使用
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。
窗口处理的主体还是增量聚合,而引入全窗口函数又可以获取到更多的信息包装输出,这样的结合兼具了两种窗口函数的优势,在保证处理性能和实时性的同时支持了更加丰富的应用场景。
Trigger
Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法;
⚫ onElement():窗口中每到来一个元素,都会调用这个方法。
⚫ onEventTime():当注册的事件时间定时器触发时,将调用这个方法。
⚫ onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法。
⚫ clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态。
前三个方法返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型。
⚫ CONTINUE(继续):什么都不做
⚫ FIRE(触发):触发计算,输出结果
⚫ PURGE(清除):清空窗口中的所有数据,销毁窗口
⚫ FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口
移除器(Evictor)
Evictor 接口定义了两个方法:
⚫ evictBefore():定义执行窗口函数之前的移除数据操作
⚫ evictAfter():定义执行窗口函数之后的以处数据操作
默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的
窗口的销毁
一般情况下,当时间达到了结束点,就会直接触发计算输出结果、进而清除状态销毁窗口。这时窗口的销毁可以认为和触发计算是同一时刻。这里需要注意,Flink 中只对时间窗口(TimeWindow)有销毁机制;由于计数窗口(CountWindow)是基于全局窗口(GlobalWindw)实现的,而全局窗口不会清除状态,所以就不会被销毁。
ProcessFunction
内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()
八类处理函数:
(1)ProcessFunction
最基本的处理函数,基于 DataStream 直接调用.process()时作为参数传入。
(2)KeyedProcessFunction
对流按键分区后的处理函数,基于 KeyedStream 调用.process()时作为参数传入。要想使用
定时器,比如基于 KeyedStream。
(3)ProcessWindowFunction
开窗之后的处理函数,也是全窗口函数的代表。基于 WindowedStream 调用.process()时作
为参数传入。
(4)ProcessAllWindowFunction
同样是开窗之后的处理函数,基于 AllWindowedStream 调用.process()时作为参数传入。
(5)CoProcessFunction
合并(connect)两条流之后的处理函数,基于 ConnectedStreams 调用.process()时作为参数传入。
(6)ProcessJoinFunction
间隔连接(interval join)两条流之后的处理函数,基于 IntervalJoined 调用.process()时作为参数传入。
(7)BroadcastProcessFunction
广播连接流处理函数,基于 BroadcastConnectedStream 调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未 keyBy 的普通 DataStream 与一个广播流(BroadcastStream)做连接(conncet)之后的产物。
(8)KeyedBroadcastProcessFunction
按键分区的广播连接流处理函数,同样是基于 BroadcastConnectedStream 调用.process()时作为参数传入。与 BroadcastProcessFunction 不同的是,这时的广播连接流,是一个 KeyedStream与广播流(BroadcastStream)做连接之后的产物。
多流转换
分流
使用侧输出流、定义多个侧输出流。process处理时加到对应的侧输出流中
合流
Union
最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union);联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。不限制流的个数。
合流的时候数据顺序?
Connect
流的联合虽然简单,不过受限于数据类型不能改变。connect限制只能2条流。合并后返回的是ConnectedStreams,相应的处理方法也是CoMapFunction之类的。
基于时间的合流
Windows Join
stream1.join(stream2).where(<KeySelector>).equalTo(<KeySelector>).window(<WindowAssigner>).apply(<JoinFunction>)
window join类似于 inner join。也就是说,最后处理输出的,只有两条流中数据按 key 配对成功的那些
Interval Join
间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配,匹配的规则也是key相同
stream1.keyBy(<KeySelector>).intervalJoin(stream2.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1)).process (new ProcessJoinFunction<Integer, Integer, String(){@Override
public void processElement(Integer left, Integer right, Context ctx,
Collector<String> out) {
out.collect(left + "," + right);}});
Windows CoGroup
用法跟 window join 非常类似,也是将两条流合并之后开窗处理匹配的元素,调用时只需要将.join()换为.coGroup()
public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception; }
coGroup 操作比窗口的 join 更加通用,不仅可以实现类似 SQL 中的“内连接”(inner join),也可以实现左外连接(left outer join)、右外连接(right outer join)和全外连接(full outer join)
状态编程
一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以 Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。而很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。基于这样的想法,我们又可以将托管状态分为两类:算子状态和按键分区状态。
算子状态作用于当前并行子任务、按键分区状态作用于输入流的key级别。所以按键分区状态只能在keyby后使用
按键分区状态(Keyed State)
1. 值状态(ValueState)
2. 列表状态(ListState)
3. 映射状态(MapState)
4. 归约状态(ReducingState)
5. 聚合状态(AggregatingState)
状态TTL
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
依次:设定的状态生存时间、设置更新类型、设置状态的可见性。(所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能基于存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的 NeverReturnExpired 是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是 ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值)
目前的 TTL 设置只支持处理时间。另外,所有集合类型的状态(例如ListState、MapState)在设置 TTL 时,都是针对每一项(per-entry)元素的。也就是说,一个列表状态中的每一个元素,都会以自己的失效时间来进行清理,而不是整个列表一起清理。
算子状态(Operator State)
作用于并行子任务
1. 列表状态(ListState)
每个并行子任务维护一个列表状态,在当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的 rebanlance 数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-splitredistribution)。
2. 联合列表状态(UnionListState)
与列表状态类似,仅仅是进行缩放调整时对于状态的分配方式不同。
3. 广播状态(BroadcastState)
有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。
状态持久化和状态后端
Flink 会定期保存检查点,在检查点中会记录每个算子的 id 和状态;如果发生故障,Flink 就会用最近一次成功保存的检查点来恢复应用的状态,重新启动处理流程,就如同“读档”一样。
状态后端的分类:
1. 哈希表状态后端
即放内存,具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在 Taskmanager 的 JVM 堆(heap)上。普通的状态,以及窗口中收集的数据和触发器(triggers),都会以键值对(key-value)的形式存储起来,所以底层是一个哈希表(HashMap)
2. RocksDB
RocksDB 是一种内嵌的 key-value 存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend 后,会将处理中的数据全部放入 RocksDB 数据库中,RocksDB默认存储在 TaskManager 的本地数据目录里。与 HashMapStateBackend 直接在堆内存中存储对象不同,这种方式下状态主要是放在RocksDB 中的。数据被存储为序列化的字节数组(Byte Arrays),读写操作需要序列化/反序列化,因此状态的访问性能要差一些。
为每个Job配置状态后端:env.setStateBackend(new HashMapStateBackend());
指定checkpoint的目录:state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
3.FsStateBackend
检查点
所有任务状态在同一时间点的一个“快照”(snapshot),它的触发是周期性的。当所有任务都恰好处理完一个相同的输入数据的时候,将它们的状态保存下来。首先,这样避免了除状态之外其他额外信息的存储,提高了检查点保存的效率。其次,一个数据要么就是被所有任务完整地处理完,状态得到了保存;要么就是没处理完,状态全部没保存:这就相当于构建了一个“事务”(transaction)。
barrier概念
barrier注入数据流中,根据数据流一起流转。每个barrier携带一个快照ID。来自不同快照的多个障碍可以同时在流中,这意味着各种快照可能同时发生。一旦Sink操作符(流式 DAG 的末端)从其所有输入流中接收到屏障 n,它就会向检查点协调器确认快照 n。 在所有Sink都确认快照后,则完成了此次快照。
网友评论