美文网首页
flink乱序延时问题

flink乱序延时问题

作者: 程序男保姆 | 来源:发表于2020-07-28 13:52 被阅读0次

flink对数据延时问题默认有三种处理方式

watermarkTime 当前水位线时间
timestamp 当前对象时间
window.startTime 窗口开始时间
window.endTime 窗口结束时间

  1. 处理乱序问题

watermarkTime < window.endTime+2(允许乱序时间) && window.startTime<= timestamp < window.endTime

// 添加水位线
.assignTimestampsAndWatermarks(new WordPeriodicWatermark())


/**
 * Periodic watermark assigner for {@code Word} events.
 *
 * <p>Tracks the largest event time seen so far and emits a watermark that trails it by
 * {@link #MAX_TIME_LAG_MS} milliseconds, which is the out-of-orderness allowance.
 * {@code Word#getTimestamp()} is multiplied by 1000 before use, i.e. it is treated as
 * seconds and converted to the milliseconds Flink expects.
 */
public class WordPeriodicWatermark implements AssignerWithPeriodicWatermarks<Word> {

    /** Pattern used to render timestamps in the debug output. */
    private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** How far (in milliseconds) the watermark trails the largest event time seen. */
    private static final long MAX_TIME_LAG_MS = 2000;

    /** Largest event time (milliseconds) seen so far; {@code Long.MIN_VALUE} until the first element. */
    private long currentTimestamp = Long.MIN_VALUE;

    /**
     * Extracts the event time of {@code word} in milliseconds and advances the largest
     * event time seen so far. Also prints the event time and the current watermark.
     *
     * @param word                     the incoming element
     * @param previousElementTimestamp timestamp previously attached to the element (unused)
     * @return the element's event time in milliseconds
     */
    @Override
    public long extractTimestamp(Word word, long previousElementTimestamp) {
        long timestampSeconds = word.getTimestamp();
        long timestampMillis = timestampSeconds * 1000;

        currentTimestamp = Math.max(currentTimestamp, timestampMillis);

        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_PATTERN);

        // Format the watermark itself so the number and its readable form agree
        // (previously the readable form showed the largest event time instead).
        long watermarkMillis = getCurrentWatermark().getTimestamp();
        System.out.println("event " +
                "timestamp = {" + timestampSeconds + "}, {" + simpleDateFormat.format(new Date(timestampMillis)) + "}, " +
                "CurrentWatermark = {" + watermarkMillis + "}, {" + simpleDateFormat.format(new Date(watermarkMillis)) + "}");

        // Important: the returned timestamp must be the element's event time in MILLISECONDS.
        return timestampMillis;
    }

    /**
     * Returns the current watermark: the largest event time seen minus
     * {@link #MAX_TIME_LAG_MS}, or {@code Long.MIN_VALUE} if no element has been seen yet.
     *
     * @return the watermark to emit
     */
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // Guard the "nothing seen yet" sentinel so the subtraction cannot underflow.
        long lastEmittedWatermark = currentTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : currentTimestamp - MAX_TIME_LAG_MS;

        return new Watermark(lastEmittedWatermark);
    }
}
  2. 处理延时问题

watermarkTime < window.endTime+2(允许乱序时间)+3(允许延迟时间)
&& window.startTime <= timestamp < window.endTime

.allowedLateness(Time.seconds(3))
  3. 超过延时时长问题

watermarkTime >= window.endTime+2(允许乱序时间)+3(允许延迟时间) && window.startTime <= timestamp < window.endTime

OutputTag<Word> lateDataTag = new OutputTag<Word>("late") {
        };
                .sideOutputLateData(lateDataTag)

        sum.getSideOutput(lateDataTag).print();

package com.demo.datastream.window;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.stream.StreamSupport;

/**
 * Demo job showing how late data is handled with event-time windows.
 *
 * <p>Reads comma-separated lines ({@code name,timestampMillis,value}) from a socket on
 * {@code localhost:8888}, assigns event-time timestamps and watermarks, groups all
 * elements into 5-second tumbling event-time windows with 3 seconds of allowed lateness,
 * and sums the third field per window. Elements that arrive after the allowed lateness
 * are routed to the {@code "late"} side output. Both the window results and the late
 * elements are printed.
 */
public class WindowAllLaterDataDemo {

    /** Pattern used to render timestamps in the debug output. */
    private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** How far (in milliseconds) the watermark trails the largest event time seen. */
    private static final long MAX_TIME_LAG_MS = 0;

    /**
     * Builds and runs the demo pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Anonymous subclass, as in the original, so the tag carries its generic element type.
        OutputTag<Tuple3<String, Long, Integer>> outputTag = new OutputTag<Tuple3<String, Long, Integer>>("late") {
        };

        DataStreamSource<String> streamSource1 = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple3<Long, Long, Integer>> apply = streamSource1
                .map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> map(String value) throws Exception {
                        String[] split = value.split(",");
                        if (split.length < 3) {
                            // Fail with the offending line instead of a bare index error.
                            throw new IllegalArgumentException(
                                    "Expected 3 comma-separated fields (name,timestampMillis,value) but got: " + value);
                        }
                        // Trim numeric fields so input such as "a, 1000, 1" still parses.
                        return new Tuple3<>(split[0], Long.valueOf(split[1].trim()), Integer.valueOf(split[2].trim()));
                    }
                })
                // Event-time processing requires assigning timestamps and watermarks.
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>>() {
                    /** Largest event time (milliseconds) seen so far; Long.MIN_VALUE until the first element. */
                    private long currentTimestamp = Long.MIN_VALUE;

                    @Override
                    public long extractTimestamp(Tuple3<String, Long, Integer> word, long previousElementTimestamp) {
                        long timestamp = word.f1;
                        currentTimestamp = Math.max(currentTimestamp, timestamp);

                        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_PATTERN);

                        // Format the watermark itself so the number and its readable form always agree.
                        long watermarkMillis = getCurrentWatermark().getTimestamp();
                        System.out.println("event " +
                                "timestamp = {" + timestamp + "}, {" + simpleDateFormat.format(new Date(timestamp)) + "}, " +
                                "CurrentWatermark = {" + watermarkMillis + "}, {" + simpleDateFormat.format(new Date(watermarkMillis)) + "}");

                        // Important: the returned timestamp must be the element's event time in MILLISECONDS.
                        return timestamp;
                    }

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        // Guard the "nothing seen yet" sentinel so the subtraction cannot underflow.
                        long lastEmittedWatermark = currentTimestamp == Long.MIN_VALUE
                                ? Long.MIN_VALUE
                                : currentTimestamp - MAX_TIME_LAG_MS;
                        return new Watermark(lastEmittedWatermark);
                    }
                })
                // Tumbling event-time windows over the whole (non-keyed) stream.
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(outputTag)
                .apply(new AllWindowFunction<Tuple3<String, Long, Integer>, Tuple3<Long, Long, Integer>, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow window,
                                      Iterable<Tuple3<String, Long, Integer>> values,
                                      Collector<Tuple3<Long, Long, Integer>> out) throws Exception {
                        int sum = 0;
                        for (Tuple3<String, Long, Integer> element : values) {
                            System.out.println("apply = " + element.toString());
                            sum += element.f2;
                        }
                        // Emit (window start, window end, sum of the third field).
                        out.collect(new Tuple3<>(window.getStart(), window.getEnd(), sum));
                    }
                });
        apply.print();

        // Elements that arrived after the allowed lateness.
        apply.getSideOutput(outputTag).print();

        env.execute("demo");
    }
}

相关文章

网友评论

      本文标题:flink乱序延时问题

      本文链接:https://www.haomeiwen.com/subject/frsfrktx.html