Watermark作用
- watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。
- 我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。
- 但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。
- watermark基础知识:Flink--EventTime中WaterMark知识点扫盲
Window的划分
- Window的设定无关数据本身,而是系统定义好的。
- window是flink中划分数据一个基本单位,window的划分方式是固定的,默认会根据自然时间划分window,并且划分方式是前闭后开。
- window示例:
window划分 |
第一个 |
第二个 |
第三个 |
3s |
[00:00:00~00:00:03) |
[00:00:03~00:00:06) |
[00:00:06~00:00:09) |
5s |
[00:00:00~00:00:05) |
[00:00:05~00:00:10) |
[00:00:10~00:00:15) |
10s |
[00:00:00~00:00:10) |
[00:00:10~00:00:20) |
[00:00:20~00:00:30) |
1min |
[00:00:00~00:01:00) |
[00:01:00~00:02:00) |
[00:02:00~00:03:00) |
Watermark分配方式
Watermark默认更新时间
- 详见源码解释
- 在非processing time的模式下,默认是200ms;
// --------------------------------------------------------------------------------------------
// Time characteristic
// --------------------------------------------------------------------------------------------
/**
* Sets the time characteristic for all streams create from this environment, e.g., processing
* time, event time, or ingestion time.
*
* <p>If you set the characteristic to IngestionTime of EventTime this will set a default
* watermark update interval of 200 ms. If this is not applicable for your application
* you should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
*
* @param characteristic The time characteristic.
*/
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().setAutoWatermarkInterval(0);
} else {
getConfig().setAutoWatermarkInterval(200);
}
}
Periodic Watermarks跟踪
- 因为Periodic Watermarks允许设定一个最大乱序时间,这种情况应用最多。
package github.yahuili1128.watermark;
import github.yahuili1128.pojo.MockUpModel;
import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.Iterator;
import java.util.TreeSet;
import static github.yahuili1128.connector.SourceKafka010.getMockUpkafka010;
/**
* @Description : 从kafka中读取数据,练习watermark
* @Author : LiYahui
* @Date : 2019-08-06 11:45
* @Version : V1.0
*/
public class PeriodicWatermarkTest {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
SingleOutputStreamOperator<MockUpModel> mockUpkafka010 = getMockUpkafka010(env).name("kafka source");
SingleOutputStreamOperator<String> result = mockUpkafka010.filter(line -> line.gender.equals("male"))
.assignTimestampsAndWatermarks(new GetWateramrk()).keyBy(line -> line.gender)
.window(TumblingEventTimeWindows.of(Time.seconds(10))).apply(new WindowTest()).name("watermark print");
result.print();
env.execute(PeriodicWatermarkTest.class.getSimpleName());
}
public static class WindowTest implements WindowFunction<MockUpModel, String, String, TimeWindow> {
@Override
public void apply(String key, TimeWindow window, Iterable<MockUpModel> input, Collector<String> out)
throws Exception {
TreeSet<Long> set = new TreeSet<>();
// 元素个数
int size = Iterables.size(input);
Iterator<MockUpModel> eles = input.iterator();
while (eles.hasNext()) {
set.add(eles.next().timestamp);
}
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
//(code,窗口内元素个数,窗口内最早元素的时间,窗口内最晚元素的时间,窗口自身开始时间,窗口自身结束时间)
String first = sdf.format(set.first());
String last = sdf.format(set.last());
String start = sdf.format(window.getStart());
String end = sdf.format(window.getEnd());
// 调试使用
out.collect("event.key:" + key + ",window中元素个数:" + size + ",window第一个元素时间戳:" + first + ",window最后一个元素时间戳:"
+ last + ",window开始时间戳:" + start + ",window结束时间戳:" + end + ",窗口内所有的时间戳:" + set.toString());
}
}
public static class GetWateramrk implements AssignerWithPeriodicWatermarks<MockUpModel> {
// 定义最大延迟 2s
private final long maxOutOfOrderness = 5000L;
private long currentMaxTimestamp;
private Watermark watermark;
// 将时间戳信息格式化,调试学习
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@Nullable
@Override
public Watermark getCurrentWatermark() {
watermark = new Watermark(this.currentMaxTimestamp - this.maxOutOfOrderness);
return watermark;
}
@Override
public long extractTimestamp(MockUpModel element, long previousElementTimestamp) {
// 获取event中的时间戳
long timestamp = element.getTimestamp();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
// 将所有的时间信息打印
System.out.println("--->>> event.key:" + element.gender + " | event中timestamp:" + timestamp + "| " + sdf
.format(timestamp) + "| currentMaxTimestamp:" + currentMaxTimestamp + "| " + sdf
.format(currentMaxTimestamp) + "| watermark" + watermark.toString());
// 返回event中的时间戳
return timestamp;
}
}
}
--->>> event.key:male | event中timestamp:1565082033747| 2019-08-06 17:00:33.747| currentMaxTimestamp:1565082033747| 2019-08-06 17:00:33.747| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082036758| 2019-08-06 17:00:36.758| currentMaxTimestamp:1565082036758| 2019-08-06 17:00:36.758| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082038778| 2019-08-06 17:00:38.778| currentMaxTimestamp:1565082038778| 2019-08-06 17:00:38.778| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082037791| 2019-08-06 17:00:37.791| currentMaxTimestamp:1565082038778| 2019-08-06 17:00:38.778| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082038803| 2019-08-06 17:00:38.803| currentMaxTimestamp:1565082038803| 2019-08-06 17:00:38.803| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082039815| 2019-08-06 17:00:39.815| currentMaxTimestamp:1565082039815| 2019-08-06 17:00:39.815| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082041839| 2019-08-06 17:00:41.839| currentMaxTimestamp:1565082041839| 2019-08-06 17:00:41.839| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082044850| 2019-08-06 17:00:44.850| currentMaxTimestamp:1565082044850| 2019-08-06 17:00:44.850| watermarkWatermark @ -5000
--->>> event.key:male | event中timestamp:1565082046869| 2019-08-06 17:00:46.869| currentMaxTimestamp:1565082046869| 2019-08-06 17:00:46.869| watermarkWatermark @ 1565082039850
8> event.key:male,window中元素个数:6,window第一个元素时间戳:2019-08-06 17:00:33.747,window最后一个元素时间戳:2019-08-06 17:00:39.815,window开始时间戳:2019-08-06 17:00:30.000,window结束时间戳:2019-08-06 17:00:40.000,窗口内所有的时间戳:[1565082033747, 1565082036758, 1565082037791, 1565082038778, 1565082038803, 1565082039815]
--->>> event.key:male | event中timestamp:1565082047880| 2019-08-06 17:00:47.880| currentMaxTimestamp:1565082047880| 2019-08-06 17:00:47.880| watermarkWatermark @ 1565082041869
--->>> event.key:male | event中timestamp:1565082046890| 2019-08-06 17:00:46.890| currentMaxTimestamp:1565082047880| 2019-08-06 17:00:47.880| watermarkWatermark @ 1565082042880
--->>> event.key:male | event中timestamp:1565082049900| 2019-08-06 17:00:49.900| currentMaxTimestamp:1565082049900| 2019-08-06 17:00:49.900| watermarkWatermark @ 1565082042880
--->>> event.key:male | event中timestamp:1565082051921| 2019-08-06 17:00:51.921| currentMaxTimestamp:1565082051921| 2019-08-06 17:00:51.921| watermarkWatermark @ 1565082044900
--->>> event.key:male | event中timestamp:1565082050934| 2019-08-06 17:00:50.934| currentMaxTimestamp:1565082051921| 2019-08-06 17:00:51.921| watermarkWatermark @ 1565082046921
--->>> event.key:male | event中timestamp:1565082051945| 2019-08-06 17:00:51.945| currentMaxTimestamp:1565082051945| 2019-08-06 17:00:51.945| watermarkWatermark @ 1565082046921
--->>> event.key:male | event中timestamp:1565082052955| 2019-08-06 17:00:52.955| currentMaxTimestamp:1565082052955| 2019-08-06 17:00:52.955| watermarkWatermark @ 1565082046945
--->>> event.key:male | event中timestamp:1565082055965| 2019-08-06 17:00:55.965| currentMaxTimestamp:1565082055965| 2019-08-06 17:00:55.965| watermarkWatermark @ 1565082047955
8> event.key:male,window中元素个数:6,window第一个元素时间戳:2019-08-06 17:00:41.839,window最后一个元素时间戳:2019-08-06 17:00:49.900,window开始时间戳:2019-08-06 17:00:40.000,window结束时间戳:2019-08-06 17:00:50.000,窗口内所有的时间戳:[1565082041839, 1565082044850, 1565082046869, 1565082046890, 1565082047880, 1565082049900]
--->>> event.key:male | event中timestamp:1565082056983| 2019-08-06 17:00:56.983| currentMaxTimestamp:1565082056983| 2019-08-06 17:00:56.983| watermarkWatermark @ 1565082050965
--->>> event.key:male | event中timestamp:1565082055993| 2019-08-06 17:00:55.993| currentMaxTimestamp:1565082056983| 2019-08-06 17:00:56.983| watermarkWatermark @ 1565082051983
--->>> event.key:male | event中timestamp:1565082059005| 2019-08-06 17:00:59.005| currentMaxTimestamp:1565082059005| 2019-08-06 17:00:59.005| watermarkWatermark @ 1565082051983
--->>> event.key:male | event中timestamp:1565082061028| 2019-08-06 17:01:01.028| currentMaxTimestamp:1565082061028| 2019-08-06 17:01:01.028| watermarkWatermark @ 1565082054005
--->>> event.key:male | event中timestamp:1565082062036| 2019-08-06 17:01:02.036| currentMaxTimestamp:1565082062036| 2019-08-06 17:01:02.036| watermarkWatermark @ 1565082056028
- 为什么watermark会出现-5000
- AssignerWithPeriodicWatermarks子类是每隔一段时间执行的,这个具体由ExecutionConfig.setAutoWatermarkInterval设置,默认是200ms,之所以会出现-5000时因为你没有数据进入窗口,当然一直都是-5000,但是getCurrentWatermark方法不是在执行extractTimestamp后才执行。
结论
- window的触发要符合以下几个条件:
- watermark时间 >= window_end_time
- 在[window_start_time,window_end_time)中有数据存在;
- 同时满足了以上2个条件,window才会触发。
- watermark是一个全局的值,不是某一个key下的值,所以即使不是同一个key的数据,其warmark也会增加.
- 这个部分的知识点需要细细的理解一下;
网友评论