美文网首页
Flink 窗口

Flink 窗口

作者: 专职掏大粪 | 来源:发表于2020-05-05 12:05 被阅读0次

    window的使用场景

    聚合统计、数据合并(积攒批)、双流join

    Window Assigner

    Window Assigner:WindowAssigner 负责将每条输入的数据分发到正确的 window 中(一条数据可能同时分发到多个 Window 中)

    Evictor

    Evictor:可以译为“驱逐者”。在Trigger触发之后,可以指定在窗口前后执行,相当于一个filter.Evictor 是可选的方法,如果用户不选择,则默认没有。

    EvictorevicBefore 和 evicAfter两个方法用于分别在窗口前后过滤

    Trigger

    Trigger:触发器。

    trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的 trigger,如果默认的 trigger 不能满足你的需求,则可以自定义一个类,继承自 Trigger 即可,我们详细描述下 Trigger 的接口以及含义:

    * onElement() 每次往 window 增加一个元素的时候都会触发

    * onEventTime() 当 event-time timer 被触发的时候会调用

    * onProcessingTime() 当 processing-time timer 被触发的时候会调用

    * onMerge() 对两个 trigger 的 state 进行 merge 操作

    * clear()  window 销毁的时候被调用

    上面的接口中前三个会返回一个 TriggerResult,TriggerResult 有如下几种可能的选择:

    * CONTINUE 不做任何事情

    * FIRE 触发 window

    * PURGE 清空整个 window 的元素并销毁窗口

    * FIRE_AND_PURGE 触发窗口,然后销毁窗口

    Trigger的场景使用

    场景1:ContinuousEventTimeTrigger

    短窗口的计算由于其窗口期较短,那么很快就能获取到结果,但是对于长窗口来说窗口时间比较长,如果等窗口期结束才能看到结果,那么这份数据就不具备实时性,大多数情况我们希望能够看到一个长窗口的结果不断变动的情况,对此Flink提供了ContinuousEventTimeTrigger连续事件时间触发器与ContinuousProcessingTimeTrigger连续处理时间触发器,指定一个固定时间间隔interval,不需要等到窗口结束才能获取结果,能够在固定的interval获取到窗口的中间结果。

    场景2:ContinuousEventTimeTrigger+PurgingTrigger

    可以清除掉之前已经计算过的窗口数据,让window的间隔interval独立输出,最终在外部数据源(例如mysql)对一个窗口的数据汇总,方便使用




    window Function

    1.AggregateFunction 增量聚合(来一条聚合一条),减少state大小

    2.AggregateFunction 数据先存在state中,等到trigger 的时候,才通过state计算.可以输出key 、window等额外信息

    3.可以使用将AggregateFunction和AggregateFunction,一方面增量聚合,一方面补全信息


    window的触发

    window的触发要同时满足以下2个条件:

    watermark时间 >= window_end_time

    在[window_start_time,window_end_time)中有数据存在

    注意:watermark是一个全局的值,不是某一个key下的值,所以即使不是同一个key的数据,其warmark也会增加

    实现机制





    //如果新加入的timer触发时间早于下一次的触发时间,那么应该重新设置下一次触发时间,注意processingTimeTimersQueue 是一个小顶堆,时间早的在堆顶



    //注册下一次的触发任务

    定时器

    TimerService 在内部维护两种类型的定时器(处理时间和事件时间定时器)并排队执行。

    TimerService 会删除每个键和时间戳重复的定时器,即每个键在每个时间戳上最多有一个定时器。如果为同一时间戳注册了多个定时器,则只会调用一次onTimer()方法。

    Flink同步调用onTimer()和processElement()方法。因此,用户不必担心状态的并发修改。

    容错

    定时器具有容错能力,并且与应用程序的状态一起进行快照。如果故障恢复或从保存点启动应用程序,就会恢复定时器。

    在故障恢复之前应该触发的处理时间定时器会被立即触发。当应用程序从故障中恢复或从保存点启动时,可能会发生这种情况。

    定时器合并

    由于 Flink 仅为每个键和时间戳维护一个定时器,因此可以通过降低定时器的频率来进行合并以减少定时器的数量。

    对于频率为1秒的定时器(事件时间或处理时间),我们可以将目标时间向下舍入为整秒数。定时器最多提前1秒触发,但不会迟于我们的要求,精确到毫秒。因此,每个键每秒最多有一个定时器。

    Java版本:

    long coalescedTime=((ctx.timestamp()+timeout)/1000)*1000;ctx.timerService().registerProcessingTimeTimer(coalescedTime);

    Scala版本:

    val coalescedTime=((ctx.timestamp+timeout)/1000)*1000ctx.timerService.registerProcessingTimeTimer(coalescedTime)

    由于事件时间定时器仅当 Watermark 到达时才会触发,因此我们可以将当前 Watermark 与下一个 Watermark 的定时器一起调度和合并:

    Java版本:

    long coalescedTime=ctx.timerService().currentWatermark()+1;ctx.timerService().registerEventTimeTimer(coalescedTime);

    Scala版本:

    val coalescedTime=ctx.timerService.currentWatermark+1ctx.timerService.registerEventTimeTimer(coalescedTime)

    可以使用一下方式停止一个处理时间定时器:

    Java版本:

    long timestampOfTimerToStop=...ctx.timerService().deleteProcessingTimeTimer(timestampOfTimerToStop);

    Scala版本:

    val timestampOfTimerToStop=...ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)

    可以使用一下方式停止一个事件时间定时器:

    Java版本:

    long timestampOfTimerToStop=...ctx.timerService().deleteEventTimeTimer(timestampOfTimerToStop);

    Scala版本:

    val timestampOfTimerToStop=...ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)

    如果没有给指定时间戳注册定时器,那么停止定时器没有任何效果。

    Flink版本:1.8

    参考

    https://ververica.cn/developers/time-window/

    https://blog.csdn.net/qq_31866793/article/details/103138715

    https://blog.jrwang.me/2019/flink-source-code-time-and-window/

    https://www.jiangyuesong.me/2019/12/14/flink-window-and-timer/

    https://www.jianshu.com/p/753e8cf803bb

    相关文章

      网友评论

          本文标题:Flink 窗口

          本文链接:https://www.haomeiwen.com/subject/uqqmghtx.html