window

作者: ZYvette | 来源:发表于2021-05-07 20:43 被阅读0次

    window 种类

    • keyed Windows
    stream
           .keyBy(...)               <-  keyed versus non-keyed windows
           .window(...)              <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"
    
    
    • Non-Keyed Windows:也就是global window
    stream
           .windowAll(...)           <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"
    

    问题总结

    • 窗口计算是如何触发的
    • watermark的作用
    • 延迟数据如何计算

    源码分析

    windowAssinger: 窗口分配类型

    • window(<windowAssigner>)
    image.png

    window 方法接收的输入是一个 WindowAssigner, WindowAssigner 负责将每条输入的数据分发到正确的 window 中(一条数据可能同时分发到多个 Window 中),Flink提供了几种通用的 WindowAssigner:
    tumbling window(窗口间的元素无重复),
    sliding window(窗口间的元素可能重复),
    session window 以及
    global window.
    如果需要自己定制数据分发策略,则可以实现一个 class,继承自WindowAssigner。

    evictor:自定义用户代码前后操作

      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
    

    evictor 主要用于做一些数据的自定义操作,可以在执行用户代码之前,也可以在执行用户代码之后,默认是在用户代码之前执行。
    更详细的描述可以参考org.apache.flink.streaming.api.windowing.evictors.Evictor 的 evicBefore 和 evicAfter
    两个方法。Flink 提供了如下三种通用的 evictor:

    类别 种类
    CountEvictor 保留指定数量的元素
    DeltaEvictor 通过执行用户给定的 DeltaFunction 以及预设的 theshold,判断是否删除一个元素。
    TimeEvictor 设定一个阈值 interval,删除所有不再 max_ts - interval 范围内的元素,其中 max_ts 是窗口内时间戳的最大值。

    evictor 是可选的方法,如果用户不选择,则默认没有。


    image.png

    trigger:触发条件

      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
    
    image.png

    trigger 用来判断一个窗口是否需要被触发,每个 WindowAssigner 都自带一个默认的trigger,默认使用的是对应EventTimeTrigger、ProcessingTimeTrigger等等。


    image.png

    如果默认的 trigger 不能满足你的需求,则可以自定义一个类,继承自Trigger 即可,我们详细描述下 Trigger 的接口以及含义:

    • onElement() 每次往 window 增加一个元素的时候都会触发
    • onEventTime() 当 event-time timer 被触发的时候会调用
    • onProcessingTime() 当 processing-time timer 被触发的时候会调用
    • onMerge() 对两个 trigger 的 state 进行 merge 操作
    • clear() window 销毁的时候被调用

    上面的接口中前三个会返回一个 TriggerResult,TriggerResult 有如下几种可能的选择:

    allowedLateness:允许延迟时间

      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
    

    Time & watermark

    Event Time、Ingestion Time、Processing Time

    Event-Time 表示事件发生的时间,Processing-Time 则表示处理消息的时间(墙上时间),Ingestion-Time 表示进入到系统的时间。

    在 Flink 中我们可以通过下面的方式进行 Time 类型的设置

    // 设置使用ProcessingTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); 
    

    watermark


    image.png
    • watermark是指定时生成一个时间戳,用于标杆当前数据是否延迟,任务watermark的时间点任务之后的数据都是晚到的。
    • 因为数据会有延迟,watermark不能完全解决延迟问题,所以实际中可以设置允许延迟,并触发延迟数据处理。
    • watermark是用于处理EventTime的数据。

    代码详解

    apply方法:

    private <R> WindowOperator<K, T, ?, R, W> apply(
                InternalWindowFunction<Iterable<T>, R, K, W> function) {
            if (evictor != null) {
                return buildEvictingWindowOperator(function);
            } else {
                ListStateDescriptor<T> stateDesc =
                        new ListStateDescriptor<>(
                                WINDOW_STATE_NAME, inputType.createSerializer(config));
                return buildWindowOperator(stateDesc, function);
            }
        }
    

    session window原理

    http://wuchong.me/blog/2016/06/06/flink-internals-session-window/

    问题答案:

    1. 窗口什么时候出发?
      在符合trigger条件时触发。
      具体的看trigger部分
      例如:eventtime
      a.正常是窗口<=watermark是出发
      b.当配置允许数据延迟时,会在窗口<=watermark但是窗口+延迟时间<=watermark之前,没到一条数据会触发一次,之后将销毁。

    2.延迟数据如何处理?
    正常在窗口触发之后,窗口会被销毁,当允许数据延迟之后,窗口不会马上销毁,所以延迟的数据会被分配到该窗口,重新触发。

    1. 窗口如何分配?

    sessionwindow:http://wuchong.me/blog/2016/06/06/flink-internals-session-window/
    每条数据【start,start+gap】,求多数据的全集即是实际窗口大小。
    slidingwindow:每条数据对应多个窗口
    tumblingwindow:数据对应固定窗口

    相关文章

      网友评论

          本文标题:window

          本文链接:https://www.haomeiwen.com/subject/sjuzlltx.html