美文网首页数客联盟
EventTimeTrigger中onElement方法分析

EventTimeTrigger中onElement方法分析

作者: Woople | 来源:发表于2020-03-03 13:21 被阅读0次

    疑问

    onElement方法的具体实现如下

    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
      if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        return TriggerResult.FIRE;
      } else {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
      }
    }
    

    下面是官方文档对于这个方法的解释

    The onElement() method is called for each element that is added to a window.

    也就是说每条数据加入这个窗口中都会调用一次这个方法,什么情况下这个方法会返回TriggerResult.FIRE

    分析

    关键问题在于WindowOperator#isWindowLate方法

    protected boolean isWindowLate(W window) {
      return (windowAssigner.isEventTime() 
          && (cleanupTime(window) <= internalTimerService.currentWatermark()));
    }
    

    如果allowedLateness没有设置默认为0,那么第二个判断条件相当于window.maxTimestamp()<=internalTimerService.currentWatermark()如果这个条件为true,那么上层WindowOperator#processElement方法中

    // drop if the window is already late
    if (isWindowLate(window)) {
      continue;
    }
    

    会走到这个判断里面,那么就不会调用到EventTimeTrigger#onElement,反过来说,能调用到EventTimeTrigger#onElement方法的情况,window.maxTimestamp() <= ctx.getCurrentWatermark()就不会成立。

    但是当设置allowedLateness大于0的情况,数据迟到的条件变成了window.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark(),那么假设滚动窗口的size是30秒,设置allowedLateness为10秒这时候来了一条数据的时间戳为1573441910000,那么此时window.maxTimestamp()=1573441919999,allowedLateness=10000,internalTimerService.currentWatermark()=1573441924000,不满足上面迟到的条件,进入EventTimeTrigger#onElement,这时就满足了window.maxTimestamp() <= ctx.getCurrentWatermark(),即返回值就是TriggerResult.FIRE

    小结

    上面的过程比较绕,简单的说,如果allowedLateness=0那么进入EventTimeTrigger#onElement后不可能返回TriggerResult.FIRE,因为满足这个判断条件的数据在前面isWindowLate(window)判断中已经过滤掉了。如果allowedLateness>0那么满足迟到的数据进入EventTimeTrigger#onElement后就会返回TriggerResult.FIRE。有兴趣的读者可以运行demo进行测试。

    总结

    结合之前的文章EventTimeTrigger中onEventTime方法分析可以得到下面的结论

    • 当没有设置allowedLateness的时候,即allowedLateness=0

      1. EventTimeTrigger#onElement用来注册窗口触发的定时器
      2. 定时器触发之后回调EventTimeTrigger#onEventTime触发窗口的计算
    • 设置allowedLateness>0

      1. 没有迟到的数据调用逻辑如上
      2. 窗口已经触发计算之后,在允许迟到时间范围内到来的数据,会在EventTimeTrigger#onElement中返回TriggerResult.FIRE触发计算,每一条都会触发一次所在窗口的计算
      3. 迟到的数据不会在EventTimeTrigger#onEventTime触发计算,此时对于迟到的数据返回TriggerResult.CONTINUE

    相关文章

      网友评论

        本文标题:EventTimeTrigger中onElement方法分析

        本文链接:https://www.haomeiwen.com/subject/felqlhtx.html