美文网首页
flink乱序延时问题

flink乱序延时问题

作者: 程序男保姆 | 来源:发表于2020-07-28 13:52 被阅读0次

    flink对数据延时问题默认有三种处理方式

    watermarkTime 当前水位线时间
    timestamp 当前对象时间
    window.startTime 窗口开始时间
    window.endTime 窗口结束时间

    1. 处理乱序问题

    watermarkTime < window.endTime+2(允许乱序时间) && window.startTime<= timestamp < window.endTime

    // Attach the timestamp extractor / periodic watermark generator to the stream.
    .assignTimestampsAndWatermarks(new WordPeriodicWatermark())
    
    
    /**
     * Periodic watermark assigner for {@code Word} events.
     *
     * <p>Tracks the largest event timestamp (in milliseconds) seen so far and emits a
     * watermark that trails it by {@link #MAX_TIME_LAG_MILLIS}, i.e. the amount of
     * out-of-orderness this assigner tolerates.
     */
    public class WordPeriodicWatermark implements AssignerWithPeriodicWatermarks<Word> {

        /** Date pattern used only for the human-readable debug output. */
        private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

        /** Factor applied to {@code Word#getTimestamp()} to obtain milliseconds. */
        private static final long MILLIS_PER_SECOND = 1000L;

        /** How far (in milliseconds) the watermark trails the largest event timestamp. */
        private static final long MAX_TIME_LAG_MILLIS = 2000L;

        /** Largest event timestamp seen so far, in milliseconds; MIN_VALUE until the first event. */
        private long currentTimestamp = Long.MIN_VALUE;

        /**
         * Extracts the event time of {@code word} and advances the maximum seen timestamp.
         *
         * @param word the incoming element
         * @param previousElementTimestamp timestamp previously attached to the element (unused)
         * @return the element's event time in milliseconds
         */
        @Override
        public long extractTimestamp(Word word, long previousElementTimestamp) {
            // Read the getter once and widen to long before multiplying, so the
            // conversion cannot overflow even if the getter returns a narrower type.
            // NOTE(review): the value is presumably in seconds - confirm against Word.
            long eventSeconds = word.getTimestamp();
            long eventMillis = eventSeconds * MILLIS_PER_SECOND;

            currentTimestamp = Math.max(currentTimestamp, eventMillis);

            // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
            SimpleDateFormat formatter = new SimpleDateFormat(DATE_PATTERN);
            long watermarkMillis = getCurrentWatermark().getTimestamp();

            // Both values inside each pair of braces describe the same instant: the
            // formatted watermark is derived from the watermark itself, not from the
            // maximum event timestamp.
            System.out.println("event " +
                    "timestamp = {" + eventSeconds + "}, {" + formatter.format(new Date(eventMillis)) + "}, " +
                    "CurrentWatermark = {" + watermarkMillis + "}, {" + formatter.format(new Date(watermarkMillis)) + "}");

            // Important: the value returned here must be the element's time in milliseconds.
            return eventMillis;
        }

        /**
         * Returns the current watermark.
         *
         * @return a watermark at {@code Long.MIN_VALUE} while no element has been seen,
         *         otherwise the largest seen timestamp minus {@link #MAX_TIME_LAG_MILLIS}
         */
        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            if (currentTimestamp == Long.MIN_VALUE) {
                // Subtracting the lag here would underflow, so pass the sentinel through.
                return new Watermark(Long.MIN_VALUE);
            }
            return new Watermark(currentTimestamp - MAX_TIME_LAG_MILLIS);
        }
    }
    
    2. 处理延时问题

    watermarkTime < window.endTime+2(允许乱序时间)+3(允许延迟时间)
    && window.startTime <= timestamp < window.endTime

    // Allowed lateness of 3 seconds (the "+3" term in the condition above).
    .allowedLateness(Time.seconds(3))
    
    3. 超过延时时长问题

    watermarkTime >= window.endTime+2(允许乱序时间)+3(允许延迟时间) && window.startTime <= timestamp < window.endTime

    // Tag named "late"; declared as an anonymous subclass (note the trailing braces).
    OutputTag<Word> lateDataTag = new OutputTag<Word>("late") {
            };
                    // Fragment of the window pipeline: passes the tag to sideOutputLateData.
                    .sideOutputLateData(lateDataTag)
    
            // Print the side output registered under the tag; `sum` is the operator
            // produced by the windowed pipeline (not shown in this fragment).
            sum.getSideOutput(lateDataTag).print();
    
    
    package com.demo.datastream.window;
    
    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    import javax.annotation.Nullable;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.stream.StreamSupport;
    
    public class WindowAllLaterDataDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
    
            OutputTag<Tuple3<String, Long, Integer>> outputTag = new OutputTag<Tuple3<String, Long, Integer>>("late") {
            };
    
            DataStreamSource<String> streamSource1 = env.socketTextStream("localhost", 8888);
    
            SingleOutputStreamOperator<Tuple3<Long, Long, Integer>> apply = streamSource1
                    .map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
                        @Override
                        public Tuple3<String, Long, Integer> map(String value) throws Exception {
                            String[] split = value.split(",");
                            return new Tuple3<>(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
                        }
                    })
                    // 设置处理事件为事件时间必须指定时间与水位线
                    .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>>() {
                        private long currentTimestamp = Long.MIN_VALUE;
    
                        private String sdf = "yyyy-MM-dd HH:mm:ss";
    
                        @Override
                        public long extractTimestamp(Tuple3<String, Long, Integer> word, long previousElementTimestamp) {
    
                            SimpleDateFormat simpleDateFormat = new SimpleDateFormat(sdf);
    
                            long timestamp = word.f1;
                            currentTimestamp = currentTimestamp > timestamp ? currentTimestamp : timestamp;
                            System.out.println("event " +
                                    "timestamp = {" + timestamp + "}, {" + simpleDateFormat.format(new Date(timestamp)) + "}, " +
                                    "CurrentWatermark = {" + getCurrentWatermark().getTimestamp() + "}, {" + simpleDateFormat.format(new Date(currentTimestamp)) + "}");
    
                            // 这里特别注意下 timestamp 是
                            //当前对象的时间毫秒值
                            //当前对象的时间毫秒值
                            //当前对象的时间毫秒值
                            return timestamp;
                        }
    
                        @Nullable
                        @Override
                        public Watermark getCurrentWatermark() {
                            long maxTimeLag = 0;
                            long lastEmittedWatermark = currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - maxTimeLag;
                            return new Watermark(lastEmittedWatermark);
                        }
                    })
                    // 设置窗口为事件时间翻滚
                    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .allowedLateness(Time.seconds(3))
                    .sideOutputLateData(outputTag)
                    .apply(new AllWindowFunction<Tuple3<String, Long, Integer>, Tuple3<Long, Long, Integer>, TimeWindow>() {
                        @Override
                        public void apply(TimeWindow
                                                  window, Iterable<Tuple3<String, Long, Integer>> values, Collector<Tuple3<Long, Long, Integer>> out) throws
                                Exception {
                            int sum = StreamSupport.stream(values.spliterator(), false).mapToInt(o -> {
                                System.out.println("apply = " + o.toString());
                                return o.f2;
                            }).sum();
                            long start = window.getStart();
                            long end = window.getEnd();
                            out.collect(new Tuple3<>(start, end, sum));
                        }
                    });
            apply.print();
    
            apply.getSideOutput(outputTag).print();
    
            JobExecutionResult demo = env.execute("demo");
    
        }
    }
    

    相关文章

      网友评论

          本文标题:flink乱序延时问题

          本文链接:https://www.haomeiwen.com/subject/frsfrktx.html