美文网首页Flink
Flink自定义trigger解决单个窗口多次触发的问题

Flink自定义trigger解决单个窗口多次触发的问题

作者: 风筝flying | 来源:发表于2020-05-11 16:09 被阅读0次

问题描述

最近尝试通过Flink实时统计每5分钟的某个业务事件的发生次数,时间类型采用业务发生时间即EventTime,汇总计数后实时写入Clickhouse。但是当程序运行一段时间后,发现对于同一个key,在同一个时间窗口会有多条数据存在。

定位问题原因

通过查看源码,官方文档及众多网上资料,问题定位成功,定位流程如下:

  1. Flink的event-time模式,默认提供的window有TumblingEventTimeWindows,SlidingEventTimeWindows,EventTimeSessionWindow等,这几种window类型是属于window operator的一部分,称为window assigner。
  2. 对于window operator来讲,除了window assigner之外,还包括trigger,evictor和window process。
  3. window assigner指明流中的数据属于哪个window;trigger指明在哪些条件下触发window的计算;evictor在window的计算前或后,将window中的数据移除;window process在窗口被触发后,执行计算逻辑,flink默认提供有ReduceFunction,AggragateFunction,还可以自定义实现windowProcessFunction。
  4. 在上述的三个默认assigner中,flink都提供了默认的trigger,所以在我的第一版代码中,并没有写trigger仍能得出计算结果,这是因为这三个window中的getDefaultTrigger()方法使用的是EventTimeTrigger,也就是它给我们提供了默认的trigger。
  5. 查看EventTimeTrigger源码
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

依据网上总结的窗口触发条件

  • watermark时间 >= window_end_time
  • 在[window_start_time,window_end_time)中有数据存在

以及窗口的过期条件

watermark时间>=window_end_time+allowedLateness

综上,即可定位问题的产生原因是由于我在代码中指定了allowedLateness(30秒),所以在window_end_time<=watermark时间<window_end_time+allowedLateness的时间范围内,只要有迟到的数据,就会再次触发窗口计算。

解决办法

根据问题的产生原因,能够想到的解决办法就是不使用默认的trigger,改为自定义trigger,实现每个窗口再过期前仅触发一次。
ps:通过控制窗口的state应该也可以解决该问题,但作为flink新手,暂时还无法把想法变成代码...
先贴出自定义trigger的代码,然后再细讲。

//allowedLateness:30000
new Trigger<Tuple5<Integer, String, String, Integer, Long>, TimeWindow>() {
        @Override
        public TriggerResult onElement(Tuple5<Integer, String, String, Integer, Long> integerStringStringIntegerLongTuple5, long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
        
            triggerContext.registerEventTimeTimer(timeWindow.maxTimestamp()+30000L);
            return TriggerResult.CONTINUE;
        }
        @Override
        public TriggerResult onProcessingTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            return TriggerResult.CONTINUE;
        }
        
        @Override
        public TriggerResult onEventTime(long l, TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            //return l==(timeWindow.maxTimestamp()+30000L-1L)?TriggerResult.FIRE:TriggerResult.CONTINUE;
            return TriggerResult.FIRE;
        }
        
        @Override
        public void clear(TimeWindow timeWindow, TriggerContext triggerContext) throws Exception {
            triggerContext.deleteEventTimeTimer(timeWindow.maxTimestamp()+30000L);

onElement方法:当每条数据到达时都会触发,再这里修改为只注册一个eventtime的定时时间(所属窗口的结束时间+allowedLateness)
onProcessingTime方法:这个是当process time到达定时时间时执行,这里不做修改
onEventTime方法:重点讲解下该方法,这里贴下源码作对比

public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        return TriggerResult.FIRE;
    } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
}
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
    return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
}

源码中在onElement方法中用window.maxTimestamp()注册timer,当注册的时间达到时就会触发相应的窗口动作。现在问题来了,在onElement方法中注册用的是window.maxTimestamp(),在onEventTime方法中time == window.maxTimestamp()的判断条件岂不是永远为true?那么什么情况下会返回false呢?经查找,在window operator的process源码中查到如下代码:

protected void registerCleanupTimer(W window) {
  long cleanupTime = cleanupTime(window);
  if (cleanupTime == Long.MAX_VALUE) {
    // don't set a GC timer for "end of time"
    return;
  }

  if (windowAssigner.isEventTime()) {
    triggerContext.registerEventTimeTimer(cleanupTime);
  } else {
    triggerContext.registerProcessingTimeTimer(cleanupTime);
  }

这里面注册了一个清理的timer,这个时间是cleanupTime返回的

private long cleanupTime(W window) {
  if (windowAssigner.isEventTime()) {
    long cleanupTime = window.maxTimestamp() + allowedLateness;
    return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
  } else {
    return window.maxTimestamp();
  }
}

按照源码的逻辑,当时间到达window.maxTimestamp() + allowedLateness时,也会触发onEventTime方法。总结一下,按照源码逻辑,触发onEventTime的timer有两种,一种就是在onElement注册的timer,用于触发计算,一定会返回TriggerResult.FIRE;另一种是清理窗口的timer,如果配置了allowedLateness大于零,那么返回就是TriggerResult.CONTINUE。

所以我们自定义trigger实现所需功能的逻辑基础也在于此,我们在onElement中注册一个与cleanupTime相同的timer,然后将onEventTime方法改为返回TriggerResult.FIRE,这样就只有onEventTime方法会触发窗口的计算,而只有一个时间即cleanupTime会触发onEventTime方法,这样就实现了一个窗口只会被触发一次,即在窗口过期时被触发。

ps:如文中所提,flink新人一枚,欢迎指正此方法是否存在问题。如有问题,或者在并行度>1的情况(本文的并行度=1)存在问题,欢迎指正,谢谢!

相关文章

网友评论

    本文标题:Flink自定义trigger解决单个窗口多次触发的问题

    本文链接:https://www.haomeiwen.com/subject/svpunhtx.html