美文网首页
Flink 1.2Time & WaterMark

Flink 1.2Time & WaterMark

作者: caster | 来源:发表于2021-06-13 21:19 被阅读0次

    1.Event:流处理中处理的数据代表的实际业务事件。

    2.Time:确定事件的时间概念,用于flink基于时间的操作(window等)。

    三种时间语义
    1. Processing Time
      Processing Time 是指事件被操作算子处理时机器的系统时间。不需要流和机器之间的协调,它提供了最好的性能和最低的延迟。但是,在分布式和异步的环境下,Processing Time 不能提供确定性,因为它容易受到事件到达系统的速度(例如从消息队列)、事件在系统内操作流动的速度以及中断的影响。
    2. Event Time
      Event Time 是事件发生的时间。事件(数据)自带的时间字段,与flink无关,需要程序指定如何从数据中构造time水印。此方式处理事件可以完美得到想要的结果,但是无序到达事件会导致程序延时处理,且不能无限等待迟到数据会导致结果一致性问题。正常业务需求关注的都是event time,event time会存在迟到数据的情况,需要引入Watermark迟到数据处理等概念。
    3. Ingestion Time
      Ingestion Time 是事件进入Flink Source的时间。在源操作处,每个事件将源的当前时间作为时间戳,其介于event 和 processing之间。

    1.12之后默认使用event time。

    3.Watermark(水印)

    Watermark是一种特殊的时间戳,也是一种被插入到数据流的特殊的数据(StreamElement),用于表示延迟多久计算当前窗口的聚合操作。可以防止Event乱序到达flink时,计算无限等待迟到的数据。
    延迟处理示例:

    1. [1s,5s)的window窗口,设置延迟时间为为3s,则8s的数据到达后会在stream中插入watermark5,就会触发[1s,5s)的window窗口计算;
    2. 如果还存再延迟数据,则使用allowedLateness(),每次迟到数据来了之后重新计算并输出;
    3. 如果还存在,则sideout输出后再进行后续处理。

    插入的watremark为单调递增的,与数据时间戳相关。
    数据到达后分配到对应的窗口,watermark到达后触发对应的窗口计算。
    watermark的传递:
    watermark在重分区操作后(keyby,rebanlance等)会广播到全部下游分区。
    下游分区收到上游多个分区watermark,会存储每个上游分区的watermark,以最小的watermark为准进行操作,如果最小watermark更新,则向后续下游广播。

    3.1.watermark使用示例

    处理每条数据时都使用到目前为止event time最大值,设置最大等待时间为3s,水印则为大当前最大event time-3s,window(10s)会等待3s触发上一个窗口计算:(旧版本方式)

    public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
        private final long maxOutOfOrderness = 3000; // 3.0 seconds
        private long currentMaxTimestamp;
    
        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            long timestamp = element.getCreationTime();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            // 以迄今为止收到的最大时间戳来生成 watermark
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
    
    event time max event time watermark 操作
    10000 10000 7000 水印为当前最大值-3s
    11000 11000 8000
    12000 12000 9000
    18000 18000 15000
    13000 18000 15000 最大值和水印不变
    19000 19000 16000
    20000 20000 17000
    23000 23000 20000 触发窗口[10,20)计算
    25000 25000 22000 不会再触发窗口[10,20)计算

    当23s的数据到达,生成的watermark时间戳为20s,触发计算10~20s窗口,但是只会计算[10,20)之间的数据,并不会把触发计算的下一窗口数据计算进去。达到等待迟到数据3s的效果,已经触发过计算的窗口不会再次被触发。
    注:不是对eventTime要求特别严格的数据,尽量不要采用eventTime方式来处理,会有丢数据的风险。

    在新版本1.12-中,使用WatermarkStrategy工厂方式构造构造watermark和time语义

    public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T> 
    

    TimestampAssignerSupplier负责定义时间语义
    WatermarkGeneratorSupplier负责设置watermark
    watermark类型:

    旧版本设置watermark方法和类型
    间断性:每一条数据后插入一条watermark,数据稀疏
    周期性:每隔一段时间生成一条watermark,数据稠密
    watermark成生器父类如下:
    //基于event或者周期生成watermark,代替了旧版本中的:
    //AssignerWithPunctuatedWatermarks :
    //AssignerWithPeriodicWatermarks:周期性
    public interface WatermarkGenerator<T> {
        //每条数据调用一次,可以检查或者记录事件的时间戳,或者也可以基于事件数据本身去生成 watermark
        void onEvent(T event, long eventTimestamp, WatermarkOutput output);
        //周期性调用,由ExecutionConfig.setAutoWatermarkInterval()方法指定周期时间间隔。
        void onPeriodicEmit(WatermarkOutput output);
    }
    

    常用的watermark生成器:
    BoundedOutOfOrdernessWatermarks(周期性)

    //
    public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {
    
        //到目前为止最大的event time
        private long maxTimestamp;
    
        //watermark最大无序延迟时间
        private final long outOfOrdernessMillis;
    
        //通过最大无序时间界限生成watermark生成器
        public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
            checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
            checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");
    
            this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
    
            this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
        }
        //每来一条数据更新当前最大的event time
        @Override
        public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
            maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        }
        //周期性生成watermark到数据流中
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
        }
    }
    

    3.2.延时到达的数据处理

    由于设置watermark会等待延时数据到达的时间不能过长,当数据迟到过久到达并不会再次触发计算。则需要处理延时到达数据:

    1. 继续触发窗口计算:
      通过allowedLateness()设置,只要满足watermark < window_end_time + allowedLateness,延迟数据进入窗口就会触发窗口计算。即当前到达的数据
      的最大时间-设置最大等待时间生成的水印,没有超过当前窗口右侧值allowedLateness范围是,会重新触发计算。
    2. 重定向到其他地方:
      ds.getSideOutput()

    相关文章

      网友评论

          本文标题:Flink 1.2Time & WaterMark

          本文链接:https://www.haomeiwen.com/subject/pnqwrltx.html