Flink四大基石
window窗口 Time时间 Status状态 Checkpoint检查点
1.窗口Window和Time时间
image.pngFlinkwindow窗口,相当于把流式数据转换成一批次一批次,批处理
分类:
- timewindow时间窗口
根据时间划分窗口 - countwindow数量窗口
根据数量划分窗口 - sessionwindow会话窗口
根据会话间隔划分窗口
窗口根据窗口大小和窗口间隔分为滚动窗口,滑动窗口
- 滚动窗口 窗口大小=窗口间隔
- 滑动窗口 窗口大小>窗口间隔
窗口根据数据流分为
分组流窗口(常用)
数据流窗口
Time时间:
Flink 时间语义
1.11版本之前:
-
事件时间EventTime,指的就是数据产生的数据,比如订单数据中订单时间
-
摄入时间InjectionTime,指的就是数据被Data Source获取到时间
-
处理时间ProcessingTime,指的就是数据被转换处理的时间
1.11版本开始:
事件时间EventTime 和 处理时间ProcessingTime
2.窗口计算
Flink流式计算当中最重要的就是 基于事件时间的时间窗口计算
EventTime事件时间的滚动窗口 ★★★★★
EventTime事件时间的滑动窗口 ★★★★
EventTime事件时间的会话窗口 ★★★★
- 如果对窗口数据计算时,需要对窗口数据进行排序操作,肯定使用全局窗口数据计算
- 普通窗口数据计算,通常属于窗口增量计算,调用算子:reduce、aggregate
- 优化:先增后全方式窗口数据计算
3.基于事件时间窗口计算
- step1.必须指定数据流中,事件时间的字段,且必须是long类型
- step2.设置窗口聚合函数
[对DataStream进行窗口计算时,通常情况下,都是先对数据流进行分组,再对分组流进行窗口计算】
datastream.keyBy.window.apply/process/reduce/aggregate
全量 apply process)(增量 reduce aggregate)
4.基于时间事件的窗口计算时,如何处理乱序,延迟和迟到数据
【一个原则:基于事件时间窗口计算,不要让数据丢失,可以单独侧边流输出,进行处理分析】
下述方案,针对滚动窗口和滑动窗口,与会话窗口没有关系
默认情况:触发窗口计算后,窗口销毁
方案一 Watermark水印
【时间方式,处理乱序数据】
- 可以设置窗口数据触发计算时,等一等时间,再触发计算
- 相当于给每条数据的事件时间加一个时间戳,
列如窗口大小为5m,设置 Watermark为2m
9.01-9.05 但是9.06也会被计算,
9.06这条数据相当于 事件时间- Watermark水印时间
会触发一次窗口计算.
方案二 Allowed Lateness 允许延迟
【空间方式,内存保存窗口计算结果,处理延迟数据】
- 把窗口计算的结果,再在内存中保存一段时间,延迟数据到达时会再次触发窗口计算. Allowed Lateness不能设置时间太大,会造成性能浪费
- 当窗口数据触发计算以后,如果有窗口数据达到,依然参与计算
方案三 Side Output 侧边输出
兜底操作,将以上两种方案没有处理后的乱序,延迟和迟到数据,作为侧边流输出,单独处理
5.乱序数据、延迟数据、迟到数据,如何界定?
- 乱序数据
基于事件时间窗口计算时
没有设置Watermark机制,数据不会被窗口计算;
如果设置Watermark以后,数据将会被窗口计算;
【此时这条数据就是乱序数据,迟到时间很短很短】
- 乱序数据
- 延迟数据
基于事件时间窗口计算时
数据到达时,窗口已经计算,如果会再次触发窗口计算,这条数据就是延迟数据
AllowedLateness
- 延迟数据
- 迟到数据
基于事件时间窗口计算时
数据到达时,窗口已经计算,并且销毁数据,此条数据就是迟到数据
- 迟到数据
6.基于事件时间的会话窗口对乱序、延迟和迟到数据的处理(重要)
Flink Window窗口计算中,如果是基于事件时间的会话窗口,不存在乱序、延迟和迟到数据的处理。
数据到达时,如果窗口触发计算并且销毁,直接属于下一个窗口中数据,参与计算
Flink1.11版本使用
Flink 1.11之前版本,基于事假时间窗口计算时,需要显示设置(默认时间语义为处理时间ProcessTime)时间语义,并且从1.12版本开始,基于时间窗口计算API也发生变化,与以前版本稍有不同
- step1 显示设置时间语义
// 1. 执行环境-env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//todo step1 显示设置时间时间语义
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
- step2 指定数据的事件时间字段,且必须是long类型
crossRoadEventStream
.assignTimestampsAndWatermarks(
// 设置最大乱序时间maxOutOfOrderness 为1min
new BoundedOutOfOrdernessTimestampExtractor<CrossRoadEvent>(Time.minutes(1)) {
- step3 设置窗口和聚合函数
- 设置watermark水印
//todo step2 指定数据中事件时间的字段,必须为long类型
SingleOutputStreamOperator<CrossRoadEvent> timeStream = crossRoadEventStream
.assignTimestampsAndWatermarks(
// 设置最大乱序时间maxOutOfOrderness 为1min
new BoundedOutOfOrdernessTimestampExtractor<CrossRoadEvent>(Time.minutes(1)) {
- 设置侧边流和 Allowed Lateness
// 设置滚动窗口TumblingEventTimeWindows大小为10min 并且设置窗口聚合函数
SingleOutputStreamOperator<CrossRoadReport> reportStream = timeStream
.keyBy(CrossRoadEvent::getRoadId)
.window(TumblingEventTimeWindows.of(Time.minutes(10)))
//设置侧边流
.sideOutputLateData(lateOutputTag)
// 设置allowedLateness 当窗口计算后,保存一段时间数据,当窗口中有数据达到时,继续触发计算
.allowedLateness(Time.minutes(2))
网友评论