Flink WaterMark 详解

作者: yljphp | 来源:发表于2019-04-02 15:37 被阅读12次

    背景

    image

    实时计算中,数据时间比较敏感。有eventTime和processTime区分,一般来说eventTime是从原始的消息中提取过来的,processTime是Flink自己提供的,Flink中一个亮点就是可以基于eventTime计算,这个功能很有用,因为实时数据可能会经过比较长的链路,多少会有延时,并且有很大的不确定性,对于一些需要精确体现事件变化趋势的场景中,单纯使用processTime显然是不合理的。

    概念

    watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp.watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。

    流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。

    但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。

    window划分

    window的设定无关数据本身,而是系统定义好了的。
    window是flink中划分数据一个基本单位,window的划分方式是固定的,默认会根据自然时间划分window,并且划分方式是前闭后开。

    window划分 w1 w2 w3
    3s [00:00:00~00:00:03) [00:00:03~00:00:06) [00:00:06~00:00:09)
    5s [00:00:00~00:00:05) [00:00:05~00:00:10) [00:00:10~00:00:15)
    10s [00:00:00~00:00:10) [00:00:10~00:00:20) [00:00:20~00:00:30)
    1min [00:00:00~00:01:00) [00:01:00~00:02:00) [00:02:00~00:03:00)

    示例

    如果设置最大允许的乱序时间是10s,滚动时间窗口为5s

    {"datetime":"2019-03-26 16:25:24","name":"zhangsan"}
    //currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:24],currentMaxTimestamp:[2019-03-26 16:25:24],watermark:[2019-03-26 16:25:14]
    
    

    触达改记录的时间窗口应该为2019-03-26 16:25:20~2019-03-26 16:25:25
    即当有数据eventTime >= 2019-03-26 16:25:35 时

    {"datetime":"2019-03-26 16:25:35","name":"zhangsan"}
    //currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:35],currentMaxTimestamp:[2019-03-26 16:25:35],watermark:[2019-03-26 16:25:25]
    //(zhangsan,1,2019-03-26 16:25:24,2019-03-26 16:25:24,2019-03-26 16:25:20,2019-03-26 16:25:25)
    

    后面会详细讲解。_

    提取watermark

    watermark的提取工作在taskManager中完成,意味着这项工作是并行进行的的,而watermark是一个全局的概念,就是一个整个Flink作业之后一个warkermark。

    AssignerWithPeriodicWatermarks

    定时提取watermark,这种方式会定时提取更新wartermark。

    //默认200ms
    public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
        this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
        if (characteristic == TimeCharacteristic.ProcessingTime) {
            getConfig().setAutoWatermarkInterval(0);
        } else {
            getConfig().setAutoWatermarkInterval(200);
        }
    }
    

    AssignerWithPunctuatedWatermarks

    伴随event的到来就提取watermark,就是每一个event到来的时候,就会提取一次Watermark。
    这样的方式当然设置watermark更为精准,但是当数据量大的时候,频繁的更新wartermark会比较影响性能。
    通常情况下采用定时提取就足够了。

    使用

    设置数据流时间特征

    //设置为事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    

    默认为TimeCharacteristic.ProcessingTime,默认水位线更新每隔200ms

    入口文件

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    //便于测试,并行度设置为1
    env.setParallelism(1)
    
    //env.getConfig.setAutoWatermarkInterval(9000)
    
    //设置为事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    //设置source 本地socket
    val text: DataStream[String] = env.socketTextStream("localhost", 9000)
    
    
    val lateText = new OutputTag[(String, String, Long, Long)]("late_data")
    
    val value = text.filter(new MyFilterNullOrWhitespace)
    .flatMap(new MyFlatMap)
    .assignTimestampsAndWatermarks(new MyWaterMark)
    .map(x => (x.name, x.datetime, x.timestamp, 1L))
    .keyBy(_._1)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .sideOutputLateData(lateText)
    //.sum(2)
    .apply(new MyWindow)
    //.window(TumblingEventTimeWindows.of(Time.seconds(3)))
    //.apply(new MyWindow)
    value.getSideOutput(lateText).map(x => {
    "延迟数据|name:" + x._1 + "|datetime:" + x._2
    }).print()
    
    value.print()
    
    env.execute("watermark test")
    
    
    class MyWaterMark extends AssignerWithPeriodicWatermarks[EventObj] {
    
      val maxOutOfOrderness = 10000L // 3.0 seconds
      var currentMaxTimestamp = 0L
    
      val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    
      /**
        * 用于生成新的水位线,新的水位线只有大于当前水位线才是有效的
        *
        * 通过生成水印的间隔(每n毫秒)定义 ExecutionConfig.setAutoWatermarkInterval(...)。
        * getCurrentWatermark()每次调用分配器的方法,如果返回的水印非空并且大于先前的水印,则将发出新的水印。
        *
        * @return
        */
      override def getCurrentWatermark: Watermark = {
        new Watermark(this.currentMaxTimestamp - this.maxOutOfOrderness)
      }
    
      /**
        * 用于从消息中提取事件时间
        *
        * @param element                  EventObj
        * @param previousElementTimestamp Long
        * @return
        */
      override def extractTimestamp(element: EventObj, previousElementTimestamp: Long): Long = {
    
        currentMaxTimestamp = Math.max(element.timestamp, currentMaxTimestamp)
    
        val id = Thread.currentThread().getId
        println("currentThreadId:" + id + ",key:" + element.name + ",eventTime:[" + element.datetime + "],currentMaxTimestamp:[" + sdf.format(currentMaxTimestamp) + "],watermark:[" + sdf.format(getCurrentWatermark().getTimestamp) + "]")
    
        element.timestamp
      }
    }
    

    代码详解

    1. 设置为事件时间
    2. 接受本地socket数据
    3. 抽取timestamp生成watermark,打印(线程id,key,eventTime,currentMaxTimestamp,watermark)
    4. event time每隔3秒触发一次窗口,打印(key,窗口内元素个数,窗口内最早元素的时间,窗口内最晚元素的时间,窗口自身开始时间,窗口自身结束时间)

    试验

    第一次

    数据

    {"datetime":"2019-03-26 16:25:24","name":"zhangsan"}
    

    输出

    |currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:24],currentMaxTimestamp:[2019-03-26 16:25:24],watermark:[2019-03-26 16:25:14]
    

    汇总

    Key EventTime currentMaxTimestamp Watermark
    zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14

    第二次

    数据

    {"datetime":"2019-03-26 16:25:27","name":"zhangsan"}
    

    输出

    currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:27],currentMaxTimestamp:[2019-03-26 16:25:27],watermark:[2019-03-26 16:25:17]
    

    汇总

    Key EventTime currentMaxTimestamp Watermark
    zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
    zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17

    随着EventTime的升高,Watermark升高。

    第三次

    数据

    {"datetime":"2019-03-26 16:25:34","name":"zhangsan"}
    

    输出

    currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:34],currentMaxTimestamp:[2019-03-26 16:25:34],watermark:[2019-03-26 16:25:24]
    

    汇总

    Key EventTime currentMaxTimestamp Watermark
    zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
    zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17
    zhangsan 2019-03-26 16:25:34 2019-03-26 16:25:34 2019-03-26 16:25:24

    到这里,window仍然没有被触发,此时watermark的时间已经等于了第一条数据的Event Time了。

    第四次

    数据

    {"datetime":"2019-03-26 16:25:35","name":"zhangsan"}
    
    image

    输出

    currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:35],currentMaxTimestamp:[2019-03-26 16:25:35],watermark:[2019-03-26 16:25:25]
    (zhangsan,1,2019-03-26 16:25:24,2019-03-26 16:25:24,2019-03-26 16:25:20,2019-03-26 16:25:25)
    
    image

    汇总

    Key EventTime currentMaxTimestamp Watermark WindowStartTime WindowEndTime
    zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
    zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17
    zhangsan 2019-03-26 16:25:34 2019-03-26 16:25:34 2019-03-26 16:25:24
    zhangsan 2019-03-26 16:25:35 2019-03-26 16:25:35 2019-03-26 16:25:25 [2019-03-26 16:25:20 2019-03-26 16:25:25)

    直接证明了window的设定无关数据本身,而是系统定义好了的。
    输入的数据中,根据自身的Event Time,将数据划分到不同的window中,如果window中有数据,则当watermark时间>=Event Time时,就符合了window触发的条件了,最终决定window触发,还是由数据本身的Event Time所属的window中的window_end_time决定。

    当最后一条数据16:25:35到达是,Watermark提升到16:25:25,此时窗口16:25:20~16:25:25中有数据,Window被触发。

    第五次

    数据

    {"datetime":"2019-03-26 16:25:37","name":"zhangsan"}
    

    输出

    currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:37],currentMaxTimestamp:[2019-03-26 16:25:37],watermark:[2019-03-26 16:25:27]
    

    汇总

    Key EventTime currentMaxTimestamp Watermark WindowStartTime WindowEndTime
    zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
    zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17
    zhangsan 2019-03-26 16:25:34 2019-03-26 16:25:34 2019-03-26 16:25:24
    zhangsan 2019-03-26 16:25:35 2019-03-26 16:25:35 2019-03-26 16:25:25 [2019-03-26 16:25:20 2019-03-26 16:25:25)
    zhangsan 2019-03-26 16:25:37 2019-03-26 16:25:37 2019-03-26 16:25:27

    此时,watermark时间虽然已经达到了第二条数据的时间,但是由于其没有达到第二条数据所在window的结束时间,所以window并没有被触发。

    第二条数据所在的window时间是:[2019-03-26 16:25:25,2019-03-26 16:25:30)

    第六次

    数据

    {"datetime":"2019-03-26 16:25:40","name":"zhangsan"}
    

    输出

    currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:40],currentMaxTimestamp:[2019-03-26 16:25:40],watermark:[2019-03-26 16:25:30]
    (zhangsan,1,2019-03-26 16:25:27,2019-03-26 16:25:27,2019-03-26 16:25:25,2019-03-26 16:25:30)
    

    汇总

    Key EventTime currentMaxTimestamp Watermark WindowStartTime WindowEndTime
    zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
    zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17
    zhangsan 2019-03-26 16:25:34 2019-03-26 16:25:34 2019-03-26 16:25:24
    zhangsan 2019-03-26 16:25:35 2019-03-26 16:25:35 2019-03-26 16:25:25 [2019-03-26 16:25:20 2019-03-26 16:25:25)
    zhangsan 2019-03-26 16:25:37 2019-03-26 16:25:37 2019-03-26 16:25:27
    zhangsan 2019-03-26 16:25:40 2019-03-26 16:25:40 2019-03-26 16:25:30 [2019-03-26 16:25:25 2019-03-26 16:25:30)

    结论

    window的触发要符合以下几个条件:

    1. watermark时间 >= window_end_time
    2. 在[window_start_time,window_end_time)中有数据存在

    同时满足了以上2个条件,window才会触发。
    watermark是一个全局的值,不是某一个key下的值,所以即使不是同一个key的数据,其warmark也会增加.

    多并行度

    image

    总结

    Flink如何处理乱序?

    watermark+window机制。window中可以对input进行按照Event Time排序,使得完全按照Event Time发生的顺序去处理数据,以达到处理乱序数据的目的。

    Flink何时触发window?

    对于late element太多的数据而言

    1. Event Time < watermark时间

    对于out-of-order以及正常的数据而言

    1. watermark时间 >= window_end_time
    2. 在[window_start_time,window_end_time)中有数据存在

    Flink应该如何设置最大乱序时间?

    结合自己的业务以及数据情况去设置。

    image

    参考

    Flink WaterMark(水位线)分布式执行理解
    Flink流计算编程--watermark(水位线)简介
    The Dataflow Model

    相关文章

      网友评论

        本文标题:Flink WaterMark 详解

        本文链接:https://www.haomeiwen.com/subject/eaegbqtx.html