问题描述
最近尝试通过Flink实时统计每5分钟的某个业务事件的发生次数,时间类型采用业务发生时间即EventTime,汇总计数后实时写入Clickhouse。但是当程序运行一段时间后,发现对于同一个key,在同一个时间窗口会有多条数据存在。
定位问题原因
通过查看源码,官方文档及众多网上资料,问题定位成功,定位流程如下:
- Flink的event-time模式,默认提供的window有TumblingEventTimeWindows,SlidingEventTimeWindows,EventTimeSessionWindow等,这几种window类型是属于window operator的一部分,称为window assigner。
- 对于window operator来讲,除了window assigner之外,还包括trigger,evictor和window process。
- window assigner指明流中的数据属于哪个window;trigger指明在哪些条件下触发window的计算;evictor在window的计算前或后,将window中的数据移除;window process在窗口被触发后,执行计算逻辑,flink默认提供有ReduceFunction,AggragateFunction,还可以自定义实现windowProcessFunction。
- 在上述的三个默认assigner中,flink都提供了默认的trigger,所以在我的第一版代码中,并没有写trigger仍能得出计算结果,这是因为这三个window中的getDefaultTrigger()方法使用的是EventTimeTrigger,也就是它给我们提供了默认的trigger。
- 查看EventTimeTrigger源码
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
依据网上总结的窗口触发条件
- watermark时间 >= window_end_time
- 在[window_start_time,window_end_time)中有数据存在
以及窗口的过期条件
watermark时间>=window_end_time+allowedLateness
综上,即可定位问题的产生原因是由于我在代码中指定了allowedLateness(30秒),所以在window_end_time<=watermark时间<window_end_time+allowedLateness的时间范围内,只要有迟到的数据,就会再次触发窗口计算。
解决办法
根据问题的产生原因,能够想到的解决办法就是不使用默认的trigger,改为自定义trigger,实现每个窗口再过期前仅触发一次。
ps:通过控制窗口的state应该也可以解决该问题,但作为flink新手,暂时还无法把想法变成代码...
先贴出自定义trigger的代码,然后再细讲。
//allowedLateness:30000
new Trigger<Tuple5<Integer, String, String, Integer, Long>, TimeWindow>() {
@Override
public TriggerResult onElement(Tuple5<Integer, String, String, Integer, Long> integerStringStringIntegerLongTuple5, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp()+30000L);
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
//return l==(timeWindow.maxTimestamp()+30000L-1L)?TriggerResult.FIRE:TriggerResult.CONTINUE;
return TriggerResult.FIRE;
}
@Override
public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
triggerContext.deleteEventTimeTimer(timeWindow.maxTimestamp()+30000L);
onElement方法:当每条数据到达时都会触发,再这里修改为只注册一个eventtime的定时时间(所属窗口的结束时间+allowedLateness)
onProcessingTime方法:这个是当process time到达定时时间时执行,这里不做修改
onEventTime方法:重点讲解下该方法,这里贴下源码作对比
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}
源码中在onElement方法中用window.maxTimestamp()注册timer,当注册的时间达到时就会触发相应的窗口动作。现在问题来了,在onElement方法中注册用的是window.maxTimestamp(),在onEventTime方法中time == window.maxTimestamp()的判断条件岂不是永远为true?那么什么情况下会返回false呢?经查找,在window operator的process源码中查到如下代码:
protected void registerCleanupTimer(W window) {
long cleanupTime = cleanupTime(window);
if (cleanupTime == Long.MAX_VALUE) {
// don't set a GC timer for "end of time"
return;
}
if (windowAssigner.isEventTime()) {
triggerContext.registerEventTimeTimer(cleanupTime);
} else {
triggerContext.registerProcessingTimeTimer(cleanupTime);
}
这里面注册了一个清理的timer,这个时间是cleanupTime返回的
private long cleanupTime(W window) {
if (windowAssigner.isEventTime()) {
long cleanupTime = window.maxTimestamp() + allowedLateness;
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
} else {
return window.maxTimestamp();
}
}
按照源码的逻辑,当时间到达window.maxTimestamp() + allowedLateness时,也会触发onEventTime方法。总结一下,按照源码逻辑,触发onEventTime的timer有两种,一种就是在onElement注册的timer,用于触发计算,一定会返回TriggerResult.FIRE;另一种是清理窗口的timer,如果配置了allowedLateness大于零,那么返回就是TriggerResult.CONTINUE。
所以我们自定义trigger实现所需功能的逻辑基础也在于此,我们在onElement中注册一个与cleanupTime相同的timer,然后将onEventTime方法改为返回TriggerResult.FIRE,这样就只有onEventTime方法会触发窗口的计算,而只有一个时间即cleanupTime会触发onEventTime方法,这样就实现了一个窗口只会被触发一次,即在窗口过期时被触发。
ps:如文中所提,flink新人一枚,欢迎指正此方法是否存在问题。如有问题,或者在并行度>1的情况(本文的并行度=1)存在问题,欢迎指正,谢谢!
网友评论