Flink 对数据乱序/延迟问题默认有三种处理方式（以下时间单位均为秒）：
watermarkTime 当前水位线时间
timestamp 当前对象时间
window.startTime 窗口开始时间
window.endTime 窗口结束时间
- 处理乱序问题
满足以下条件的数据仍会正常计入窗口：watermarkTime < window.endTime + 2（允许乱序时间） && window.startTime <= timestamp < window.endTime
// Add watermarks: WordPeriodicWatermark assigns each element's event-time timestamp
// and supplies the periodic watermark for the stream
.assignTimestampsAndWatermarks(new WordPeriodicWatermark())
/**
 * Periodic timestamp/watermark assigner for {@code Word} events whose {@code getTimestamp()} is
 * expressed in seconds.
 *
 * <p>Each element's event time is {@code getTimestamp() * 1000} milliseconds. The watermark trails
 * the largest event time seen so far by a fixed out-of-orderness bound (2000 ms by default):
 * {@code watermark = maxEventTimeMillis - maxOutOfOrdernessMillis}. Before the first element the
 * watermark is {@code Long.MIN_VALUE}.
 */
public class WordPeriodicWatermark implements AssignerWithPeriodicWatermarks<Word> {

    /** Default allowance for out-of-order arrival, in milliseconds (the "2 s" in the notes). */
    private static final long DEFAULT_MAX_OUT_OF_ORDERNESS_MILLIS = 2000L;

    /** Date pattern used only for the debug log line. */
    private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** How far (ms) the watermark lags behind the largest event time seen. */
    private final long maxOutOfOrdernessMillis;

    /** Largest event time (ms) seen so far; {@code Long.MIN_VALUE} until the first element. */
    private long currentTimestamp = Long.MIN_VALUE;

    /** Creates an assigner that tolerates 2000 ms of out-of-orderness. */
    public WordPeriodicWatermark() {
        this(DEFAULT_MAX_OUT_OF_ORDERNESS_MILLIS);
    }

    /**
     * Creates an assigner with a custom out-of-orderness bound.
     *
     * @param maxOutOfOrdernessMillis how far the watermark lags the max event time, in ms
     * @throws IllegalArgumentException if {@code maxOutOfOrdernessMillis} is negative
     */
    public WordPeriodicWatermark(long maxOutOfOrdernessMillis) {
        if (maxOutOfOrdernessMillis < 0) {
            throw new IllegalArgumentException(
                    "maxOutOfOrdernessMillis must be >= 0 but was " + maxOutOfOrdernessMillis);
        }
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
    }

    /**
     * Returns the element's event time in milliseconds and advances the max-seen timestamp.
     *
     * @param word the incoming element; its timestamp is in seconds
     * @param previousElementTimestamp unused
     * @return the element's event time in epoch milliseconds
     */
    @Override
    public long extractTimestamp(Word word, long previousElementTimestamp) {
        // Convert seconds to milliseconds exactly once, in long arithmetic, so the tracked maximum
        // and the returned value always agree (no int overflow if getTimestamp() returns int).
        long timestampMillis = word.getTimestamp() * 1000L;
        currentTimestamp = Math.max(currentTimestamp, timestampMillis);

        long watermark = getCurrentWatermark().getTimestamp();
        // SimpleDateFormat is not thread-safe, so use a fresh local instance.
        SimpleDateFormat format = new SimpleDateFormat(DATE_PATTERN);
        System.out.println("event " +
                "timestamp = {" + word.getTimestamp() + "}, {" + format.format(new Date(timestampMillis)) + "}, " +
                "CurrentWatermark = {" + watermark + "}, {" + format.format(new Date(watermark)) + "}");
        // Flink expects the element's event time in milliseconds.
        return timestampMillis;
    }

    /**
     * Returns the current watermark: max event time seen minus the out-of-orderness bound.
     *
     * @return {@code Long.MIN_VALUE} before any element, otherwise
     *     {@code currentTimestamp - maxOutOfOrdernessMillis}
     */
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        // Before the first element there is nothing to trail; avoid underflowing Long.MIN_VALUE.
        long watermark = currentTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : currentTimestamp - maxOutOfOrdernessMillis;
        return new Watermark(watermark);
    }
}
- 处理延迟（迟到）数据问题
满足以下条件的迟到数据仍会被计入窗口：watermarkTime < window.endTime + 2（允许乱序时间） + 3（允许延迟时间）
&& window.startTime <= timestamp < window.endTime
// Allowed lateness of 3 seconds (the "+3" allowance in the condition for late data)
.allowedLateness(Time.seconds(3))
- 超过允许延迟时长的数据
满足以下条件的数据不再计入窗口，可通过侧输出流（side output）单独获取：watermarkTime >= window.endTime + 2（允许乱序时间） + 3（允许延迟时间） && window.startTime <= timestamp < window.endTime
// Side-output tag named "late" for elements that exceed the allowed lateness
OutputTag<Word> lateDataTag = new OutputTag<Word>("late") {
};
// Send elements that miss the allowed lateness to the lateDataTag side output
.sideOutputLateData(lateDataTag)
// Print the late elements collected in that side output
sum.getSideOutput(lateDataTag).print();
package com.demo.datastream.window;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.stream.StreamSupport;
/**
 * Demo of event-time tumbling windows over the whole stream with allowed lateness and a
 * late-data side output.
 *
 * <p>Reads lines of the form {@code key,timestampMillis,count} from a socket on
 * {@code localhost:8888}. It sums {@code count} per 5-second event-time window and accepts
 * elements up to 3 seconds late. Elements later than that are printed via the "late" side output.
 */
public class WindowAllLaterDataDemo {

    /** Date pattern used only for the debug log line in the timestamp assigner. */
    private static final String DATE_PATTERN = "yyyy-MM-dd HH:mm:ss";

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Tag for elements that arrive after the allowed lateness has expired.
        OutputTag<Tuple3<String, Long, Integer>> outputTag = new OutputTag<Tuple3<String, Long, Integer>>("late") {
        };

        DataStreamSource<String> streamSource1 = env.socketTextStream("localhost", 8888);
        SingleOutputStreamOperator<Tuple3<Long, Long, Integer>> apply = streamSource1
                // Ignore empty lines (e.g. pressing Enter in nc) instead of crashing the job on them.
                .filter(line -> !line.trim().isEmpty())
                .map(new MapFunction<String, Tuple3<String, Long, Integer>>() {
                    @Override
                    public Tuple3<String, Long, Integer> map(String value) throws Exception {
                        return parseLine(value);
                    }
                })
                // Event-time processing requires a timestamp and watermark assigner.
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>>() {
                    /** Largest event time (ms) seen so far; Long.MIN_VALUE until the first element. */
                    private long currentTimestamp = Long.MIN_VALUE;

                    @Override
                    public long extractTimestamp(Tuple3<String, Long, Integer> word, long previousElementTimestamp) {
                        // f1 is already the element's event time in epoch milliseconds.
                        long timestamp = word.f1;
                        currentTimestamp = Math.max(currentTimestamp, timestamp);

                        long watermark = getCurrentWatermark().getTimestamp();
                        SimpleDateFormat format = new SimpleDateFormat(DATE_PATTERN);
                        System.out.println("event " +
                                "timestamp = {" + timestamp + "}, {" + format.format(new Date(timestamp)) + "}, " +
                                "CurrentWatermark = {" + watermark + "}, {" + format.format(new Date(watermark)) + "}");
                        return timestamp;
                    }

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        // No out-of-orderness allowance: the watermark equals the max event time seen.
                        long maxTimeLag = 0;
                        long lastEmittedWatermark = currentTimestamp == Long.MIN_VALUE
                                ? Long.MIN_VALUE
                                : currentTimestamp - maxTimeLag;
                        return new Watermark(lastEmittedWatermark);
                    }
                })
                // 5-second tumbling event-time windows over all elements.
                .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(outputTag)
                .apply(new AllWindowFunction<Tuple3<String, Long, Integer>, Tuple3<Long, Long, Integer>, TimeWindow>() {
                    /** Emits (windowStart, windowEnd, sum of f2) for every firing of a window. */
                    @Override
                    public void apply(TimeWindow window, Iterable<Tuple3<String, Long, Integer>> values,
                            Collector<Tuple3<Long, Long, Integer>> out) {
                        int sum = 0;
                        for (Tuple3<String, Long, Integer> value : values) {
                            System.out.println("apply = " + value.toString());
                            sum += value.f2;
                        }
                        out.collect(new Tuple3<>(window.getStart(), window.getEnd(), sum));
                    }
                });

        apply.print();
        apply.getSideOutput(outputTag).print();
        env.execute("demo");
    }

    /**
     * Parses one socket line of the form {@code key,timestampMillis,count}.
     *
     * <p>Whitespace around each field is ignored. Fields beyond the third are ignored.
     *
     * @param line the raw input line
     * @return (key, timestampMillis, count)
     * @throws IllegalArgumentException if the line has fewer than three comma-separated fields
     * @throws NumberFormatException if the timestamp or count is not a valid number
     */
    private static Tuple3<String, Long, Integer> parseLine(String line) {
        String[] fields = line.split(",");
        if (fields.length < 3) {
            throw new IllegalArgumentException(
                    "Expected 'key,timestampMillis,count' but got: '" + line + "'");
        }
        return new Tuple3<>(fields[0].trim(), Long.valueOf(fields[1].trim()), Integer.valueOf(fields[2].trim()));
    }
}
网友评论