美文网首页算法小白菜spark
Flink--WaterMark理解和实践

Flink--WaterMark理解和实践

作者: 李小李的路 | 来源:发表于2019-08-06 18:20 被阅读16次
    • 基于flink-1.8.1

    Watermark作用

    • watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。
    • 我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。
    • 但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。
    • watermark基础知识:Flink--EventTime中WaterMark知识点扫盲

    Window的划分

    • Window的设定无关数据本身,而是系统定义好的。
    • window是flink中划分数据一个基本单位,window的划分方式是固定的,默认会根据自然时间划分window,并且划分方式是前闭后开。
    • window示例:
    window划分 第一个 第二个 第三个
    3s [00:00:00~00:00:03) [00:00:03~00:00:06) [00:00:06~00:00:09)
    5s [00:00:00~00:00:05) [00:00:05~00:00:10) [00:00:10~00:00:15)
    10s [00:00:00~00:00:10) [00:00:10~00:00:20) [00:00:20~00:00:30)
    1min [00:00:00~00:01:00) [00:01:00~00:02:00) [00:02:00~00:03:00)

    Watermark分配方式

    Watermark默认更新时间

    • 详见源码解释
    • 在非processing time的模式下,默认是200ms;
    // --------------------------------------------------------------------------------------------
        //  Time characteristic
        // --------------------------------------------------------------------------------------------
    
        /**
         * Sets the time characteristic for all streams create from this environment, e.g., processing
         * time, event time, or ingestion time.
         *
         * <p>If you set the characteristic to IngestionTime of EventTime this will set a default
         * watermark update interval of 200 ms. If this is not applicable for your application
         * you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
         *
         * @param characteristic The time characteristic.
         */
        @PublicEvolving
        public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
            this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
            if (characteristic == TimeCharacteristic.ProcessingTime) {
                getConfig().setAutoWatermarkInterval(0);
            } else {
                getConfig().setAutoWatermarkInterval(200);
            }
        }
    

    Periodic Watermarks跟踪

    • 因为Periodic Watermarks允许设定一个最大乱序时间,这种情况应用最多。
    package github.yahuili1128.watermark;
    
    import github.yahuili1128.pojo.MockUpModel;
    import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    
    import javax.annotation.Nullable;
    import java.text.SimpleDateFormat;
    import java.util.Iterator;
    import java.util.TreeSet;
    
    import static github.yahuili1128.connector.SourceKafka010.getMockUpkafka010;
    
    /**
     * @Description : 从kafka中读取数据,练习watermark
     * @Author : LiYahui
     * @Date : 2019-08-06 11:45
     * @Version : V1.0
     */
    public class PeriodicWatermarkTest {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            SingleOutputStreamOperator<MockUpModel> mockUpkafka010 = getMockUpkafka010(env).name("kafka source");
            SingleOutputStreamOperator<String> result = mockUpkafka010.filter(line -> line.gender.equals("male"))
                    .assignTimestampsAndWatermarks(new GetWateramrk()).keyBy(line -> line.gender)
                    .window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new WindowTest()).name("watermark print");
    
            result.print();
    
            env.execute(PeriodicWatermarkTest.class.getSimpleName());
        }
    
        public static class WindowTest implements WindowFunction<MockUpModel, String, String, TimeWindow> {
    
            @Override
            public void apply(String key, TimeWindow window, Iterable<MockUpModel> input, Collector<String> out)
                    throws Exception {
                TreeSet<Long> set = new TreeSet<>();
                //          元素个数
                int size = Iterables.size(input);
                Iterator<MockUpModel> eles = input.iterator();
                while (eles.hasNext()) {
                    set.add(eles.next().timestamp);
                }
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                //(code,窗口内元素个数,窗口内最早元素的时间,窗口内最晚元素的时间,窗口自身开始时间,窗口自身结束时间)
                String first = sdf.format(set.first());
                String last = sdf.format(set.last());
                String start = sdf.format(window.getStart());
                String end = sdf.format(window.getEnd());
                // 调试使用
                out.collect("event.key:" + key + ",window中元素个数:" + size + ",window第一个元素时间戳:" + first + ",window最后一个元素时间戳:"
                        + last + ",window开始时间戳:" + start + ",window结束时间戳:" + end + ",窗口内所有的时间戳:" + set.toString());
    
            }
        }
    
    
        public static class GetWateramrk implements AssignerWithPeriodicWatermarks<MockUpModel> {
            //      定义最大延迟 2s
            private final long maxOutOfOrderness = 5000L;
            private long currentMaxTimestamp;
            private Watermark watermark;
    
            //      将时间戳信息格式化,调试学习
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    
            @Nullable
            @Override
            public Watermark getCurrentWatermark() {
                watermark = new Watermark(this.currentMaxTimestamp - this.maxOutOfOrderness);
                return watermark;
            }
    
            @Override
            public long extractTimestamp(MockUpModel element, long previousElementTimestamp) {
                //      获取event中的时间戳
                long timestamp = element.getTimestamp();
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                //          将所有的时间信息打印
                System.out.println("--->>> event.key:" + element.gender + " | event中timestamp:" + timestamp + "| " + sdf
                        .format(timestamp) + "| currentMaxTimestamp:" + currentMaxTimestamp + "| " + sdf
                        .format(currentMaxTimestamp) + "| watermark" + watermark.toString());
                //          返回event中的时间戳
                return timestamp;
            }
        }
    
    }
    
    
    • print数据案例
    --->>> event.key:male | event中timestamp:1565082033747| 2019-08-06 17:00:33.747| currentMaxTimestamp:1565082033747| 2019-08-06 17:00:33.747| watermarkWatermark @ -5000
    --->>> event.key:male | event中timestamp:1565082036758| 2019-08-06 17:00:36.758| currentMaxTimestamp:1565082036758| 2019-08-06 17:00:36.758| watermarkWatermark @ -5000
    --->>> event.key:male | event中timestamp:1565082038778| 2019-08-06 17:00:38.778| currentMaxTimestamp:1565082038778| 2019-08-06 17:00:38.778| watermarkWatermark @ -5000
    --->>> event.key:male | event中timestamp:1565082037791| 2019-08-06 17:00:37.791| currentMaxTimestamp:1565082038778| 2019-08-06 17:00:38.778| watermarkWatermark @ -5000
    --->>> event.key:male | event中timestamp:1565082038803| 2019-08-06 17:00:38.803| currentMaxTimestamp:1565082038803| 2019-08-06 17:00:38.803| watermarkWatermark @ -5000
    --->>> event.key:male | event中timestamp:1565082039815| 2019-08-06 17:00:39.815| currentMaxTimestamp:1565082039815| 2019-08-06 17:00:39.815| watermarkWatermark @ -5000
    --->>> event.key:male | event中timestamp:1565082041839| 2019-08-06 17:00:41.839| currentMaxTimestamp:1565082041839| 2019-08-06 17:00:41.839| watermarkWatermark @ -5000
    --->>> event.key:male | event中timestamp:1565082044850| 2019-08-06 17:00:44.850| currentMaxTimestamp:1565082044850| 2019-08-06 17:00:44.850| watermarkWatermark @ -5000
    --->>> event.key:male | event中timestamp:1565082046869| 2019-08-06 17:00:46.869| currentMaxTimestamp:1565082046869| 2019-08-06 17:00:46.869| watermarkWatermark @ 1565082039850
    8> event.key:male,window中元素个数:6,window第一个元素时间戳:2019-08-06 17:00:33.747,window最后一个元素时间戳:2019-08-06 17:00:39.815,window开始时间戳:2019-08-06 17:00:30.000,window结束时间戳:2019-08-06 17:00:40.000,窗口内所有的时间戳:[1565082033747, 1565082036758, 1565082037791, 1565082038778, 1565082038803, 1565082039815]
    --->>> event.key:male | event中timestamp:1565082047880| 2019-08-06 17:00:47.880| currentMaxTimestamp:1565082047880| 2019-08-06 17:00:47.880| watermarkWatermark @ 1565082041869
    --->>> event.key:male | event中timestamp:1565082046890| 2019-08-06 17:00:46.890| currentMaxTimestamp:1565082047880| 2019-08-06 17:00:47.880| watermarkWatermark @ 1565082042880
    --->>> event.key:male | event中timestamp:1565082049900| 2019-08-06 17:00:49.900| currentMaxTimestamp:1565082049900| 2019-08-06 17:00:49.900| watermarkWatermark @ 1565082042880
    --->>> event.key:male | event中timestamp:1565082051921| 2019-08-06 17:00:51.921| currentMaxTimestamp:1565082051921| 2019-08-06 17:00:51.921| watermarkWatermark @ 1565082044900
    --->>> event.key:male | event中timestamp:1565082050934| 2019-08-06 17:00:50.934| currentMaxTimestamp:1565082051921| 2019-08-06 17:00:51.921| watermarkWatermark @ 1565082046921
    --->>> event.key:male | event中timestamp:1565082051945| 2019-08-06 17:00:51.945| currentMaxTimestamp:1565082051945| 2019-08-06 17:00:51.945| watermarkWatermark @ 1565082046921
    --->>> event.key:male | event中timestamp:1565082052955| 2019-08-06 17:00:52.955| currentMaxTimestamp:1565082052955| 2019-08-06 17:00:52.955| watermarkWatermark @ 1565082046945
    --->>> event.key:male | event中timestamp:1565082055965| 2019-08-06 17:00:55.965| currentMaxTimestamp:1565082055965| 2019-08-06 17:00:55.965| watermarkWatermark @ 1565082047955
    8> event.key:male,window中元素个数:6,window第一个元素时间戳:2019-08-06 17:00:41.839,window最后一个元素时间戳:2019-08-06 17:00:49.900,window开始时间戳:2019-08-06 17:00:40.000,window结束时间戳:2019-08-06 17:00:50.000,窗口内所有的时间戳:[1565082041839, 1565082044850, 1565082046869, 1565082046890, 1565082047880, 1565082049900]
    --->>> event.key:male | event中timestamp:1565082056983| 2019-08-06 17:00:56.983| currentMaxTimestamp:1565082056983| 2019-08-06 17:00:56.983| watermarkWatermark @ 1565082050965
    --->>> event.key:male | event中timestamp:1565082055993| 2019-08-06 17:00:55.993| currentMaxTimestamp:1565082056983| 2019-08-06 17:00:56.983| watermarkWatermark @ 1565082051983
    --->>> event.key:male | event中timestamp:1565082059005| 2019-08-06 17:00:59.005| currentMaxTimestamp:1565082059005| 2019-08-06 17:00:59.005| watermarkWatermark @ 1565082051983
    --->>> event.key:male | event中timestamp:1565082061028| 2019-08-06 17:01:01.028| currentMaxTimestamp:1565082061028| 2019-08-06 17:01:01.028| watermarkWatermark @ 1565082054005
    --->>> event.key:male | event中timestamp:1565082062036| 2019-08-06 17:01:02.036| currentMaxTimestamp:1565082062036| 2019-08-06 17:01:02.036| watermarkWatermark @ 1565082056028
    
    • 为什么watermark会出现-5000
      • AssignerWithPeriodicWatermarks子类是每隔一段时间执行的,这个具体由ExecutionConfig.setAutoWatermarkInterval设置,默认是200ms,之所以会出现-5000时因为你没有数据进入窗口,当然一直都是-5000,但是getCurrentWatermark方法不是在执行extractTimestamp后才执行。

    结论

    • window的触发要符合以下几个条件:
      • watermark时间 >= window_end_time
      • 在[window_start_time,window_end_time)中有数据存在;
    • 同时满足了以上2个条件,window才会触发。
    • watermark是一个全局的值,不是某一个key下的值,所以即使不是同一个key的数据,其warmark也会增加.
    • 这个部分的知识点需要细细的理解一下;

    相关文章

      网友评论

        本文标题:Flink--WaterMark理解和实践

        本文链接:https://www.haomeiwen.com/subject/naavdctx.html