美文网首页
Flink Window Triggers 触发器 和 EVIC

Flink Window Triggers 触发器 和 EVIC

作者: Rex_2013 | 来源:发表于2020-09-07 17:54 被阅读0次

前言

触发器定义了window何时会被求值以及何时发送求值结果。触发器可以到了特定的时间触发也可以碰到特定的事件触发。例如:观察到事件数量符合一定条件或者观察到了特定的事件。
清理器是一个可选的组件,可以被注入到ProcessWindowFunction之前或者之后调用。evictor可以清除掉window中收集的元素。由于evictor需要迭代所有的元素,所以evictor只能使用在没有增量聚合函数作为参数的情况下。

如果对window 的概念或者window的分配器不熟悉的话,可以看下前面的文章Flink Window

首先来回顾下关于Flink Window 操作流程

Flink window operator 流程

当一个事件来到窗口操作符,首先将会传给 WindowAssigner 来处理。WindowAssigner 决定
了事件将被分配到哪些窗口。如果窗口不存在,WindowAssigner 将会创建一个新的窗口。
如果一个 window operator 接受了一个增量聚合函数作为参数,例如 ReduceFunction 或者
AggregateFunction,新到的元素将会立即被聚合,而聚合结果 result 将存储在 window 中。如
果 window operator 没有使用增量聚合函数,那么新元素将被添加到 ListState 中,ListState 中
保存了所有分配给窗口的元素。
新元素被添加到窗口时,这个新元素同时也被传给了 window 的 trigger。trigger 定义了 window
何时准备好求值,何时 window 被清空。trigger 可以基于 window 被分配的元素和注册的定
时器来对窗口的所有元素求值或者在特定事件清空 window 中所有的元素。

官网提供的关于WIndows 的算子流程

Keyed Windows

stream
       .keyBy(...)               <-  keyed versus non-keyed windows
       .window(...)              <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

-----------------------------------------------------------------------------------------------------------------------

Non-Keyed Windows

stream
       .windowAll(...)           <-  required: "assigner"
      [.trigger(...)]            <-  optional: "trigger" (else default trigger)
      [.evictor(...)]            <-  optional: "evictor" (else no evictor)
      [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
      [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      <-  required: "function"
      [.getSideOutput(...)]      <-  optional: "output tag"

Triggers

Trigger确定了窗口(由窗口分配器形成)什么时候准备好执行窗口函数处理。每个WindowAssigner都有一个默认值Trigger。如果默认触发器不符合您的需求,则可以使用指定自定义触发器trigger()。


触发器作用

默认的触发器将会在两种情况下触发
处理时间:机器时间到达处理时间
事件时间:水位线超过了窗口的结束时间

触发器可以访问流的时间属性以及定时器,还可以对state状态编程。所以触发器和process function一样强大。
例如我们可以实现一个触发逻辑:当窗口接收到一定数量的元素时,触发器触发。
再比如当窗口接收到一个特定元素时,触发器触发。
还有就是当窗口接收到的元素里面包含特定模式(5秒钟内接收到了两个同样类型的事件),触发器也可以触发。
在一个事件时间的窗口中,一个自定义的触发器可以提前(在水位线没过窗口结束时间之前)计算和发射计算结果。这是一个常见的低延迟计算策略,尽管计算不完全,但不像默认的那样需要等待水位线没过窗口结束时间。

Trigger API

我们看一下Trigger API:

onElement()  对于添加到窗口中的每个元素,都会调用该方法。
onEventTime()  当注册的事件时间计时器触发时,将调用该方法。
onProcessingTime() 当注册的处理时间计时器触发时,将调用该方法。
onMerge() 与有状态触发器相关,并且在两个触发器的相应窗口合并时(例如,在使用会话窗口时)合并两个触发器的状态。
clear() 执行删除相应窗口后所需的任何操作。

关于上述方法,需要注意两件事:

  1. 前面的三个方法都会产生一个TriggerResult来决定窗口接下来发生什么。TriggerResult可以取以下结果:

    • CONTINUE:什么都不做
    • FIRE:如果window operator有ProcessWindowFunction这个参数,将会调用这个0 ProcessWindowFunction。如果窗口仅有增量聚合函数(ReduceFunction或者AggregateFunction)作为参数,那么当前的聚合结果将会被发送。窗口的state不变。
    • PURGE:窗口所有内容包括窗口的元数据都将被丢弃。
    • FIRE_AND_PURGE:先对窗口进行求值,再将窗口中的内容丢弃。
  2. 这些方法中的任何一种都可用于注册处理或事件时间计时器以用于将来的操作。

TriggerResult可能的取值使得我们可以实现很复杂的窗口逻辑。一个自定义触发器可以触发多次,可以计算或者更新结果,可以在发送结果之前清空窗口。

Fire and Purge

当Trigger确定窗口已准备好进行处理,返回FIRE或FIRE_AND_PURGE。window operator根据 FIRE或FIRE_AND_PURGE 去执行当前窗口的操作。像ProcessWindowFunction全窗口函数会处理 all elements,ReduceFunction,AggregateFunction或,FoldFunction增量聚合函数只会增量聚合

当触发器触发时,它可以是FIRE或FIRE_AND_PURGE。在FIRE保留窗口内容的同时,FIRE_AND_PURGE删除其内容。默认情况下,预先实现的触发器仅触发FIRE而不会清除窗口状态。

注意: Purge 将仅删除窗口的内容,并将保留有关该窗口的任何潜在元信息以及任何trigger state 。

当在trigger中使用per-window state时,这里我们需要保证当窗口被删除时state也要被删除,否则随着时间的推移,window operator将会积累越来越多的数据,最终可能使应用崩溃。

当窗口被删除时,为了清空所有状态,触发器的clear()方法需要需要删掉所有的自定义per-window state,以及使用TriggerContext对象将处理时间和事件时间的定时器都删除。

WindowAssigners的默认触发器

默认Trigger的WindowAssigner是适用于很多情况。例如,所有事件时间窗口分配器都有EventTimeTrigger默认触发器。一旦WaterMark通过窗口的闭合时间,此trigger便会触发。

注意: GlobalWindow的 默认触发器是NeverTrigger永不触发。因此,使用时,GlobalWindow使用时必须定义一个自定义触发器
注意: 通过使用指定触发器,trigger()您将覆盖的默认触发器WindowAssigner。例如,如果您指定为 CountTrigger,则TumblingEventTimeWindows您将不再基于时间进度而是仅通过计数获得窗口触发。现在,如果要基于时间和计数做出反应,则必须编写自己的自定义触发器。

FLink 自带的和自定义Triggers

  • Flink带有一些内置触发器。
 EventTimeTrigger根据事件时间(由WaterMark判断)的进度触发。
 ProcessingTimeTrigger基于处理时间的Trigger。
 CountTrigger一旦窗口中的元素数量超过给定的限制,就会触发。
 PurgingTrigger
  • 通过Trigger 类实现自定义触发器

请注意,API仍在不断更新,并可能在Flink的未来版本中更改。

Triggers 使用案例

下面的例子展示了一个触发器在窗口结束时间之前触发。当第一个事件被分配到窗口时,这个触发器注册了一个定时器,定时时间为水位线之前一秒钟。当定时事件执行,将会注册一个新的定时事件,这样,这个触发器每秒钟最多触发一次。

import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object TriggerExample {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)

    val stream = env
      .socketTextStream("localhost", 9999, '\n')
      .map(line => {
        val arr = line.split(" ")
        (arr(0), arr(1).toLong)
      })
      .assignAscendingTimestamps(_._2)
      .keyBy(_._1)
      .timeWindow(Time.seconds(10))
      .trigger(new OneSecondIntervalTrigger)
      .process(new WindowCount)

    stream.print()

    env.execute()
  }

  // 只在整数秒和窗口结束时间时触发窗口计算!
  class OneSecondIntervalTrigger extends Trigger[(String, Long), TimeWindow] {
    // 每来一条数据都要调用一次!
    override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
      // 默认值为false
      // 当第一条事件来的时候,会在后面的代码中将firstSeen置为true
      val firstSeen = ctx.getPartitionedState(
        new ValueStateDescriptor[Boolean]("first-seen", Types.of[Boolean])
      )

      // 当第一条数据来的时候,!firstSeen.value()为true
      // 仅对第一条数据注册定时器
      // 这里的定时器指的是:onEventTime函数!
      if (!firstSeen.value()) {
        // 第一条数据`a 1234`来的时候,水位线是:-9223372036854775808
        // 过了200ms,插入了一个水位线1234 - 1 = 1233
        // 水位线后面的整数秒是:-9223372036854774000
        println("第一条数据来了!当前水位线是:" + ctx.getCurrentWatermark)
        val t = ctx.getCurrentWatermark + (1000 - (ctx.getCurrentWatermark % 1000))
        println("第一条数据来了以后,注册的定时器的整数秒的时间戳是:" + t)
        ctx.registerEventTimeTimer(t) // 在第一条数据的时间戳之后的整数秒注册一个定时器
        ctx.registerEventTimeTimer(window.getEnd) // 在窗口结束事件注册一个定时器
        firstSeen.update(true)
      }
      TriggerResult.CONTINUE
    }

    override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
      TriggerResult.CONTINUE
    }

    // 定时器函数,在水位线到达time时,触发
    override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
      // 在onElement函数中,我们注册过窗口结束时间的定时器
      if (time == window.getEnd) {
        // 在窗口闭合时,触发计算并清空窗口
        TriggerResult.FIRE_AND_PURGE
      } else {
        // 1233ms后面的整数秒是2000ms
        val t = ctx.getCurrentWatermark + (1000 - (ctx.getCurrentWatermark % 1000))
        // 保证t小于窗口结束时间
        if (t < window.getEnd) {
          println("注册的定时器的整数秒的时间戳是:" + t)
          // 这里注册的定时器还是onEventTime函数
          ctx.registerEventTimeTimer(t)
        }
        // 触发窗口计算
        println("在 " + time + " 触发了窗口计算!")
        TriggerResult.FIRE
      }
    }

    override def clear(window: TimeWindow, ctx: TriggerContext): Unit = {
      // 状态变量是一个单例!
      val firstSeen = ctx.getPartitionedState(
        new ValueStateDescriptor[Boolean]("first-seen", Types.of[Boolean])
      )

      firstSeen.clear()
    }
  }

  class WindowCount extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[(String, Long)], out: Collector[String]): Unit = {
      out.collect("窗口中有 " + elements.size + " 条数据!窗口结束时间是" + context.window.getEnd)
    }
  }
}

Evictors

evictor可以在window function求值之前或者之后移除窗口中的元素。

我们看一下Evictor的接口定义:

public interface Evictor<T, W extends Window>  extends Serializable {
  void evictBefore(
    Iterable<TimestampedValue<T>> elements,
    int size,
    W window,
    EvictorContext evictorContext);

  void evictAfter(
    Iterable<TimestampedValue<T>> elements,
    int size,
    W window,
    EvictorContext evictorContext);

  interface EvictorContext {

    long getCurrentProcessingTime();

    long getCurrentWatermark();
  }
}

evictBefore()和evictAfter()分别在window function计算之前或者之后调用。Iterable迭代器包含了窗口所有的元素,size为窗口中元素的数量,window object和EvictorContext可以访问当前处理时间和水位线。可以对Iterator调用remove()方法来移除窗口中的元素。

evictor也经常被用在GlobalWindow上,用来清除部分元素,而不是将窗口中的元素全部清空。

参考flink 官网 flink Windows trigger

相关文章

网友评论

      本文标题:Flink Window Triggers 触发器 和 EVIC

      本文链接:https://www.haomeiwen.com/subject/addkektx.html