1.Event:流处理中处理的数据代表的实际业务事件。
2.Time:确定事件的时间概念,用于flink基于时间的操作(window等)。
三种时间语义- Processing Time
Processing Time 是指事件被操作算子处理时机器的系统时间。不需要流和机器之间的协调,它提供了最好的性能和最低的延迟。但是,在分布式和异步的环境下,Processing Time 不能提供确定性,因为它容易受到事件到达系统的速度(例如从消息队列)、事件在系统内操作流动的速度以及中断的影响。 - Event Time
Event Time 是事件发生的时间。事件(数据)自带的时间字段,与flink无关,需要程序指定如何从数据中构造time水印。此方式处理事件可以完美得到想要的结果,但是无序到达事件会导致程序延时处理,且不能无限等待迟到数据会导致结果一致性问题。正常业务需求关注的都是event time,event time会存在迟到数据的情况,需要引入Watermark迟到数据处理等概念。 - Ingestion Time
Ingestion Time 是事件进入Flink Source的时间。在源操作处,每个事件将源的当前时间作为时间戳,其介于event 和 processing之间。
1.12之后默认使用event time。
3.Watermark(水印)
Watermark是一种特殊的时间戳,也是一种被插入到数据流的特殊的数据(StreamElement),用于表示延迟多久计算当前窗口的聚合操作。可以防止Event乱序到达flink时,计算无限等待迟到的数据。
延迟处理示例:
- [1s,5s)的window窗口,设置延迟时间为为3s,则8s的数据到达后会在stream中插入watermark5,就会触发[1s,5s)的window窗口计算;
- 如果还存再延迟数据,则使用allowedLateness(),每次迟到数据来了之后重新计算并输出;
- 如果还存在,则sideout输出后再进行后续处理。
插入的watremark为单调递增的,与数据时间戳相关。
数据到达后分配到对应的窗口,watermark到达后触发对应的窗口计算。
watermark的传递:
watermark在重分区操作后(keyby,rebanlance等)会广播到全部下游分区。
下游分区收到上游多个分区watermark,会存储每个上游分区的watermark,以最小的watermark为准进行操作,如果最小watermark更新,则向后续下游广播。
3.1.watermark使用示例
处理每条数据时都使用到目前为止event time最大值,设置最大等待时间为3s,水印则为大当前最大event time-3s,window(10s)会等待3s触发上一个窗口计算:(旧版本方式)
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxOutOfOrderness = 3000; // 3.0 seconds
private long currentMaxTimestamp;
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.getCreationTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current highest timestamp minus the out-of-orderness bound
// 以迄今为止收到的最大时间戳来生成 watermark
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
event time | max event time | watermark | 操作 |
---|---|---|---|
10000 | 10000 | 7000 | 水印为当前最大值-3s |
11000 | 11000 | 8000 | |
12000 | 12000 | 9000 | |
18000 | 18000 | 15000 | |
13000 | 18000 | 15000 | 最大值和水印不变 |
19000 | 19000 | 16000 | |
20000 | 20000 | 17000 | |
23000 | 23000 | 20000 | 触发窗口[10,20)计算 |
25000 | 25000 | 22000 | 不会再触发窗口[10,20)计算 |
当23s的数据到达,生成的watermark时间戳为20s,触发计算10~20s窗口,但是只会计算[10,20)之间的数据,并不会把触发计算的下一窗口数据计算进去。达到等待迟到数据3s的效果,已经触发过计算的窗口不会再次被触发。
(注:不是对eventTime要求特别严格的数据,尽量不要采用eventTime方式来处理,会有丢数据的风险。)
在新版本1.12-中,使用WatermarkStrategy工厂方式构造构造watermark和time语义
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>
TimestampAssignerSupplier负责定义时间语义
WatermarkGeneratorSupplier负责设置watermark
watermark类型:
间断性:每一条数据后插入一条watermark,数据稀疏
周期性:每隔一段时间生成一条watermark,数据稠密
watermark成生器父类如下:
//基于event或者周期生成watermark,代替了旧版本中的:
//AssignerWithPunctuatedWatermarks :
//AssignerWithPeriodicWatermarks:周期性
public interface WatermarkGenerator<T> {
//每条数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark
void onEvent(T event, long eventTimestamp, WatermarkOutput output);
//周期性调用,由ExecutionConfig.setAutoWatermarkInterval()方法指定周期时间间隔。
void onPeriodicEmit(WatermarkOutput output);
}
常用的watermark生成器:
BoundedOutOfOrdernessWatermarks(周期性)
//
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
//到目前为止最大的event time
private long maxTimestamp;
//watermark最大无序延迟时间
private final long outOfOrdernessMillis;
//通过最大无序时间界限生成watermark生成器
public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
}
//每来一条数据更新当前最大的event time
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
}
//周期性生成watermark到数据流中
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}
}
3.2.延时到达的数据处理
由于设置watermark会等待延时数据到达的时间不能过长,当数据迟到过久到达并不会再次触发计算。则需要处理延时到达数据:
- 继续触发窗口计算:
通过allowedLateness()设置,只要满足watermark < window_end_time + allowedLateness,延迟数据进入窗口就会触发窗口计算。即当前到达的数据
的最大时间-设置最大等待时间生成的水印,没有超过当前窗口右侧值allowedLateness范围是,会重新触发计算。 - 重定向到其他地方:
ds.getSideOutput()
网友评论