时间语义
flink 有三种时间:EventTime,表示数据最初的触发时间;IngestionTime,数据进入Flink的时间,是DataSource拿到数据的时间;ProcessingTime,执行操作算子的本地系统时间,与机器相关。
flink1.12 中默认时间语义是 EventTime,在实际处理数据,大多也都是以 EventTime 为主。因为数据可能受于网络的影响,或其他因素导致乱序数据的产生。
Watermark
那对于乱序数据我们就可以使用 Watermark,我们来举个例子说明 flink 中的 Watermark 是什么:比如定点9点上车,但是往往有人9.01才来,那 Watermark 的做法就是把自己的时间调慢,也就是8.59分,9.01的人来了 对于我来说还是9点。如果有人9.02来了,那其实可以配合 window 的延迟,我先输出数据开车,你来弯道超车,然后我重新计算输出新数据,但非有人,9.03来呢,那我不管你了,我开车上高速,你可以坐下一辆车(侧输出流)。
Watermark 的时间不宜设置太大,因为拿到数据时间可能是准确的,但是拿到数据就会很慢。相当于9点发车,但我想等人到齐(数据),我设置到了 8.30,我需要在等 30分钟才能发车,此时我到站 比别人还晚30分钟。
以下演示 Watermark 时间乱序处理,和配合window的函数处理
Watermark 在分区中会计算最小事件时钟,保证下游数据收到每个事件时钟,并根据自己的时钟得出是否计算结果并关闭窗口。Watermark 是以广播的形式把事件时钟传递给每个下游。
watermark 的特性主要有以下几点:
- 数据流中的Watermark用于表示timestamp小于Watermark的数据都已经到达,因此Window的执行是由Watermark触发的
- Watermark是一条特殊的数据记录
- Watermark必须单调递增,以确保任务的事件时间时钟向前推进,不可逆
代码演示
public class EventData{
private Integer id;
private Long eventTime;
private String data;
private Integer num;
public EventData(){
};
public Integer getNum() {
return num;
}
public void setNum(Integer num) {
this.num = num;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public Long getEventTime() {
return eventTime;
}
public void setEventTime(Long eventTime) {
this.eventTime = eventTime;
}
public String getData() {
return data;
}
public void setData(String data) {
this.data = data;
}
@Override
public String toString() {
return "EventData{" +
"id=" + id +
", eventTime=" + eventTime +
", data='" + data + '\'' +
", num=" + num +
'}';
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> dataStream = env.socketTextStream("192.168.200.58", 7777);
// 数据转换
DataStream<EventData> stream = dataStream.map(new MapFunction<String, EventData>() {
@Override
public EventData map(String value) throws Exception {
String[] strs = value.split(",");
EventData eventData = new EventData();
eventData.setId(Integer.valueOf(strs[0]));
eventData.setEventTime(Long.valueOf(strs[1]));
eventData.setData(strs[2]);
eventData.setNum(Integer.valueOf(strs[3]));
return eventData;
}
}).assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<EventData>(Time.seconds(5)) {
//提取时间戳
@Override
public long extractTimestamp(EventData element) {
return element.getEventTime() * 1000L;
}
}
);
stream.print("WM: ");
// 基于事件时间的开窗聚合,统计15秒内数据的最小ID值
stream.keyBy(new KeySelector<EventData, Object>() {
@Override
public Object getKey(EventData value) throws Exception {
return value.getData();
}
})
.timeWindow(Time.seconds(15))
// .window(TumblingEventTimeWindows.of(Time.seconds(15)))
.sum("num")
.print("result: ");
env.execute("test");
}
setAutoWatermarkInterval 是设置 Watermark 的生成时间,默认是 0,也就是来一个数据我对这个数据生成一个 Watermark时钟,用于去比较窗口函数的时间来触发计算。可以手动设置,在庞大的数据量,每生成一个 Watermark 有些费性能。以下为隔100ms生成一次。
env.getConfig().setAutoWatermarkInterval(100);
测试数据如下:
6,1623051400,test data,1
6,1623051401,test data,1
6,1623051402,test data,1
6,1623051405,test data,3
6,1623051406,test data,3
6,1623051409,test data,3
6,1623051410,test data,5
你会发现当你的 timestamp 为 1623051410,会触发窗口计算,但输出的num=3,因为 Watermark 创建窗口是会自动创建,会根据你的第一个数据的 timestamp 以及 timeWindow 窗口的值计算窗口的 startTime,计算方式
# startTime
timestamp - (timestamp - offset + windowSize) % windowSize;
# endTime
startTime + windowSize
其中默认情况 offset = 0,根据以上计算得出如下
startTime timestamp
1623051390000 1623051400000
1623051390000 1623051401000
1623051390000 1623051402000
1623051405000 1623051405000
1623051405000 1623051406000
1623051405000 1623051409000
1623051405000 1623051410000
会看到 400000(timestamp) 创建的 startTime 是 390000,405000(timestamp) 创建的 startTime 是 405000,他们被分为了两个窗口。而他们的 405000-390000=15000=15s,刚好就是我们的 windowSize,也就是说 405000 的触发点在 420000 ,但我们还给 Watermark 延迟了 5s,也就是说正确的关闭第二个窗口的 timestamp = 1623051425000,得出结论
Watermark1=[390,405) Watermark2=[405,420)
以上结论并行度为1
网友评论