Time & Watermark


    Author: ZYvette | Published 2021-04-25 14:34

    Time

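    Event Time, Ingestion Time, Processing Time

    Event Time is the time at which an event actually occurred, usually a timestamp carried inside the record. Processing Time is the time at which the record is processed, i.e. the wall-clock time of the machine running the operator. Ingestion Time is the time at which the record enters Flink, assigned at the source.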
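    To make the difference concrete, here is a minimal plain-Java sketch (not Flink code) that assigns one event to a 60-second tumbling window under each notion of time. The class name TimeNotionsDemo and the three timestamps are made up for illustration; windowStart uses the usual tumbling-window rule of rounding a timestamp down to a multiple of the window size.

```java
/** Illustrative sketch: one event, three notions of time, three window assignments. */
final class TimeNotionsDemo {

    /** Start (ms) of the tumbling window of size windowSize (ms) that contains timestamp (ms). */
    static long windowStart(long timestamp, long windowSize) {
        // floorMod keeps the result correct for negative timestamps as well
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    /** Window starts chosen under event time, ingestion time and processing time, in that order. */
    static long[] assign(long eventTime, long ingestionTime, long processingTime, long windowSize) {
        return new long[] {
            windowStart(eventTime, windowSize),      // when the event actually happened
            windowStart(ingestionTime, windowSize),  // when it entered the source
            windowStart(processingTime, windowSize)  // wall clock of the processing operator
        };
    }
}
```

    An event created at 59.5 s, ingested at 60.2 s and processed at 61.0 s lands in window [0 s, 60 s) under event time, but in [60 s, 120 s) under ingestion time and processing time: TimeNotionsDemo.assign(59_500, 60_200, 61_000, 60_000) gives {0, 60000, 60000}. Only event time keeps the result independent of when the data happens to arrive.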

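    In Flink, the time characteristic can be set as follows:

    // Use processing time (this call is deprecated since Flink 1.12, where event time is the default)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);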
    

    watermark

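    • A watermark is a timestamp that is generated periodically (or on special events) and flows with the stream as a measure of event-time progress. A watermark with timestamp t declares that no more elements with timestamp <= t are expected; elements with timestamp <= t that arrive after it are considered late.
    • Because data can be delayed by arbitrary amounts, watermarks cannot eliminate lateness entirely. In practice you can configure an allowed lateness and handle late data explicitly, for example through a side output.
    • Watermarks are only relevant for Event Time processing.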
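    The rules above can be sketched in plain Java. This is a simplified model of how an event-time window treats an element, assuming a window [start, end) whose last covered timestamp is end - 1, a window that fires once the watermark reaches end - 1, and an allowed lateness after which the window state is gone. The names LatenessDemo and Outcome are hypothetical, not Flink API.

```java
/** Simplified model of how an event-time window treats an element, given the current watermark. */
final class LatenessDemo {

    enum Outcome { ON_TIME, LATE_BUT_ALLOWED, DROPPED }

    /** An element is late when its timestamp is not after the current watermark. */
    static boolean isLate(long timestamp, long watermark) {
        return timestamp <= watermark;
    }

    /** Classifies an element that belongs to the window [start, windowEnd). */
    static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        long windowMaxTs = windowEnd - 1; // last timestamp the window covers
        if (watermark < windowMaxTs) {
            return Outcome.ON_TIME;            // window has not fired yet
        }
        if (watermark < windowMaxTs + allowedLateness) {
            return Outcome.LATE_BUT_ALLOWED;   // window already fired but its state is kept: fires again
        }
        return Outcome.DROPPED;                // beyond allowed lateness: discarded (or sent to a side output)
    }
}
```

    For a window [0, 10000) with 5 s of allowed lateness, an element arriving at watermark 9998 is on time, at 9999 it is late but still updates the window, and at 14999 it is dropped. With zero allowed lateness there is no middle ground: once the window fires, its late elements are dropped.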

    Watermark generation

    watermark strategies

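    import org.apache.flink.api.common.eventtime.*;

    WatermarkStrategy<String> strategy = new WatermarkStrategy<String>() {
        @Override
        public WatermarkGenerator<String> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<String>() {
                @Override
                public void onEvent(String event, long eventTimestamp, WatermarkOutput output) {
                    // called once per event: update internal state, or emit a watermark directly
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    // called periodically (auto-watermark interval): emit the current watermark here
                }
            };
        }

        @Override
        public TimestampAssigner<String> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new TimestampAssigner<String>() {
                @Override
                public long extractTimestamp(String element, long recordTimestamp) {
                    // placeholder: return the event-time timestamp (epoch ms) carried by the element
                    return 0;
                }
            };
        }
    };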
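    The extractTimestamp placeholder above returns 0; in a real job it pulls the event time out of each record. Assuming a hypothetical line format "<epochMillis>,<payload>", the body could look like the plain-Java helper below. The class CsvTimestamps and the format are illustrative only; the method merely mirrors the parameters of TimestampAssigner#extractTimestamp so its body can be pasted into the anonymous class.

```java
/** Hypothetical helper: parses the event time from a line formatted as "<epochMillis>,<payload>". */
final class CsvTimestamps {

    static long extractTimestamp(String element, long recordTimestamp) {
        int comma = element.indexOf(',');
        if (comma < 0) {
            // no event time in this line: keep the timestamp the record already has
            return recordTimestamp;
        }
        return Long.parseLong(element.substring(0, comma).trim());
    }
}
```

    Falling back to recordTimestamp is one design choice among several; a job could instead reject malformed lines before they reach the assigner.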
    

    WatermarkGenerator types:

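    • Periodic WatermarkGenerator: onEvent observes each event (for example, to track the largest timestamp) and onPeriodicEmit emits the watermark at a fixed interval, so both methods are usually implemented. Two examples:
    import org.apache.flink.api.common.eventtime.*; // Watermark, WatermarkGenerator, WatermarkOutput; MyEvent is the user's event type

    public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {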
    
        private final long maxOutOfOrderness = 3500; // 3.5 seconds
    
        private long currentMaxTimestamp;
    
        @Override
        public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        }
    
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // emit the watermark as current highest timestamp minus the out-of-orderness bound
            output.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));
        }
    
    }
    
    /**
     * This generator generates watermarks that are lagging behind processing time by a fixed amount.
     * It assumes that elements arrive in Flink after a bounded delay.
     */
    public class TimeLagWatermarkGenerator implements WatermarkGenerator<MyEvent> {
    
        private final long maxTimeLag = 5000; // 5 seconds
    
        @Override
        public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
            // don't need to do anything because we work on processing time
        }
    
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));
        }
    }
    
    • Punctuated WatermarkGenerator: emits a watermark from onEvent only when it sees a special marker event; onPeriodicEmit is left empty.
    public class PunctuatedAssigner implements WatermarkGenerator<MyEvent> {
    
        @Override
        public void onEvent(MyEvent event, long eventTimestamp, WatermarkOutput output) {
            if (event.hasWatermarkMarker()) {
                output.emitWatermark(new Watermark(event.getWatermarkTimestamp()));
            }
        }
    
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // don't need to do anything because we emit in reaction to events above
        }
    }
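    The two kinds differ only in which callback emits. The plain-Java simulation below mirrors the shape of Flink's WatermarkGenerator with a hypothetical Generator interface and a hypothetical driver that calls onEvent for every element and onPeriodicEmit after every period elements, standing in for Flink's timer-based interval. SimEvent, PeriodicGen, PunctuatedGen and GeneratorDriver are all made up for this sketch; it is not the Flink runtime.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

/** Hypothetical mini version of the WatermarkGenerator contract, for simulation only. */
interface Generator {
    void onEvent(SimEvent event, LongConsumer output);
    void onPeriodicEmit(LongConsumer output);
}

/** A simulated event; marker = true plays the role of hasWatermarkMarker(). */
final class SimEvent {
    final long timestamp;
    final boolean marker;
    SimEvent(long timestamp, boolean marker) { this.timestamp = timestamp; this.marker = marker; }
}

/** Periodic: tracks the max timestamp per event, emits max - bound - 1 only from onPeriodicEmit. */
final class PeriodicGen implements Generator {
    private final long bound;
    private long maxTs;
    PeriodicGen(long bound) { this.bound = bound; this.maxTs = Long.MIN_VALUE + bound + 1; } // avoids underflow
    public void onEvent(SimEvent e, LongConsumer out) { maxTs = Math.max(maxTs, e.timestamp); }
    public void onPeriodicEmit(LongConsumer out) { out.accept(maxTs - bound - 1); }
}

/** Punctuated: emits only when a marker event is seen; onPeriodicEmit does nothing. */
final class PunctuatedGen implements Generator {
    public void onEvent(SimEvent e, LongConsumer out) { if (e.marker) out.accept(e.timestamp); }
    public void onPeriodicEmit(LongConsumer out) { }
}

final class GeneratorDriver {
    /** Feeds events in order, calling onPeriodicEmit after every `period` events; returns the emitted watermarks. */
    static List<Long> run(Generator gen, List<SimEvent> events, int period) {
        List<Long> emitted = new ArrayList<>();
        LongConsumer out = wm -> emitted.add(wm);
        int seen = 0;
        for (SimEvent e : events) {
            gen.onEvent(e, out);
            if (++seen % period == 0) {
                gen.onPeriodicEmit(out);
            }
        }
        return emitted;
    }
}
```

    With events at 1000, 5000, 3000 (marked) and 6000 and period 2, PeriodicGen(3500) emits [1499, 2499] (5000 - 3500 - 1, then 6000 - 3500 - 1), while PunctuatedGen emits only [3000]. The periodic generator's output is bounded by how often it is asked; the punctuated one depends entirely on how often markers appear in the data.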
    

    Idle sources

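    • Handling idle sources
      If a source partition receives no data for a configured period, it is marked idle and downstream operators no longer wait for its watermark. Without this, one silent partition would hold back the combined watermark and stall event-time progress, which happens easily when data is unevenly distributed across partitions.
    • forBoundedOutOfOrderness
      Generates watermarks that trail the largest timestamp seen so far by a fixed bound (20 seconds below); withIdleness adds the idle timeout (1 minute below).
    import java.time.Duration;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.java.tuple.Tuple2;
    WatermarkStrategy<Tuple2<Long, String>> strategy = WatermarkStrategy
            .<Tuple2<Long, String>>forBoundedOutOfOrderness(Duration.ofSeconds(20))
            .withIdleness(Duration.ofMinutes(1));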
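    The effect of an idle timeout on the combined watermark can be modeled in plain Java: take the minimum over partitions that are still active, and leave out any partition that has been silent for longer than the timeout. The class IdlenessDemo is a hypothetical simplification written for this article (time is passed in explicitly instead of read from a clock); Flink's actual implementation differs in detail.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Simplified model: minimum watermark over partitions that are not idle. */
final class IdlenessDemo {

    private final long idleTimeoutMs;
    private final Map<Integer, Long> watermarkByPartition = new HashMap<>();
    private final Map<Integer, Long> lastActivityByPartition = new HashMap<>();

    IdlenessDemo(long idleTimeoutMs) { this.idleTimeoutMs = idleTimeoutMs; }

    /** Records a watermark from a partition, observed at (processing) time nowMs. */
    void update(int partition, long watermark, long nowMs) {
        watermarkByPartition.merge(partition, watermark, Long::max); // never move a partition backwards
        lastActivityByPartition.put(partition, nowMs);
    }

    /** Combined watermark at nowMs, or empty if every partition is idle. */
    OptionalLong combined(long nowMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (Map.Entry<Integer, Long> e : watermarkByPartition.entrySet()) {
            boolean idle = nowMs - lastActivityByPartition.get(e.getKey()) > idleTimeoutMs;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, e.getValue());
            }
        }
        return anyActive ? OptionalLong.of(min) : OptionalLong.empty();
    }
}
```

    With a 60 s timeout, if partition 0 last reported watermark 10000 at time 0 and partition 1 reports 90000 at time 70000, the combined watermark at time 70000 is 90000 because partition 0 now counts as idle; without the timeout it would stay stuck at 10000.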
    

    Combining watermarks

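    • Watermarks and Kafka
      A Kafka topic usually has several partitions. Flink can generate a watermark per partition inside the Kafka source, and the source emits the minimum of these per-partition watermarks.

    • How operators combine watermarks

    Before forwarding a watermark downstream, an operator first completes all the work that watermark triggers, such as firing event-time timers and windows. An operator with several inputs (several parallel upstream instances, or the two inputs of a connect or join) keeps the latest watermark of each input and uses the minimum as its own current watermark.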
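    Both cases rest on the same rule, sketched here in plain Java: keep the highest watermark seen on each input (a Kafka partition or an upstream channel), take the minimum across inputs, and forward it only when it advances. The class MinWatermarkCombiner is a hypothetical, simplified illustration of this idea and ignores idleness.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/** Simplified model: an operator's watermark is the minimum over its inputs' latest watermarks. */
final class MinWatermarkCombiner {

    private final long[] perInput;             // latest watermark seen on each input
    private long lastEmitted = Long.MIN_VALUE;

    MinWatermarkCombiner(int inputs) {
        perInput = new long[inputs];
        Arrays.fill(perInput, Long.MIN_VALUE); // nothing received yet
    }

    /** Handles a watermark from one input; returns the combined watermark if it advanced. */
    OptionalLong onWatermark(int input, long watermark) {
        perInput[input] = Math.max(perInput[input], watermark); // ignore regressions on a single input
        long min = Arrays.stream(perInput).min().getAsLong();
        if (min > lastEmitted) {
            lastEmitted = min;
            return OptionalLong.of(min);       // forward downstream
        }
        return OptionalLong.empty();           // held back by a slower input
    }
}
```

    With two inputs, a watermark of 5000 on input 0 is not forwarded until input 1 reports something; 3000 on input 1 then yields 3000, and a later 8000 on input 1 yields 5000, the value input 0 is still holding at.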

    References: https://zhuanlan.zhihu.com/p/158951593
    https://cloud.tencent.com/developer/article/1629585


    Article link: https://www.haomeiwen.com/subject/cubcrltx.html