美文网首页
第六章 Flink中的时间和窗口

第六章 Flink中的时间和窗口

作者: 井底蛙蛙呱呱呱 | 来源:发表于2022-11-26 13:36 被阅读0次

    时间语义


    上图是数据流式处理过程,涉及到两个重要的时间点:事件时间(Event Time)和处理时间(Processing Time)。

    • 事件时间(Event Time):即数据产生的时间;
    • 处理时间(Processing Time):即数据真正被处理的时刻;

    我们在处理数据时,以哪种时间作为衡量标准,就是所谓的时间语义问题(Notions of Time)。由于分布式系统中网络传输的延迟和时钟漂移,处理时间相对事件发生的时间会有滞后。在这种情况下,就不能简单地把数据自带的时间戳当作时钟了,而需要用另外的标志来表示事件时间进展,在 Flink 中把它叫作事件时间的“水位线”(Watermarks)。

    水位线(Watermark)

    我们把时钟也数据的形式传递出去,告诉下游任务当前时间的进展;而且这个时钟的传递不会因为窗口聚合之类的运算而停滞。一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接到广播下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似流水中用来当做标志的几号,在Flink中,这种用来衡量事件时间(Event Time)进展的标记,就被称作水位线(Watermark)。

    水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而他插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。


    理想的水位线是有序的,但是现实中由于不可控因素常常会有少量乱序的数据。




    周期性生成时间戳,保存区间最大值

    水位线代表当前事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢失数据,这一点对于乱序流的正确处理非常重要。水位线的特性:

    • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据;
    • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展;
    • 水位显示基于数据的时间戳生成的;
    • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进;
    • 水位线可以通过设置延迟,以保证正确处理乱序数据;
    • 一个水位线(Watermark)t表示在当前流中事件时间已经到达了时间戳t,这导表t之前的所有数据都到齐了,之后流中不会出现时间戳t'<t的数据;

    水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理。

    如何生成水位线

    在生成水位线的时候,如果希望计算结果更准确,可以将水位线延迟设置得更高一些,等待时间越长,越不容易漏掉数据,但是这样时效性降低了。而如果将等待时间设置过短则会遗漏掉部分数据,虽然Flink提供了处理迟到数据的方法,但是需要分开处理。因此如何设置延迟是一个需要根据实际情况权衡的问题。

    在Flink的DataStream API中,有一个单独用于生成水位线的方法:assignTimestampAndWatermarks(),他主要用来为流中的数据分配时间戳,并生成水位线来显示时间。该方法需要传入一个WatermarkStrategy作为参数,WatermarkStrategy 中包含了一个“时间戳分配器”TimestampAssigner和一个“水位线生成器”WatermarkGenerator:

    • TimestampAssigner, 主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础;
    • WatermarkGenerator, 主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator中主要有两个方法:onEvent和onPeriodicEmit;
      • onEvent, 每个事件(数据)到来都会调用的方法,他的参数有当前事件、时间戳,以及允许发出水位线的一个WatermarkOutput,可以基于事件做各种操作;
      • onPeriodiEmit, 周期性调用的方法,可以由WatermarkOutput发出水位线。周期时间为处理时间,可以调用环境配置的setAutoWatermarkInterval()方法来设置,默认为200ms;
    代码
    

    Flink提供了内置的水位线生成器:

    • 有序流,对于有序流,主要特点就是时间戳单调增长(Monotonously Increasing Timestamps),所以永远不会出现迟到数据的问题。这是周期性生成水位线的最简单的场景,直接调用WatermarkStrategy.forMonotonousTimestamps()方法就可以实现。简单来说,就是直接拿当前最大的时间戳作为水位线就可以了。
    • 乱序流,由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的时间延迟(Fixed Amount of Lateness)。这时生成水位线的时间戳,就是当前数据流中最大的时间戳减去延迟的结果,相当于把表调慢,当前时钟会滞后于数据的最大时间戳。调用 WatermarkStrategy. forBoundedOutOfOrderness()方法就可以实现。这个方法需要传入一个 maxOutOfOrderness 参数,表示“最大乱序程度”,它表示数据流中乱序数据时间戳的最大差值;如果我们能确定乱序程度,那么设置对应时间长度的延迟,就可以等到所有的乱序数据了。
    代码
    
    自定义水位线

    在WatermarkStrategy中,时间戳分配器TimestampAssigner都是大同小异的,指定字段提取时间戳就可以了。不同策略的关键在于WatermarkGenerator的实现。整体来说,Flink有两种不同的生产水位线的方式:一种是周期性的(Periodic),另一种是断点式的(Punctuated):

    • 周期性水位线生成器(Periodic Generator),周期性生成器一般是通过 onEvent()观察判断输入的事件,而在 onPeriodicEmit()里发出水位线;
    • 断点式水位线生成器(Punctuated Generator),断点式生成器会不停地检测 onEvent()中的事件,当发现带有水位线信息的特殊事件时,就立即发出水位线。一般来说,断点式生成器不会通过 onPeriodicEmit()发出水位线。

    此外,也可以在自定义数据源中发送水位线,但是这样就不能使用assignTimestampsAndWatermarks 方法来生成水位线了,两者只能二选一。

    水位线的传递

    在“重分区”(redistributing)的传输模式下,一个任务有可能会收到来自不同分区上游子任务的数据。而不同分区的子任务时钟并不同步,所以同一时刻发给下游任务的水位线可能并不相同。这说明上游各个分区处理得有快有慢,进度各不相同,这时我们应该以最慢的那个时钟,也就是最小的那个水位线为准。

    窗口(Window)

    Flink是一种流式计算引擎,主要是用来处理无界数据流的。想要更加方便的处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(window)。在Flink中,窗口就是用来处理无界流的核心。

    由于存在迟到数据的问题,将窗口视为一个框可能并不是最合适的。我们可以把它理解成一个“桶”(bucket):每个数据都会分发到对应的桶中,当到达窗口的结束时间时,就对每个桶中收集的数据进行计算处理。


    窗口的分类

    • 按照驱动类型分类:
      • 1)时间窗口,时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据;
      • 2)计数窗口,计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数,就是窗口的大小。
    • 按照窗口分配数据的规则分类:1)滚动窗口,2)滑动窗口,3)会话窗口,4)全局窗口;





    窗口API概览

    在定义窗口操作之前,需要先确定到底是基于按键分区的数据流KeyedStream还是在没有按键分区的DataStream上面开窗。也即调用窗口算子之前是否有keyBy操作。

    而在API上面的区别也是非常小:

    // 按键分区
    stream.keyBy(...).window(...)
    
    // 非按键分区
    stream.windowAll(...)
    

    窗口分配器(Window Assigner)

    定义窗口分配器(Window Assigner)是构建窗口算子的第一步,他的作用就是定义数据应该被分配到哪个窗口。通过向上一节中的window/windowAll函数中传入WindowAssigner参数,返回WindowStream。

    不同窗口类型有不同的窗口分配器。

    1、时间窗口
    // 滚动处理时间窗口
    stream.keyBy(...)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(5))
      .aggregate(...)
    
    // 滑动处理时间窗口
    stream.keyBy(...)
      .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))
      .aggregate(...)
    
    // 处理时间会话窗口
    stream.keyBy(...)
      .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
      .aggregate(...)
    
    // 滚动事件时间窗口
    stream.keyBy(...)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .aggregate(...)
    
    // 滑动事件时间窗口
    stream.keyBy(...)
      .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
      .aggregate(...)
    
    // 事件时间会话窗口
    stream.keyBy(...)
      .window(EventTimeSessionWindows.WithGap(Time.seconds(10)))
      .aggregate(...)
    
    // 2、计数窗口
    // 滚动计数窗口, 定义一个长度为10的滚动计数窗口
    stream.keyBy(...)
      .countWindow(10)
    
    // 滑动计数窗口,长度为10,步长为3
    stream.keyBy(...)
      .countWindow(10, 3)
    
    // 3、全局窗口, 全局窗口必须自行定义触发器才能实现窗口计算,否则起不到任何作用
    stream.keyBy(...)
      .window(GlobalWindows.create())
    

    窗口函数(Window Functions)

    在上面定义了窗口分配器,我们只是知道了数据属于哪个窗口,而本节介绍的窗口函数则是如何将这些窗口中的数据收集起来,即如何处理。

    窗口函数是作用在windowStream上面的,返回的是DataStream。各种stream间的转换如下:


    1、增量聚合函数
    为了提高实时性,我们可以像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以了;区别只是在于不立即输出结果,而是要等到窗口结束时间。等到窗口到了结束时间需要输出计算结果的时候,我们只需要拿出之前聚合的状态直接输出,这无疑就大大提高了程序运行的效率和实时性。

    典型的增量聚合函数有两个:ReduceFunction和AggregateFunction。

    ReduceFunction:

    package com.whu.chapter06
    
    import com.whu.chapter05.{ClickSource, Event}
    
    import java.time.Duration
    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    
    object WindowFunctionDemo {
      def main(args:Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        env.addSource(new ClickSource())
          // 数据源中的时间戳是单调递增的,所以使用下面的方法,只需要抽取时间戳就好了
          // 等同于最大延迟时间是0毫秒
          .assignAscendingTimestamps(_.timeStamp)
          .map(r => (r.user, 1L))
          // 使用用户名对数据流进行分组
          .keyBy(_._1)
          // 设置5秒钟的滚动事件时间窗口
          .window(TumblingEventTimeWindows.of(Time.seconds(5)))
          // 保留第一个字段,针对第二个字段进行聚合
          .reduce((r1, r2) => (r1._1, r1._2+r2._2))
          .print()
        
        env.execute()
      }
    }
    

    AggregateFunction
    ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。

    AggregateFunction 在源码中的定义如下:

    public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable 
    {
      // 创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次
      ACC createAccumulator();
    
      // 将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程
      ACC add(IN value, ACC accumulator);
    
      // getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出
      OUT getResult(ACC accumulator);
    
      // 合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用
      ACC merge(ACC a, ACC b);
    }
    

    AggregateFunction接受3个数据类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

        env.addSource(new ClickSource())
          .assignAscendingTimestamps(_.timeStamp)
          // 通过为每条数据分配相同的key,来将数据发送到同一个分区
          .keyBy(_ => "key")
          .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
          .aggregate(new AvgPv)
    
        env.execute()
      
      class AvgPv extends AggregateFunction[Event,(Set[String], Double), Double] {
        // 创建空累加器,类型是元组,元组的第一个元素类型为Set数据结构,用来对用户名去重
        // 第二个元素用来累加pv操作,也就是没来一条数据就加一
        override def createAccumulator(): (Set[String], Double) = (Set[String](), 0L)
    
        // 累加规则
        override def add(in: Event, acc: (Set[String], Double)): (Set[String], Double) = {
          (acc._1+in.user, acc._2+1)
        }
    
        // 获取窗口关闭时向下游发送的结果
        override def getResult(acc: (Set[String], Double)): Double = {
          acc._2/(acc._1.size.toDouble)
        }
    
        // merge方法只有在事件时间的会话窗口时,才需要实现,这里无需实现
        override def merge(acc: (Set[String], Double), acc1: (Set[String], Double)): (Set[String], Double) = ???
      }
    

    全窗口函数(Full Window Functions)
    窗口操作中的另一大类就是全窗口函数,与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。

    Flink中的全窗口函数有两种:WindowFunction和ProcessWindowFunction。

    // 窗口函数
    stream.keyBy(<key selector>)
      .window(<window assigner>)
      .apply(new MyWindowFunction())
    

    处理窗口函数ProcessWindowFunction是Window API中最底层的通用窗口函数接口。除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象(context)”。这个上下文对象不仅有窗口信息,还可以访问当前的时间和状态信息。这里的时间包括了处理时间(process time)和事件时间水位线(event time watermark)。这使得ProcessWindowFunction更加灵活、功能更加丰富,可以认为是一个增强版的WindowFunction。

    // Full WindowFunction
        env.addSource(new ClickSource())
          .assignAscendingTimestamps(_.timeStamp)
          // 为所有数据都指定同一个key,可以将所有数据发送到同一个分区
          .keyBy(_ => "key")
          .window(TumblingEventTimeWindows.of(Time.seconds(10)))
          .process(new UvCountByWindow)
          .print()
        
        env.execute()
      
      // 自定义窗口处理函数
      class UvCountByWindow extends ProcessWindowFunction[Event, String, String, TimeWindow]{
        // 
        override def process(key: String, context: Context, elements: Iterable[Event], out: Collector[String]): Unit = {
          // 初始化一个Set数据结构,用来对用户名进行去重
          var userSet = Set[String]()
          // 将所有用户进行去重
          elements.foreach(userSet += _.user)
          // 结合窗口信息,包装输出内容
          val windowStart = context.window.getStart
          val windowEnd = context.window.getEnd
          out.collect(" 窗口:"+ new Timestamp(windowStart) + " ~ "+ new Timestamp(windowEnd) + " 独立访客数为:" + userSet.size)
        }
    
    增量和聚合函数结合使用

    增量聚合相当于把计算量“均摊”到了窗口收集数据的过程中,自然就会比全窗口聚合更加高效、输出更加实时。而全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作,窗口计算更加灵活,功能更加强大。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。

    这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数
    据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。

        // 全窗口函数和聚合函数结合使用
        env.addSource(new ClickSource())
          .assignAscendingTimestamps(_.timeStamp)
          // 使用url作为key对数据进行分区
          .keyBy(_.url)
          .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
          // 注意这里调用的是aggregate方法
          // 增量聚合函数和全窗口聚合函数结合使用
          .aggregate(new UrlViewCountAgg, new UrlViewCountResult)
          .print()
    
      class UrlViewCountAgg extends AggregateFunction[Event, Long, Long] {
        override def createAccumulator(): Long = 0L
    
        // 每来一个事件就加1
        override def add(in: Event, acc: Long): Long = acc + 1L
    
        // 窗口闭合时发送的计算结果
        override def getResult(acc: Long): Long = acc
    
        override def merge(acc: Long, acc1: Long): Long = ???
      }
    
      case class UrlViewCount(url: String, count: Long, windowStart: Long, windowEnd: Long)
    
      class UrlViewCountResult extends ProcessWindowFunction[Long, UrlViewCount, String, TimeWindow] {
        // 迭代器中只有一个元素,是增量聚合函数在窗口闭合时发送过来的计算结果
        override def process(key: String, context: Context, elements: Iterable[Long], out: Collector[UrlViewCount]): Unit = {
          out.collect(UrlViewCount(key, elements.iterator.next(), context.window.getStart, context.window.getEnd))
        }
      }
    

    其它API

    • 触发器(Trigger):触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程;Trigger 是窗口算子的内部属性,每个窗口分配器(WindowAssigner)都会对应一个默认的触发器;对于 Flink 内置的窗口类型,它们的触发器都已经做了实现;
    • 移除器(Evictor):移除器主要用来定义移除某些数据的逻辑;
    • 允许延迟(Allowed Lateness):为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算;
    • 将迟到的数据放入侧输出流:Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些本该被丢弃的数据。

    窗口的生命周期

    熟悉了窗口 API 的使用,这里梳理一下窗口本身的生命周期,这也是对窗口所有操作的一个总结:

    • 窗口的创建;
    • 窗口计算的触发;
    • 窗口的销毁;


    迟到数据的处理

    所谓的“迟到数据”(late data),是指某个水位线之后到来的数据,它的时间戳其实是在水位线之前的。所以只有在事件时间语义下,讨论迟到数据的处理才是有意义的。

    • 设置水位线延迟时间;
    • 允许窗口处理迟到数据;
    • 将迟到数据放入窗口侧输出流;
    package com.whu.chapter06
    
    import com.whu.chapter05.{ClickSource, Event}
    import com.whu.chapter06.WindowFunctionDemo.{UrlViewCountAgg, UrlViewCountResult}
    
    import java.time.Duration
    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
    import org.apache.flink.api.common.functions.AggregateFunction
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
    import org.apache.flink.streaming.api.windowing.assigners.{SlidingEventTimeWindows, TumblingEventTimeWindows}
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    import java.sql.Timestamp
    
    object ProcessLateDataDemo {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        
        // 为方便测试,读取socket文本流进行处理
        val stream = env.socketTextStream("localhost", 7777)
          .map(data => {
            val fields = data.split(",")
            Event(fields(0).trim, fields(1).trim, fields(2).trim.toLong)
          })
        
        // 方式1:设置Watermark延迟时间 2秒钟
        val res1 = stream.assignTimestampsAndWatermarks(WatermarkStrategy
          // 最大延迟时间设置为5秒钟
          .forBoundedOutOfOrderness[Event](Duration.ofSeconds(2))
          .withTimestampAssigner( new SerializableTimestampAssigner[Event] {
            override def extractTimestamp(t: Event, l: Long): Long = t.timeStamp
          })
        )
        
        // 定义侧输出流标签
        val outputTag = OutputTag[Event]("late")
        val res2 = stream
          .keyBy(_.url)
          .window(TumblingEventTimeWindows.of(Time.seconds(10)))
          // 方式二:允许窗口处理迟到数据,设置1分钟的等待时间
          .allowedLateness(Time.minutes(1))
          // 方式三:将最后的迟到数据输出到侧输出流
          .sideOutputLateData(outputTag)
          .aggregate(new UrlViewCountAgg, new UrlViewCountResult)
        
        res2.print()
        
        res2.getSideOutput(outputTag).print("late")
        
        // 为方便观察,可以将原始数据也输出
        stream.print("input")
      }
    }
    

    相关文章

      网友评论

          本文标题:第六章 Flink中的时间和窗口

          本文链接:https://www.haomeiwen.com/subject/ghmffdtx.html