Flink Streaming Windows操作

作者: 尼小摩 | 来源:发表于2018-10-25 20:12 被阅读564次

    Window是无限数据流处理的核心,Window将一个无限的stream拆分成有限大小的”buckets”桶,我们可以在这些桶上做计算操作。本文主要聚焦于在Flink中如何进行窗口操作,以及程序员如何从window提供的功能中获得最大的收益。

    下面介绍了一个窗口化的Flink程序的总体结构。,第一个代码段中是分组的流,而第二段是非分组的流。正如我们所见,唯一的区别是分组的stream调用keyBy(…)window(…),而非分组的stream中window()换成了windowAll(…),这些也将贯穿都这一页的其他部分中。

    Keyed Windows

    stream
          .keyBy(...)               <-  keyed versus non-keyed windows
          .window(...)              <-  required: "assigner"
         [.trigger(...)]            <-  optional: "trigger" (else default trigger)
         [.evictor(...)]            <-  optional: "evictor" (else no evictor)
         [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
         [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
          .reduce/aggregate/fold/apply()      <-  required: "function"
         [.getSideOutput(...)]      <-  optional: "output tag"
    

    Non-Keyed Windows

    stream
           .windowAll(...)           <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"
    

    在上面的例子中,方括号[]内的命令是可选的, 这表明Flink允许您以许多不同的方式定制窗口逻辑,以便它最适合您的需求。

    Window生命周期

    简而言之,当第一个属于该窗口的元素到达时,就会创建一个窗口,并且当时间(事件或处理时间)超过其结束时间戳加上用户指定的允许迟到(请参阅允许迟到)时,窗口就会被完全删除。Flink确保只对基于时间的window进行删除,而不对其他类型的window进行删除,例如:global windows (see Window Assigners).例如,使用基于事件时间(event time)的窗口策略,每5分钟创建一个不重叠(或翻滚)窗口,允许延迟1分钟,Flink将会为12:00到12:05这段时间内,第一个包含时间戳的元素到达时创建一个新窗口,当水印通过12:06时,移除这个窗口。

    此外,每个 Window 都有一个Trigger(详情请看 Triggers)和一个附属于 Window 的函数(例如: WindowFunction, ReduceFunctionFoldFunction)(详情请看 Window Functions)函数里包含了应用于窗口(Window)内容的计算,而Trigger(触发器)则指定了函数在什么条件下可被应用(函数何时被触发),一个触发策略可以是 "当窗口中的元素个数超过4个时" 或者 "当水印达到窗口的边界时"。触发器还可以决定在窗口创建和删除之间的任意时刻清除窗口的内容,本例中的清除仅指清除窗口的内容而不是窗口的元数据,也就是说新的数据还是可以被添加到当前的window中。

    除此之外,您还可以指定一个 Evictor (详情 Evictors)将在触发器触发之后或者在函数被应用之前或者之后,清除窗口中的元素。

    在下面的文章中,我们将详细介绍上面的每个组件。我们从上述片段的主要部分开始 (see Keyed vs Non-Keyed Windows, Window Assigner, and Window Function)然后是可选部分。

    分组和非分组Windows (Keyed vs Non-Keyed Windows

    首先,第一件事是指定你的数据流是分组的还是未分组的,这个必须在定义 window 之前指定好。使用 keyBy(...) 会将你的无限数据流拆分成逻辑分组的数据流,如果 keyBy(...) 函数不被调用的话,你的数据流将不是分组的。

    对于keyed 分组数据流中,任何正在传入的事件的属性都可以被当做key(more details here
    分组数据流将你的window计算通过多任务并发执行,以为每一个逻辑分组流在执行中与其他的逻辑分组流是独立地进行的。

    在非分组数据流中,你的原始数据流并不会拆分成多个逻辑流并且所有的window逻辑将在一个任务中执行,并发度为1。

    窗口分配器 (Window Assigners)

    指定完你的数据流是分组的还是非分组的之后,接下来你需要定义一个窗口分配器(window assigner),窗口分配器定义了如何将元素分配给窗口。这是通过在分组数据流中调用window(…)或非分组数据流中调用windowAll()时你选择的窗口分配器(WindowAssigner)来指定的。WindowAssigner是负责将每一个到来的元素分配给一个或者多个窗口(window),Flink 提供了一些常用的预定义窗口分配器,即:滚动窗口、滑动窗口、会话窗口和全局窗口。 你也可以通过继承WindowAssigner类来自定义自己的窗口。所有的内置窗口分配器(除了全局窗口 global window)都是通过时间来分配元素到窗口中的,这个时间要么是处理的时间,要么是事件发生的时间。 请看一下我们的 event time 部分来了解更多处理时间和事件时间的区别及时间戳(timestamp)和水印(watermark)是如何产生的。

    基于时间的窗口有一个开始时间戳(包含在内)和一个结束时间戳(不包含在内),它们一起描述窗口的大小。在代码中,Flink在处理基于时间的窗口时使用TimeWindow,该窗口有查询开始和结束时间戳的方法,还有一个额外的方法maxTimestamp(),它返回给定窗口允许的最大时间戳。

    在下面的文章中, 我们将展示Flink的预定义窗口分配器是如何工作的,以及它们在DataStream程序中是如何使用的。下图中展示了每个分配器是如何工作的,紫色圆圈代表着数据流中的一个元素,这些元素是通过一些key进行分区(在本例中是 user1,user2,user3), X轴显示的是时间进度。

    滚动窗口 Tumbling Windows

    滚动窗口分配器将每个元素分配的一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个5分钟大小的滚动窗口,将计算当前窗口,每5分钟将启动一个新窗口,如下图所示。


    下面的代码展示了如何使用滚动窗口。

    val input: DataStream[T] = ...
    
    / / 滚动事件时间窗口( tumbling event-time windows )
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // 滚动处理时间窗口(tumbling processing-time windows)
    input
        .keyBy(<key selector>)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // 每日偏移8小时的滚动事件时间窗口(daily tumbling event-time windows offset by -8 hours. ) 
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .<windowed transformation>(<window function>)
    

    时间间隔可以通过Time.milliseconds(x)Time.seconds(x)Time.minutes(x)等其中的一个来指定。

    在上面最后的例子中,滚动窗口分配器还接受了一个可选的偏移参数,可以用来改变窗口的排列。例如,没有偏移的话按小时的滚动窗口将按时间纪元来对齐,也就是说你将一个如: 1:00:00.0001:59:59.999,2:00:00.0002:59:59.999等,如果你想改变一下,你可以指定一个偏移,如果你指定了一个15分钟的偏移,你将得到1:15:00.0002:14:59.999,2:15:00.0003:14:59.999等。时间偏移一个很大的用处是用来调准非0时区的窗口,例如:在中国你需要指定一个8小时的时间偏移。

    滑动窗口(Sliding Windows)

    滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于滚动参数的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。
    例如,你有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里包含着上个10分钟产生的数据,如下图所示:


    下面的代码展示了如何使用滑动窗口。

    val input: DataStream[T] = ...
    
    // 滑动事件时间窗口 (sliding event-time windows)
    input
        .keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // 滑动处理时间窗口 (sliding processing-time windows)
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // 偏移8小时的滑动处理时间窗口(sliding processing-time windows offset by -8 hours)
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
        .<windowed transformation>(<window function>)
    

    时间间隔可以通过Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等来指定。

    正如上述例子所示,滑动窗口分配器也有一个可选的偏移参数来改变窗口的对齐。例如,没有偏移参数,按小时的窗口,有30分钟的滑动,将根据时间纪元来对齐,也就是说你将得到如下的窗口1:00:00.000 - 1:59:59.999, 1:30:00.000 - 2:29:59.999等。而如果你想改变窗口的对齐,你可以给定一个偏移,如果给定一个15分钟的偏移,你将得到如下的窗口:1:15:00.000~2:14.59.999, 1:45:00.000~2:44:59.999等。时间偏移一个很大的用处是用来调准非0时区的窗口,例如:在中国你需要指定一个8小时的时间偏移。

    会话窗口(Session Windows)

    session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况。相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度。当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。



    下面的代码展示了如何使用滑动窗口。

    val input: DataStream[T] = ...
    
    // 事件时间会话窗口(event-time session windows with static gap)
    input
       .keyBy(<key selector>)
       .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
       .<windowed transformation>(<window function>)
    
    // 具有动态间隙的事件时间会话窗口 (event-time session windows with dynamic gap)
    input
       .keyBy(<key selector>)
       .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
         override def extract(element: String): Long = {
           // determine and return session gap
         }
       }))
       .<windowed transformation>(<window function>)
    
    // 具有静态间隙的处理时间会话窗口(processing-time session windows with static gap)
    input
       .keyBy(<key selector>)
       .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
       .<windowed transformation>(<window function>)
    
    
    // 具有动态间隙的处理时间会话窗口(processing-time session windows with dynamic gap)
    input
       .keyBy(<key selector>)
       .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
         override def extract(element: String): Long = {
           // determine and return session gap
         }
       }))
       .<windowed transformation>(<window function>)
    

    静态间隔可以通过使用 Time.milliseconds(x), Time.seconds(x),Time.minutes(x)来指定。
    动态间隙通过实现SessionWindowTimeGapExtractor接口来指定。

    注意: 因为session看窗口没有一个固定的开始和结束,他们的评估与滑动窗口和滚动窗口不同。在内部,session操作为每一个到达的元素创建一个新的窗口,并合并间隔时间小于指定非活动间隔的窗口。为了进行合并,session窗口的操作需要指定一个合并触发器(Trigger)和一个合并窗口函数(Window Function),如:ReduceFunction或者WindowFunction(FoldFunction不能合并)。

    全局窗口(Global Windows)

    全局窗口分配器将所有具有相同key的元素分配到同一个全局窗口中,这个窗口模式仅适用于用户还需自定义触发器的情况。否则,由于全局窗口没有一个自然的结尾,无法执行元素的聚合,将不会有计算被执行。


    代码如下:

    val input: DataStream[T] = ...
    
    input
        .keyBy(<key selector>)
        .window(GlobalWindows.create())
        .<windowed transformation>(<window function>)
    

    窗口函数(Window Functions)


    定义窗口分配器之后,我们需要为每个窗口指定需要执行的计算。这是窗口的职责。当系统决定一个窗口已经准备好执行之后,这个窗口函数将被用来处理窗口中的每一个元素(可能是分组的)。参见 triggers当窗口准备好后, Flink是如何确定何时触发的。

    窗口函数可以是ReduceFunction, AggregateFunction, FoldFunctionProcessWindowFunction其中之一。前两个更高效一些(see State Size section) 因为Flink可以在每个窗口到达时递增地聚合元素。 ProcessWindowFunction 可以获取一个窗口中所有元素的迭代以及关于元素所属窗口的附加元信息。

    窗口转换ProcessWindowFunction与其他操作相比操作效率要差一些,因为Flink内部在调用函数之前必须将窗口中的所有元素都缓冲起来。这可以通过ProcessWindowFunctionReduceFunctionAggregateFunctionFoldFunction结合使用来获取窗口中所有元素的增量聚合以及 ProcessWindowFunction接收的额外窗口元数据,我们将看这些变体的例子。

    ReduceFunction

    ReduceFunction指定了如何通过两个输入参数进行合并和输出一个相同类型参数的过程。Flink使用ReduceFunction来对窗口中的元素进行增量聚合。

    一个ReduceFunction 可以通过如下的方式来定义和使用:

    val input: DataStream[(String, Long)] = ...
    
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
    

    AggregateFunction

    AggregateFunction是ReduceFunction的泛化版本,有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型是输入流中的元素类型,AggregateFunction有一个方法可以向累加器(ACC)添加一个输入元素。该接口还具有创建初始累加器、将两个累加器合并为一个累加器和从累加器提取输出(类型为OUT)的方法。示例如下:
    与ReduceFunction相同,Flink会在输入元素到达窗口时递增地聚合它们。

    AggregateFunction定义和使用:

    /**
     * The accumulator is used to keep a running sum and a count. The [getResult] method
     * computes the average.
     */
    class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
      override def createAccumulator() = (0L, 0L)
    
      override def add(value: (String, Long), accumulator: (Long, Long)) =
        (accumulator._1 + value._2, accumulator._2 + 1L)
    
      override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
    
      override def merge(a: (Long, Long), b: (Long, Long)) =
        (a._1 + b._1, a._2 + b._2)
    }
    
    val input: DataStream[(String, Long)] = ...
    
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .aggregate(new AverageAggregate)
    

    上面的示例计算窗口中元素的第二个字段的平均值。

    FoldFunction

    FoldFunction 指定了一个输入元素如何与一个输出类型的元素合并的过程,这个FoldFunction 会被每一个加入到窗口中的元素和当前的输出值增量地调用,第一个元素是与一个预定义的类型为输出类型的初始值合并。

    实例:

    val input: DataStream[(String, Long)] = ...
    
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .fold("") { (acc, v) => acc + v._2 }
    

    上面例子追加所有输入的长整型到一个空的字符串中。
    注意 fold()不能应用于session window或者其他可合并的窗口中。

    ProcessWindowFunction

    ProcessWindowFunction获取一个迭代函数,该迭代函数包含窗口的所有元素,以及具有访问时间和状态信息的上下文对象,与其他窗口函数相比提供了更多的灵活性。这是以牺牲性能和资源消耗为代价的,因为元素不能递增地聚合,需要在内部缓冲,直到窗口认为可以进行处理为止。

    代码如下:

    abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
    
      /**
        * Evaluates the window and outputs none or several elements.
        *
        * @param key      The key for which this window is evaluated.
        * @param context  The context in which the window is being evaluated.
        * @param elements The elements in the window being evaluated.
        * @param out      A collector for emitting elements.
        * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
        */
      def process(
          key: KEY,
          context: Context,
          elements: Iterable[IN],
          out: Collector[OUT])
    
      /**
        * The context holding window metadata
        */
      abstract class Context {
        /**
          * Returns the window that is being evaluated.
          */
        def window: W
    
        /**
          * Returns the current processing time.
          */
        def currentProcessingTime: Long
    
        /**
          * Returns the current event-time watermark.
          */
        def currentWatermark: Long
    
        /**
          * State accessor for per-key and per-window state.
          */
        def windowState: KeyedStateStore
    
        /**
          * State accessor for per-key global state.
          */
        def globalState: KeyedStateStore
      }
    
    }
    

    ProcessWindowFunction可以通过如下方式调用:

    val input: DataStream[(String, Long)] = ...
    
    input
      .keyBy(_._1)
      .timeWindow(Time.minutes(5))
      .process(new MyProcessWindowFunction())
    
    /* ... */
    
    class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
    
      def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): () = {
        var count = 0L
        for (in <- input) {
          count = count + 1
        }
        out.collect(s"Window ${context.window} count: $count")
      }
    }
    

    它对窗口中的元素进行计数。此外,窗口函数输出时添加了窗口的信息。

    注意,对于简单的聚合(如count)使用ProcessWindowFunction是非常低效的。下一节将介绍如何将ReduceFunctionAggregateFunctionProcessWindowFunction结合使用以获得增量聚合,以及ProcessWindowFunction的附加信息。

    带有递增聚合的ProcessWindowFunction(ProcessWindowFunction with Incremental Aggregation)

    ProcessWindowFunction可以与ReduceFunctionAggregateFunctionFoldFunction结合来增量地对到达window中的元素进行聚合。当窗口关闭时,ProcessWindowFunction就能提供聚合结果。当获取到ProcessWindowFunction额外的window元信息后就可以进行增量计算窗口了。

    标注:你也可以使用ProcessWindowFunction替换WindowFunction来进行增量窗口聚合。

    使用ReduceFunction进行增量窗口聚合(Incremental Window Aggregation with ReduceFunction)

    下面例子展示了一个增量ReduceFunction如何跟一个ProcessWindowFunction结合,来获取窗口中最小的事件和窗口的开始时间。

    val input: DataStream[SensorReading] = ...
    
    input
      .keyBy(<key selector>)
      .timeWindow(<duration>)
      .reduce(
        (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
        ( key: String,
          window: TimeWindow,
          minReadings: Iterable[SensorReading],
          out: Collector[(Long, SensorReading)] ) =>
          {
            val min = minReadings.iterator.next()
            out.collect((window.getStart, min))
          }
      )
    

    使用AggregateFunction的增量窗口聚合(Incremental Window Aggregation with AggregateFunction)

    下面的示例, 如何将AggregateFunctionProcessWindowFunction结合起来递增的计算平均值,并同时输出收集window 的key和平均值。

    val input: DataStream[(String, Long)] = ...
    
    input
      .keyBy(<key selector>)
      .timeWindow(<duration>)
      .aggregate(new AverageAggregate(), new MyProcessWindowFunction())
    
    // Function definitions
    
    /**
     * The accumulator is used to keep a running sum and a count. The [getResult] method
     * computes the average.
     */
    class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
      override def createAccumulator() = (0L, 0L)
    
      override def add(value: (String, Long), accumulator: (Long, Long)) =
        (accumulator._1 + value._2, accumulator._2 + 1L)
    
      override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
    
      override def merge(a: (Long, Long), b: (Long, Long)) =
        (a._1 + b._1, a._2 + b._2)
    }
    
    class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {
    
      def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double]): () = {
        val average = averages.iterator.next()
        out.collect((key, average))
      }
    }
    

    使用FoldFunction递增窗口聚合(Incremental Window Aggregation with FoldFunction)

    下面的示例,如何将FoldFunctionProcessWindowFunction相结合,递增的提取窗口中的事件数量,并返回窗口的键和结束时间。

    val input: DataStream[SensorReading] = ...
    
    input
     .keyBy(<key selector>)
     .timeWindow(<duration>)
     .fold (
        ("", 0L, 0),
        (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
        ( key: String,
          window: TimeWindow,
          counts: Iterable[(String, Long, Int)],
          out: Collector[(String, Long, Int)] ) =>
          {
            val count = counts.iterator.next()
            out.collect((key, window.getEnd, count._3))
          }
      )
    

    WindowFunction(弃用)

    在一些可以使用ProcessWindowFunction的地方你也可以使用WindowFunction。这是较旧版本,提供较少的上下文信息,并且没有一些高级功能,例如每窗口keyed state。WindowFunction将被弃用。

    trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
    
      /**
        * Evaluates the window and outputs none or several elements.
        *
        * @param key    The key for which this window is evaluated.
        * @param window The window that is being evaluated.
        * @param input  The elements in the window being evaluated.
        * @param out    A collector for emitting elements.
        * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
        */
      def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
    }
    

    可以这样使用:

    val input: DataStream[(String, Long)] = ...
    
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .apply(new MyWindowFunction())
    

    触发器(Triggers)

    触发器决定了一个窗口何时可以被窗口函数处理,每一个窗口分配器都有一个默认的触发器,如果默认的触发器不能满足你的需要,你可以通过调用trigger(...)来自定义的一个触发器。

    触发器的接口有5个方法来允许触发器处理不同的事件:

    • onElement() 每个元素被添加到窗口时调用
    • onEventTime() 当已注册的event-time计时器触发时调用。
    • onProcessingTime() 当注册的processing-time计时器触发时调用。
    • onMerge() 与状态性触发器相关,当使用session窗口时,两个触发器对应的窗口合并时,合并两个触发器的状态。
    • clear() 执行任何需要清除的相应窗口

    上面的方法中有两个需要注意的地方:

    1. 前三个通过返回一个TriggerResult来决定如何操作调用他们的事件,这些操作可以是下面操作中的一个。
      • CONTINUE: 什么也不做
      • FIRE:触发计算
      • PURGE:清除窗口中的数据
      • FIRE_AND_PURGE:触发计算并清除窗口中的数据
    2. 这些函数可以用来为后续的操作 注册处理时间定时器 或者 事件时间计时器

    触发和清除(Fire and Purge)

    一旦触发器决定处理已经准备好window,它将触发并返回FIRE或者FIRE_AND_PURGE。给定一个带有ProcessWindowFunction的窗口,所有元素都传递给ProcessWindowFunction(可能是在它们传递给回收器之后)。带有ReduceFunctionAggregateFunctionFoldFunction的窗口仅仅发出它们渴望聚合的结果。

    当一个触发器触发时,它可以是FIRE或者FIRE_AND_PURGE,如果是FIRE的话,将保留window中的内容,FIRE_AND_PURGE的话,会清除window的内容。默认情况下,预实现的触发器仅仅是FIRE,不会清除window的状态。

    注意: 清除操作仅清除window的内容,不会删除window元信息和触发器状态。

    WindwAssigners的默认触发器(Default Triggers of WindowAssigners)

    WindowAssigner的默认触发器适用于许多种情况,例如:所有的事件时间触发器都有一个EventTimeTrigger作为默认的触发器,这个触发器仅在当水印通过窗口的最后时间时触发。

    注意: GlobalWindow默认的触发器是NeverTrigger,是永远不会触发的,因此,如果你使用的是GlobalWindow的话,你需要定义一个自定义触发器。
    注意: 通过调用trigger(...)来指定一个触发器你就重写了WindowAssigner的默认触发器。例如:如果你为TumblingEventTimeWindows指定了一个CountTrigger,你就不会再通过时间来获取触发了,而是通过计数。现在,如果你想通过时间和计数来触发的话,你需要写你自己自定义的触发器。

    内置和自定义触发器(Built-in and Custom Triggers)

    Flink有几个内置触发器。

    • EventTimeTrigger(前面提到过):触发是基于事件时间用水印来衡量进展的。
    • ProcessingTimeTrigger: 根据处理时间来触发
    • CountTrigger :一旦窗口中的元素个数超出了给定的限制就会触发
    • PurgingTrigger:将另一个触发器作为参数,并将其转换为清除触发器。

    如果你想实现一个自定义的触发器,你需要查看一下这个抽象类 Trigger 请注意,这个API还在优化中,后续的Flink版本可能会改变。

    驱逐器(Evictors)


    Flink的窗口模型允许指定一个除了WindowAssignerTrigger之外的可选参数Evitor,这个可以通过调用evitor(...)方法来实现。这个驱逐器(evitor)可以在触发器触发之前或者之后,或者窗口函数被应用之前清理窗口中的元素。为了达到这个目的,Evitor接口有两个方法:

    /**
     * Optionally evicts elements. Called before windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    
    /**
     * Optionally evicts elements. Called after windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    

    evitorBefore()方法包含了在window function之前被应用的驱逐逻辑,而evictAfter()包含要在窗口函数之后应用的对象。在应用窗口函数之前退出的元素不会被它处理。

    Flink有三个预处理的驱逐器:

    • CountEvictor:在窗口中保留用户指定的元素数量,并丢弃窗口缓冲区开头的剩余元素。
    • DeltaEvictor:通过DeltaFunction 和阈值,计算窗口缓冲区中最后一个元素和每个剩余元素之间的delta值,并清除delta值大于或者等于阈值的元素
    • TimeEvitor:使用一个interval的毫秒数作为参数,对于一个给定的窗口,它会找出元素中的最大时间戳max_ts,并清除时间戳小于max_tx - interval的元素。

    默认情况下:所有预实现的evitor都是在window function前应用它们的逻辑

    注意:指定Evitor可防止预聚合,因为窗口的所有元素必须在应用计算之前传递到驱逐器中 。
    注意:Flink 并不保证窗口中的元素是有序的,所以驱逐器可能从窗口的开始处清除,元素到达的先后不是那么必要。

    允许延迟(Allowed Lateness)

    当处理事件时间的window时,可能会出现元素到达晚了,Flink用来与事件时间联系的水印已经过了元素所属的窗口的最后时间。有关Flink如何处理event time的更深入的讨论,请参阅 event time,尤其是late elements。 late elements

    默认情况下,当水印已经过了窗口的最后时间时晚到的元素会被丢弃。然而,Flink允许为窗口操作指定一个最大允许时延,允许时延指定了元素可以晚到多长时间,默认情况下是0。水印已经过了窗口最后时间后才来的元素,如果还未到窗口最后时间加时延时间,那么元素任然添加到窗口中。如果依赖触发器的使用的话,晚到但是未丢弃的元素可能会导致窗口再次被触发。

    为了达到这个目的,Flink将保持窗口的状态直到允许时延的发生,一旦发生,Flink将清除Window,删除window的状态,如Window 生命周期章节中所描述的那样。
    默认情况下,允许时延为0,也就是说水印之后到达的元素将被丢弃。

    你可以按如下方式来指定一个允许延迟:

    val input: DataStream[T] = ...
    
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .allowedLateness(<time>)
        .<windowed transformation>(<window function>)
    

    当使用GlobalWindows分配器时,没有数据会被认为是延迟的,因为Global Window的最后时间是Long.MAX_VALUE

    以侧输出来获取延迟数据(Getting Late Data as a Site Output)

    在使用Flink的 side output特性时,你可以获得一个已经被丢弃的延迟数据流。

    首先你需要在窗口化的数据流中调用sideOutputLateData(OutputTag)指定你需要获取延迟数据,然后,你就可以在window 操作的结果中获取到侧输出流了。
    代码如下:

    val lateOutputTag = OutputTag[T]("late-data")
    
    val input: DataStream[T] = ...
    
    val result = input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .allowedLateness(<time>)
        .sideOutputLateData(lateOutputTag)
        .<windowed transformation>(<window function>)
    
    val lateStream = result.getSideOutput(lateOutputTag)
    

    延迟元素考虑(Late elements considerations)

    当指定一个允许延迟大于0时,即使水印已经达到了window的最后时间,window以及window中的内容将会继续保留。在这种情况下,当一个延迟事件到来而未丢弃时,它可能会触发window中的其他触发器。这些触发叫做late firings,因为它们是由延迟事件触发的,而与main firing相反,main firing是窗口第一次触发的。在会话窗口的情况下,late firings会进一步导致窗口的合并,因为它“弥补”两个预存的,未合并窗口之间的差距。

    注意,由于延迟触发而的元素被视为上一次计算的更新结果。即数据流将包含多个相同计算的结果数据。根据应用程序的不同,需要考虑合并重复的结果。

    Working with window results

    窗口操作的结果是DataStream,结果元素中不包含任何关于窗口操作的信息,所以如果想要保留关于窗口的元信息,就必须在ProcessWindowFunction中的结果元素中手工编码该信息。在结果元素上设置的唯一相关信息是元素时间戳。被处理窗口的最大允许时间戳设置为-1,即结束时间戳,因为窗口结束时间戳是独占的。注意,事件时间窗口和处理时间都是如此。

    水印和窗口的交互作用(Interaction of watermarks and windows)

    在继续本部分之前,看看关于 event time and watermarks章节。
    当水印到达窗口操作符时,会触发两件事:

    • 水印触发所有窗口的计算,其中最大时间戳(即结束时间戳- 1)小于新水印
    • 水印被传递到下游操作

    一旦在下游操作中接收到水印, 水印会“flushes”被延迟的窗口。

    连续窗口的操作(Consecutive windowed operations)

    如前所述,计算窗口显示结果的时间戳的方式以及水印与窗口交互的方式允许串接连续的窗口操作。当您想要执行两个连续的窗口操作时,如果您想使用不同的键,但仍然希望来自相同上游窗口的元素最终位于相同的下游窗口中,那么这可能非常有用。考虑一下这个例子:

    val input: DataStream[Int] = ...
    
    val resultsPerKey = input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .reduce(new Summer())
    
    val globalResults = resultsPerKey
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
        .process(new TopKWindowFunction())
    

    在本例中,第一个操作的time window[0,5]的结果也会在随后的窗口操作中以time window[0,5]结束。这允许计算每个键的和,然后在第二个操作中计算同一个窗口中的top-k元素。

    有用状态大小的考虑(Useful state size considerations)

    window 可以定义一个很长的周期(例如:一天、一周或者一月),因此积累了相当大的状态。这里有些规则,当估计你的窗口计算的存储要求时,需考虑。

    1. Flink会在每个窗口中为每个属于它的元素创建一份备份,鉴于此,滚动窗口保存了每个元素的一个备份,与此相反,滑动窗口会为每个元素创建几个备份,如 Window Assigner章节所述。因此,一个窗口大小为1天,滑动大小为1秒的滑动窗口可能就不是个好的策略了。
    2. ReduceFunctionFoldFunctionAggregateFunction可以大大地降低存储要求,因为它们预聚合元素,每个窗口只存储一个值。相反,只有ProcessWindowFunction需要累积所有的元素。
    3. 使用Evictor可以避免任何预聚合操作,因为窗口中的所有元素都需要在应用计算之前传递到evitor中,Evictors

    相关文章

      网友评论

        本文标题:Flink Streaming Windows操作

        本文链接:https://www.haomeiwen.com/subject/ciiazftx.html