Flink Window

作者: Rex_2013 | 来源:发表于2020-08-25 18:04 被阅读0次

    1.window 概述

    • streaming流式计算是一种被设计用于处理无限数据集的数据处理引擎,而无限数据集是指一种不断增长的本质上无限的数据集
    • Window窗口就在一个无界流中设置起始位置和终止位置,让无界流变成有界流,并且在有界流中进行
      数据处理
    • Window操作常见的业务场景:统计过去一段时间、最近一些元素的数据指标

    2.window 窗口的类型

    2.1 根据数据流是否keyBy划分 Keyed vs Non-Keyed Windows

    • 要指定是否是 Keyed windows 需要在window前指定(可查看下面代码),使用keyBy(...)会将无界流分组为逻辑键流。如果keyBy(...)未调用,则不会为您的流根据key来分组。
    • 对stream分流之后,可以使窗口化计算可以由多个任务并行执行(也就是说可以分流到不同slot 甚至是不同的机器上面去做并行计算),因为每个逻辑键控流都可以根据key分流独立于其余逻辑流进行处理。而引用同一键的所有元素将被发送到同一并行任务。
      对于没有keyBy的流,原始流将不会拆分为多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行度为1(假如前面的DataStream没有设置多并行情况下)
    • flink的设计是先分流再做窗口,Google Dataflow 是先窗口再分流
    Keyed Windows
    
    stream
           .keyBy(...)               <-  keyed versus non-keyed windows
           .window(...)              <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"
    
    -----------------------------------------------------------------------------------------------------------------------
    
    Non-Keyed Windows
    
    stream
           .windowAll(...)           <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"
    

    2.2 根据不同的Window Assigners划分

    • WindowAssigner 负责将每条输入的数据分发到正确的 window 中。这是通过 在window(...)(针对KeyedStream)或windowAll()(针对DataStream)传入不同的WindowAssigner来完成的。window() 方法接收的输入参数是一个 WindowAssigner。WindowAssigner负责将每个传入元素分配给一个或多个窗口(滑动窗口,有些元素可能需要复制到多个窗口中)。

    • Flink中根据比较常见的场景提供了一些WindowAssigner:tumbling windows, sliding windows, session windows and global windows 。也可以通过实现WindowAssigner class来自定义一些窗口分配器。

    • 所有flink定义的窗口分配器(全局窗口除外)都是基于时间将元素分配给窗口。这个时间的定义可以是 processing time,也可以是event time。在生产需求中,大部分使用event time 。关于时间的定义以及水位的定义,会在后面的文章涉及到。

    • 下面简单介绍下flink提供的常用的WindowAssigner
      下面用的图显示了每种WindowAssigner的工作情况。紫色圆圈表示流的元素,这些元素由某个键(在这种情况下为用户1,用户2和用户3)划分。x轴显示时间进度。

    tumbling windows, sliding windows, session windows and global windows

    2.2.1 滚动窗口(Tumbling Windows)
    • 将数据依据固定的窗口长度对数据进行切片。
    • 特点:时间对齐,窗口长度固定,没有重叠。
    • 滚动窗口分配器将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。例如:如果你指定了一个5分钟大小的window size,窗口的创建如下图所示:


      tumbling windows
    • 适用场景:适合做BI统计等(做每个时间段的聚合计算)。
    • 官网提供的代码
    val input: DataStream[T] = ...
    
    // tumbling event-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // tumbling processing-time windows
    input
        .keyBy(<key selector>)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // daily tumbling event-time windows offset by -8 hours.
    input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
        .<windowed transformation>(<window function>)
    

    通过静态方法TumblingEventTimeWindows.of来实例化TumblingEventTimeWindows类,可以通过源码看到TumblingEventTimeWindows构造是有两个参数

     private TumblingProcessingTimeWindows(long size, long offset) {
            ...
    }
    

    时间间隔可以通过指定Time.milliseconds(x),Time.seconds(x), Time.minutes(x)
    至于offset 参数,该参数可用于更改窗口的对齐方式。例如,如果没有偏移,则每小时滚动窗口与epoch对齐,即你将获得诸如的窗口 1:00:00.000 - 1:59:59.999,2:00:00.000 - 2:59:59.999等等。如果要更改,可以提供一个偏移量。随着15分钟的偏移量,你会,例如,拿 1:15:00.000 - 2:14:59.999,2:15:00.000 - 3:14:59.999等
    一个重要的用例的偏移是窗口调整到比UTC-0时区等。例如,在中国,必须指定的偏移量Time.hours(-8)。

    • 简单案例
    import java.sql.DriverManager
    
    import org.apache.flink.api.common.functions.ReduceFunction
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.{ProcessWindowFunction, WindowFunction}
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    object TublingTimeWindowKeyedStream {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        val initStream: DataStream[String] = env.socketTextStream("node01", 8888)
        val wordStream = initStream.flatMap(_.split(" "))
        val pairStream = wordStream.map((_, 1))
        //是一个已经分好流的无界流
        val keyByStream = pairStream.keyBy(_._1)
        keyByStream
          .timeWindow(Time.seconds(5))
          .reduce(new ReduceFunction[(String, Int)] {
            override def reduce(value1: (String, Int), value2: (String, Int)): (String, Int) = {
              (value1._1, value2._2 + value1._2)
            }
          },new ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow] {
            override def process(key: String, context: Context, elements: Iterable[(String, Int)], out: Collector[(String, Int)]): Unit = {
            }
          }).print()
        env.execute()
      }
    }
    
    
    2.2.2 滑动窗口(Sliding Windows)
    • 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成。
    • 特点:时间对齐,窗口长度固定,可以有重叠。
    • 滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。
      例如,你有10分钟的窗口和5分钟的滑动,那么每个窗口中5分钟的窗口里包含着上个10分钟产生的数据,如下图所示:


      Sliding Windows
    • 适用场景:对最近一个时间段内的统计(求某接口最近5min的失败率来决定是否要报警)。
    • 官网提供的代码
    val input: DataStream[T] = ...
    
    // sliding event-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // sliding processing-time windows
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
        .<windowed transformation>(<window function>)
    
    // sliding processing-time windows offset by -8 hours
    input
        .keyBy(<key selector>)
        .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
        .<windowed transformation>(<window function>)
    

    通过静态方法SlidingProcessingTimeWindows.of来实例化SlidingProcessingTimeWindows类,可以通过源码看到SlidingProcessingTimeWindows构造是有三个参数

    private SlidingProcessingTimeWindows(long size, long slide, long offset) {
            ...
    }
    

    offset 参数作用跟TumblingEventTimeWindows的offset参数一样。

    • 简单案例
    import java.util.Properties
    
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.kafka.common.serialization.StringSerializer
    
    object SlidingTimeWindowKeyedStream {
      def main(args: Array[String]): Unit = {
    
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //设置连接kafka的配置信息
        val props = new Properties()
        props.setProperty("bootstrap.servers","node01:9092,node02:9092,node03:9092")
        props.setProperty("group.id","flink-kafka-001")
        props.setProperty("key.deserializer",classOf[StringSerializer].getName)
        props.setProperty("value.deserializer",classOf[StringSerializer].getName)
    
        val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka",new SimpleStringSchema(),props))
    
        stream.map(data =>{
          val splits = data.split("\t")
          (splits(1),splits(3).toLong)
        }).keyBy(_._1)
          .timeWindow(Time.minutes(30),Time.seconds(10))
          .aggregate(new AggregateFunction[(String,Long),(String,Long,Long),(String,Double)] {
    
            override def createAccumulator(): (String, Long, Long) = ("",0,0)
    
            override def add(value: (String, Long), accumulator: (String, Long, Long)): (String, Long, Long) = {
              (value._1,accumulator._2+value._2,accumulator._3 + 1)
            }
    
            override def getResult(accumulator: (String, Long, Long)): (String, Double) = {
              (accumulator._1,accumulator._2.toDouble/accumulator._3)
            }
    
            override def merge(a: (String, Long, Long), b: (String, Long, Long)): (String, Long, Long) = {
              (a._1,a._2+b._2,a._3+b._3)
            }
          }).print()
        env.execute()
      }
    }
    
    2.2.3 会话窗口(Session Windows)
    • 由一系列事件组合一个指定时间长度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。
    • 特点:时间无对齐。
    • session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的长度,当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。


      Session Windows
    • 官网提供的代码
    val input: DataStream[T] = ...
    
    // event-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>)
    
    // event-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
          override def extract(element: String): Long = {
            // determine and return session gap
          }
        }))
        .<windowed transformation>(<window function>)
    
    // processing-time session windows with static gap
    input
        .keyBy(<key selector>)
        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
        .<windowed transformation>(<window function>)
    
    
    // processing-time session windows with dynamic gap
    input
        .keyBy(<key selector>)
        .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
          override def extract(element: String): Long = {
            // determine and return session gap
          }
        }))
        .<windowed transformation>(<window function>)
    
    • 注意 由于会话窗口没有固定的开始和结束,因此对它们的判断窗口的大小是不同于滚动窗口和滑动窗口。
      会话窗口运算符会为每个到达的记录创建一个新窗口,如果窗口彼此之间的距离比已定义的间隔小,则将它们合并在一起。为了可合并的,会话窗口操作者需要一个合并触发器以及合并的 窗口函数,如ReduceFunctionAggregateFunction,或ProcessWindowFunctionFoldFunction不能合并。)

    • 简单案例

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
    import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, ProcessingTimeSessionWindows}
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    object SessionWindowTest {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val stream = env.socketTextStream("node01",8888)
        //session生命周期 10s   也就是说窗口如果连着10s中没有新的数据进入窗口,窗口就会滑动(触发计算)
    //    EventTime   事件时间    Process Time:元素被处理的系统时间
    //    stream.windowAll(EventTimeSessionWindows.withGap()).print()
        env.execute()
      }
    }
    
    2.2.4 全局窗口(Global Windows)
    • 一个全局的窗口分配器分配使用相同的密钥相同的单个的所有元素的全局窗口。

    • 特点:只有一个窗口

    • 需要注意的是必需指定自定义触发器时,此窗口方案才有用。否则,将不会执行任何计算,因为全局窗口没有可以处理聚合元素的自然端。

      Global Windows
    • 官网提供的代码

    val input: DataStream[T] = ...
    
    input
        .keyBy(<key selector>)
        .window(GlobalWindows.create())
        .<windowed transformation>(<window function>)
    

    stream
           .keyBy(<key selector>)              
           .window/windowAll(<window assigner>)
           .<windowed transformation>(<window function>)    
    

    2.3 根据不同的度量

    • Window可以分成两类:
      CountWindow:按照指定的数据条数生成一个Window,与时间无关。
      TimeWindow:按照时间生成Window。

    3. Window Functions 窗口函数

    元素从source获取之后或者被处理过之后,通过window assigner 定义了窗口类型并且给予分配到窗口后,window functions 定义了要对窗口中收集的数据做的计算操作

    flink window 流程图

    Window Function 可以分为两类

    3.1 增量聚合函数(incremental aggregation functions)

    聚合原理:窗口内保存一个中间聚合结果,随着新元素的加入,在每个窗口到达时以增量方式聚合它们
    典型函数:ReduceFunction, AggregateFunction
    优点:这类函数通常非常节省空间效率相对更优。


    3.1.1 ReduceFunction

    ReduceFunction指定如何将输入中的两个元素组合在一起以产生相同类型的输出元素。Flink使用ReduceFunction来逐步聚合窗口的元素。

    下面代码是reduce 转换函数的flink源码:
    输入:一个ReduceFunction 也可以传入一个reduce 逻辑的匿名函数
    输出:一个DataStream

     /**
       * @param function The reduce function.
       * @return The data stream that is the result of applying the reduce function to the window.
       */
      def reduce(function: ReduceFunction[T]): DataStream[T] = {
        asScalaStream(javaStream.reduce(clean(function)))
      }
    

    下面是flink 官网提供的代码,示例汇总了窗口中所有元素的元组的第二个字段。

    val input: DataStream[(String, Long)] = ...
    
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
    
    3.1.2 AggregateFunction

    AggregateFunction是一个一般化版本ReduceFunction,其具有三种泛型:输入类型(IN),累加器类型(ACC),和一个输出类型(OUT)。输入类型是输入流中元素的类型,并且AggregateFunction具有将一个输入元素添加到累加器的方法(add)。该接口还具有创建初始累加器(createAccumulator),将两个累加器合并为一个累加器以及OUT从累加器提取输出(类型)的方法(getResult)。在下面的示例中,我们将了解其工作原理。
    与ReduceFunction一样,Flink将在输入元素到达窗口时增量地聚合它们。
    AggregateFunction可以被定义并这样使用:

    /**
     * The accumulator is used to keep a running sum and a count. The [getResult] method
     * computes the average.
     */
    class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
      override def createAccumulator() = (0L, 0L)
    
      override def add(value: (String, Long), accumulator: (Long, Long)) =
        (accumulator._1 + value._2, accumulator._2 + 1L)
    
      override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
    
      override def merge(a: (Long, Long), b: (Long, Long)) =
        (a._1 + b._1, a._2 + b._2)
    }
    
    val input: DataStream[(String, Long)] = ...
    
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .aggregate(new AverageAggregate)
    
    3.1.3 增量函数案例

    案例1:每隔10s统计每辆汽车的平均速度

    import java.util.Properties
    
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.kafka.common.serialization.StringSerializer
    
    object Demo03SpeedAVG {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        val props = new Properties()
        props.setProperty("bootstrap.servers", "node01:9092,node02:9092,node03:9092")
        props.setProperty("group.id", "flink-kafka-001")
        props.setProperty("key.deserializer", classOf[StringSerializer].getName)
        props.setProperty("value.deserializer", classOf[StringSerializer].getName)
    
        val stream = env.addSource(new FlinkKafkaConsumer[String]("flink-kafka", new SimpleStringSchema(), props))
    
        stream.map(data => {
          val splits = data.split("\t")
          (splits(1),splits(3).toInt)
        }).keyBy(_._1)
          .timeWindow(Time.seconds(10))
          .aggregate(new AggregateFunction[(String,Int),(String,Int,Int),(String,Double)] {
            override def createAccumulator(): (String, Int, Int) = ("",0,0)
    
            override def add(value: (String, Int), accumulator: (String, Int, Int)): (String, Int, Int) = {
              (value._1,value._2+accumulator._2,accumulator._3+1)
            }
    
            override def getResult(accumulator: (String, Int, Int)): (String, Double) = {
              (accumulator._1,accumulator._2.toDouble/accumulator._3)
            }
    
            override def merge(a: (String, Int, Int), b: (String, Int, Int)): (String, Int, Int) = {
              (a._1,a._2+b._2,a._3+b._3)
            }
          }).print()
    
        env.execute()
      }
    }
    
    3.1.3 FoldFunction

    FoldFunction指定了一个输入元素如何与一个指定输出类型的元素合并的过程,这个FoldFunction 会被每一个加入到窗口中的元素和当前的输出值增量地调用,第一个元素是与一个预定义的类型为输出类型的初始值合并。

    下面是flink 官网提供的代码,最初化一个为空的String,将所有输入Long(v._2)值附加到这个String上。

    val input: DataStream[(String, Long)] = ...
    
    input
        .keyBy(<key selector>)
        .window(<window assigner>)
        .fold("") { (acc, v) => acc + v._2 }
    
    • 注意:fold()不能与会话窗口或其他可合并窗口一起使用。

    3.2 全量聚合函数(full window functions)

    聚合原理:收集窗口内的所有元素,获取Iterable窗口中包含的所有元素的,以及有关元素所属窗口的其他元信息,并且在执行的时候对他们进行遍历。
    特点:可以获取Flink执行的上下文,可以拿到当前的数据更多信息,比如窗口状态、窗口起始与终止时间、当前水印、时间戳等


    3.2.1 ProcessWindowFunction

    ProcessWindowFunction获得一个Iterable,它包含窗口的所有元素,以及一个可以访问时间和状态信息的Context对象,这使它比其他窗口函数具有更大的灵活性。这是以性能和资源消耗为代价的,因为不能增量聚合元素,而是需要在内部对其进行缓冲,直到将窗口关闭时才会进行处理。

    ProcessWindowFunction 的源码如下:

    abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
    
      /**
        * Evaluates the window and outputs none or several elements.
        *
        * @param key      The key for which this window is evaluated.
        * @param context  The context in which the window is being evaluated.
        * @param elements The elements in the window being evaluated.
        * @param out      A collector for emitting elements.
        * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
        */
      def process(
          key: KEY,
          context: Context,
          elements: Iterable[IN],
          out: Collector[OUT])
    
      /**
        * The context holding window metadata
        */
      abstract class Context {
        /**
          * Returns the window that is being evaluated.
          */
        def window: W
    
        /**
          * Returns the current processing time.
          */
        def currentProcessingTime: Long
    
        /**
          * Returns the current event-time watermark.
          */
        def currentWatermark: Long
    
        /**
          * State accessor for per-key and per-window state.
          */
        def windowState: KeyedStateStore
    
        /**
          * State accessor for per-key global state.
          */
        def globalState: KeyedStateStore
      }
    
    }
    
    • 注意 因为这个参数key是使用KeySelector来指定key的,会被keyBy()调用。如果是使用tuple-index 或String field 引用,则始终使用tuple,Tuple并且必须手动将其强制转换为正确大小的元组以提取键字段。

    ProcessWindowFunction可以定义像这样使用:

    val input: DataStream[(String, Long)] = ...
    
    input
      .keyBy(_._1)
      .timeWindow(Time.minutes(5))
      .process(new MyProcessWindowFunction())
    
    /* ... */
    
    class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] {
    
      def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]) = {
        var count = 0L
        for (in <- input) {
          count = count + 1
        }
        out.collect(s"Window ${context.window} count: $count")
      }
    }
    

    该示例显示了一个ProcessWindowFunction计算窗口中元素的方法,函数将有关窗口的信息输出。

    • 注意 将ProcessWindowFunction 来做简单的aggregates 操作(例如count)效率是很低的。
      下面将说明如何组合使用 ReduceFunction或AggregateFunction与ProcessWindowFunction,既可以使用增量聚合也可以获取到窗口相关的信息

    3.3 ProcessWindowFunction with Incremental Aggregation

    ProcessWindowFunction可以与ReduceFunction,AggregateFunction或FoldFunction组合,当元素到到达窗口时,增量聚合元素。当窗口关闭时,ProcessWindowFunction将提供汇总结果。这样一来,它便可以增量地计算窗口,又可以访问到窗口元信息。

    旧版的可以使用WindowFunction来代替 ProcessWindowFunction


    3.3.1 ReduceFunction 组合使用

    以下示例显示了如何将ReduceFunction与ProcessWindowFunction组合使用以增量返回窗口中最小的事件以及窗口的开始时间。

    val input: DataStream[SensorReading] = ...
    
    input
      .keyBy(<key selector>)
      .timeWindow(<duration>)
      .reduce(
        (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
        ( key: String,
          context: ProcessWindowFunction[_, _, _, TimeWindow]#Context,
          minReadings: Iterable[SensorReading],
          out: Collector[(Long, SensorReading)] ) =>
          {
            val min = minReadings.iterator.next()
            out.collect((context.window.getStart, min))
          }
        )
    
    3.3.2 AggregateFunction组合使用

    以下示例显示了如何将AggregateFunction与ProcessWindowFunction组合使用以增量计算平均值,并且和key一起返回。

    val input: DataStream[(String, Long)] = ...
    
    input
      .keyBy(<key selector>)
      .timeWindow(<duration>)
      .aggregate(new AverageAggregate(), new MyProcessWindowFunction())
    
    // Function definitions
    
    /**
     * The accumulator is used to keep a running sum and a count. The [getResult] method
     * computes the average.
     */
    class AverageAggregate extends AggregateFunction[(String, Long), (Long, Long), Double] {
      override def createAccumulator() = (0L, 0L)
    
      override def add(value: (String, Long), accumulator: (Long, Long)) =
        (accumulator._1 + value._2, accumulator._2 + 1L)
    
      override def getResult(accumulator: (Long, Long)) = accumulator._1 / accumulator._2
    
      override def merge(a: (Long, Long), b: (Long, Long)) =
        (a._1 + b._1, a._2 + b._2)
    }
    
    class MyProcessWindowFunction extends ProcessWindowFunction[Double, (String, Double), String, TimeWindow] {
    
      def process(key: String, context: Context, averages: Iterable[Double], out: Collector[(String, Double)]) = {
        val average = averages.iterator.next()
        out.collect((key, average))
      }
    }
    
    3.3.3 FoldFunction组合使用

    以下示例显示了如何将FoldFunction与ProcessWindowFunction组合视同以增量提取窗口中的事件数,并还返回窗口的key和结束时间。

    val input: DataStream[SensorReading] = ...
    
    input
     .keyBy(<key selector>)
     .timeWindow(<duration>)
     .fold (
        ("", 0L, 0),
        (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
        ( key: String,
          window: TimeWindow,
          counts: Iterable[(String, Long, Int)],
          out: Collector[(String, Long, Int)] ) =>
          {
            val count = counts.iterator.next()
            out.collect((key, window.getEnd, count._3))
          }
      )
    

    3.4 其它可选API

    • .trigger() —— 触发器 定义 window 什么时候关闭,触发计算并输出结果
    • .evitor() —— 移除器 定义移除某些数据的逻辑
    • .allowedLateness() —— 允许处理迟到的数据
    • .sideOutputLateData() —— 将迟到的数据放入侧输出流
    • .getSideOutput() —— 获取侧输出流

    参考 flink官网 windows

    相关文章

      网友评论

        本文标题:Flink Window

        本文链接:https://www.haomeiwen.com/subject/pugajktx.html