美文网首页FlinkFlink
数据分析实践 | flink | window窗口篇

数据分析实践 | flink | window窗口篇

作者: Sevsea | 来源:发表于2018-10-13 18:51 被阅读0次

    要弄懂flink全部的工作机制其实是一件比较麻烦的事情,零碎的概念太多,对于第一次接触flink的我也算是个挑战。

    整体感受就是--看官方文档的过程就像在做高中的阅读理解:)
    如果你刚接触,看官方文档又很晕,那请看下去咯,我会配合它工作流程的顺序一一介绍概念。

    接触不久,文章如有错误还请大牛多指点。


    flink流计算--window窗口

    window是处理数据的核心。按需选择你需要的窗口类型后,它会将传入的原始数据流切分成多个buckets,所有计算都在window中进行。

    这里按照数据处理前、中、后为过程来描述一个窗口的工作过程。

    0x01数据处理前的分流

    窗口在处理数据前,会对数据做分流,有两种控制流的方式:

    * Keyed Windows
    <可以理解为�按照原始数据流中的某个key进行分类,拥有同一个key值的数据流将为进入同一个window,多个窗口并行的逻辑流>
    
            stream
               .keyBy(...)               <-  keyed versus non-keyed windows
               .window(...)              <-  required: "assigner"
              [.trigger(...)]            <-  optional: "trigger" (else default trigger)
              [.evictor(...)]            <-  optional: "evictor" (else no evictor)
              [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
              [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
               .reduce/aggregate/fold/apply()      <-  required: "function"
              [.getSideOutput(...)]      <-  optional: "output tag"
    
    * Non-Keyed Windows
    <不做分类,每进入一条数据即增加一个窗口,多个窗口并行,每个窗口处理1条数据>    
    
            stream
                   .windowAll(...)           <-  required: "assigner"
                  [.trigger(...)]            <-  optional: "trigger" (else default trigger)
                  [.evictor(...)]            <-  optional: "evictor" (else no evictor)
                  [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
                  [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
                   .reduce/aggregate/fold/apply()      <-  required: "function"
                  [.getSideOutput(...)]      <-  optional: "output tag"
    
    用`[ ]`包含的内容,其中的方法均为可选函数,如需了解可以查阅一下官方文档。
    

    0x02窗口函数的准备

    对于每个window必备的是触发器Trigger和一个附加在window上的函数
    ProcessWindowFunction
    ReduceFunction
    AggregateFunction
    FoldFunction
    用于实现window中对数据流的操作。
    就是上面分流示例中的.reduce/aggregate/fold/apply()中需要做的操作。

    在对数据流做处理前,需要先预设一些窗口的配置,先看一下窗口的一些类型:

    2.1窗口选择

    划分
    
    * time  - 根据时间划分窗口
        #时间类型:
        * EventTime 数据本身携带的时间
        * ProcessingTime 处理时间
        
    * count - 根据数据量划分窗口
    
    属性
    
    * size=interval 无重叠数据,可理解为翻滚窗口,
    * size>interval 有重叠数据,可理解为滑动窗口
    
    手画了一张图表示这两个窗口的区别:
    滑动窗口(10s,5s)的意思是时间窗口长度为10s,滑动长度为5s。
    
    于是一共有这几个窗口类型
    
    * 无重叠时间窗口
    * 有重叠时间窗口
    * 有重叠数据窗口
    * 无重叠数据窗口
    

    实际场景中用的较多的还是时间窗口,以时间窗口为例。

    2.2时间窗口

    声明使用的窗口时间类型:

    Scala:
    import org.apache.flink.streaming.api.TimeCharacteristic
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic ( TimeCharacteristic.EventTime )
    //设置窗口时间类型,如若此处为ProcessingTime则无需做后续的指定时间操作,其他两种时间均需要指定数据流中的时间参数。
    

    选择完时间类型之后,我们优先挑选最复杂的一种情况来说明时间戳和水位线的工作机制,如果选择了EventTime,需要指定数据流中的时间戳。

    val Stream :DataStream[String]=env.addSource(kafkaTableSoure)
    val frequency_ = Stream.map({...})//此处略过数据的预处理
    val frequency = frequency_.assignTimestampsAndWatermarks(new TimestampExtractor())//指定时间戳与水位线
    

    指定时间戳和配置水位线就由这个函数TimestampExtractor()继承AssignerWithPeriodicWatermarks方法来配置

    class TimestampExtractor extends AssignerWithPeriodicWatermarks[Map[String,Any]] with Serializable {
    
      var currentMaxTimestamp: Long = _
      val maxOutOfOrderness = 10000L//最大允许的乱序时间是10s
    
      var a : Watermark = null
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
      //extractTimestamp指定毫秒级时间戳,必须是毫秒级!!!
      override def extractTimestamp(t: Map[String,Any], l: Long): Long = {
        val timestamp = t.get("time_local").get.toString.toLong*1000L 
        //此处是因为时间戳没有毫秒级,故*1000变成毫秒级时间戳
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
        println("timestamp:" + timestamp+","+format.format(timestamp) +"|"+ currentMaxTimestamp + ","+ format.format(currentMaxTimestamp) + ","+ a.toString)
        timestamp
      }
      //getCurrentWatermark设置水位线
      override def getCurrentWatermark: Watermark = {
        a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        a
      }
    }
    

    这个class中有两个方法,分别配置时间戳和水位线,关于这两个概念需要介绍一下:

    2.2.1 分配时间戳

    为了处理事件时间,Flink需要知道事件的时间戳,这意味着流中的每条数据都需要分配其事件时间戳。

    这通常通过提取每条数据中的固定字段来完成时间戳的获取。

    2.2.2 水印

    这段代码中其实没有体现出水印的概念,但水印与时间戳、watermark都有密切关系。

    时间戳分配与生成水印密切相关,水印告诉系统事件时间的进展。决定水位线的高度。

    有两种方式可以分配时间戳并生成水印:
    1.直接在数据流源中
        通过时间戳分配器/水印生成器:在Flink中,时间戳分配器定义要发出的水印。
        ex:
        参考上方示例代码中的`TimestampExtractor()`类
        
    2.Kafka读取时间戳
        当使用Apache Kafka作为数据源时,每个Kafka分区可能具有简单的事件时间模式(升序时间戳或有界无序)。
        在这种情况下,可以使用Flink的Kafka分区感知水印生成,通常Kafka是多个分区并行读取,每个Kafka分区在Kafka使用者内部生成水印。
        如果严格按照Kafka分区升序,则使用升序时间戳水印生成器生成每分区水印将产生完美的整体水印。
        ex:
        val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
        kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
            def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
        })
        val stream: DataStream[MyType] = env.addSource(kafkaSource)
    
    2.2.3 水位线Watermark

    通常在处理EventTime事件时间的时候使用,流式传输程序需要相应地设置时间特性。
    数据流的到达顺序我们无法保证的情况下,需要对迟到的数据进行处理,Periodic水位线便是配置这个特性。
    AssignerWithPeriodicWatermarks分配时间戳并定期生成水印

    //getCurrentWatermark设置水位线
      override def getCurrentWatermark: Watermark = {
        a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        a
      }
    
    代码中currentMaxTimestamp为当前的事件时间,a 则是watermark的值。
    需要我们设置的是maxOutOfOrderness这个差值,来确认数据最大可以迟到多久。
    

    图片来自:https://blog.csdn.net/lmalds/article/details/52704170 文章对水位线描述的很详细。

    这里借这张图来说明一下乱序数据流到达时的工作过程:

    预设值:
    * window大小是3秒,window被分为如下的形式,
    [19:34:21,19:34:24)
    [19:34:24,19:34:27)
    ...
    * 水位线设置差值为10s
    
    1.EventTime为19:34:22的数据到达,创建第一个窗口。
    此时水位线为19:34:22-10s=19:34:12,此条数据属于[19:34:21,19:34:24)窗口范围内。
    
    2.EventTime为19:34:26的数据到达,创建第二个窗口。
    此时水位线为19:34:26-10s=19:34:16,此条数据属于[19:34:24,19:34:27)窗口范围内
    
    3.EventTime为19:34:32的数据到达,进入第一个窗口。
    此时水位线为19:34:32-10s=19:34:22,因为设置最大可延迟10s到达 ,所以这条数据属于[19:34:21,19:34:24)窗口范围内。
    
    4.EventTime为19:34:33的数据到达,进入第一个窗口
    此时水位线为19:34:33-10s=19:34:23,因为设置最大可延迟10s到达 ,所以此条数据属于[19:34:21,19:34:24)窗口范围内
    
    5.EventTime为19:34:34的数据到达,进入第二个窗口
    此时水位线为19:34:34-10s=19:34:24,是[19:34:21,19:34:24)窗口的临界值,
    触发执行窗口函数,统计第一个窗口中的数据。
    
    6.EventTime为19:34:36的数据到达,进入第二个窗口
    此时水位线为19:34:36-10s=19:34:26,属于[19:34:24,19:34:27)窗口范围内
    ...
    

    2.3窗口函数

    窗口函数是触发器在确认窗口数据到达完毕后,执行的函数。

    flink提供了两类窗口函数,

    • AggerateFunction/ReduceFunction/FoldFunction/...

    此类为数据计算函数,适用于仅计算,无需做时间窗口的情况。
    AggerateFunction为用户自定义函数,可以按照个人需求做各类统计。

    • WindowFunction/ProcessWindowFunction/...

    此类为做窗口函数,适用于无需计算只做时间窗口统计的情况。(ps.貌似很少有这样的情况猴)

    计算函数+窗口函数

    两种方式的结合适用于需要进行计算后再做滑动窗口统计结果的情况。
    (直接对全部SourceData数据做WindowFunction消耗会较大,所以先做计算,提取出需要的特征、结果后,减轻窗口函数的压力。)

    窗口函数计算完毕后,就能够得到计算结果了,整个流程便算是结束了。

    0x03 TODO

    概念差不多都解释清楚啦。
    在下篇中将详细讲概念实践,也会细说一下AggerateFunction自定义函数的使用。
    待更。

    相关文章

      网友评论

        本文标题:数据分析实践 | flink | window窗口篇

        本文链接:https://www.haomeiwen.com/subject/xhzgaftx.html