- So once event time is configured, windowing uses the timestamp that your extractTimestamp method returns
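DataStream<SensorReading> dataStream = inputStream.map(line -> {
    String[] fields = line.split(",");
    // Parse the fields directly instead of using the deprecated boxing constructors
    return new SensorReading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
}).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {
    // Maximum tolerated out-of-orderness: 5 seconds, in milliseconds
    private final long maxOutofOrderness = 5 * 1000;
    // Start at MIN_VALUE + bound so the subtraction in getCurrentWatermark() cannot overflow
    private long currentMaxTs = Long.MIN_VALUE + maxOutofOrderness;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTs - maxOutofOrderness);
    }

    @Override
    public long extractTimestamp(SensorReading sensorReading, long previousElementTimestamp) {
        // Get the timestamp of the current record
        long currentTs = sensorReading.getTimestamp();
        // Update the largest timestamp seen so far
        currentMaxTs = Math.max(currentMaxTs, currentTs);
        // Return the record's own timestamp
        return currentTs;
    }
});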
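To make the relationship between extractTimestamp and getCurrentWatermark easier to follow, here is a minimal plain-Java sketch with no Flink dependency. The class name `BoundedWatermarkSketch` and its methods are hypothetical stand-ins that only mirror the two callbacks above. It also assumes the running maximum starts at `Long.MIN_VALUE` plus the bound, because subtracting the bound from `Long.MIN_VALUE` itself would overflow.

```java
// Plain-Java stand-in for the periodic assigner above; not a Flink class.
class BoundedWatermarkSketch {
    private final long maxOutOfOrderness;
    // Start at MIN_VALUE + bound so that (currentMaxTs - bound) cannot overflow.
    private long currentMaxTs;

    BoundedWatermarkSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrderness = maxOutOfOrdernessMillis;
        this.currentMaxTs = Long.MIN_VALUE + maxOutOfOrdernessMillis;
    }

    // Mirrors extractTimestamp: remember the largest timestamp seen, return the record's own.
    long extractTimestamp(long eventTs) {
        currentMaxTs = Math.max(currentMaxTs, eventTs);
        return eventTs;
    }

    // Mirrors getCurrentWatermark: trail the largest timestamp by the allowed out-of-orderness.
    long currentWatermark() {
        return currentMaxTs - maxOutOfOrderness;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch s = new BoundedWatermarkSketch(5_000L);
        // Made-up event timestamps; the third one arrives out of order.
        for (long ts : new long[] {1_000L, 7_000L, 3_000L}) {
            s.extractTimestamp(ts);
            System.out.println("event=" + ts + " watermark=" + s.currentWatermark());
        }
    }
}
```

With these made-up inputs the watermark reaches 2000 after the event at 7000, and the late event at 3000 does not move it backwards, since only the running maximum feeds the watermark.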
- assignTimestampsAndWatermarks is passed an anonymous class that implements AssignerWithPeriodicWatermarks; you can simply think of it as passing in an object.
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
    int inputParallelism = this.getTransformation().getParallelism();
    AssignerWithPeriodicWatermarks<T> cleanedAssigner = (AssignerWithPeriodicWatermarks)this.clean(timestampAndWatermarkAssigner);
    TimestampsAndPeriodicWatermarksOperator<T> operator = new TimestampsAndPeriodicWatermarksOperator(cleanedAssigner);
    return this.transform("Timestamps/Watermarks", this.getTransformation().getOutputType(), operator).setParallelism(inputParallelism);
}
- The TimestampsAndPeriodicWatermarksOperator class is where the object's extractTimestamp gets called
public void processElement(StreamRecord<T> element) throws Exception {
    long newTimestamp = ((AssignerWithPeriodicWatermarks)this.userFunction).extractTimestamp(element.getValue(), element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
    this.output.collect(element.replace(element.getValue(), newTimestamp));
}
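The flow in processElement can be sketched without Flink as follows. All three types here (`MiniAssigner`, `MiniRecord`, `MiniTimestampOperator`) are hypothetical, heavily simplified stand-ins, and the `"id,timestamp"` input string is made up for illustration. The sketch only shows the two steps visible in the quoted method: the assigner receives the record's previous timestamp (or `Long.MIN_VALUE` if it has none), and the returned value is written back onto the record.

```java
// Hypothetical stand-ins; the names are illustrative, not Flink classes.
interface MiniAssigner<T> {
    long extractTimestamp(T element, long previousTimestamp);
}

class MiniRecord<T> {
    T value;
    long timestamp;
    boolean hasTimestamp;

    MiniRecord(T value) {
        this.value = value;
    }

    // Overwrite value and timestamp in place and mark the record as timestamped.
    MiniRecord<T> replace(T newValue, long newTimestamp) {
        this.value = newValue;
        this.timestamp = newTimestamp;
        this.hasTimestamp = true;
        return this;
    }
}

class MiniTimestampOperator<T> {
    private final MiniAssigner<T> assigner;

    MiniTimestampOperator(MiniAssigner<T> assigner) {
        this.assigner = assigner;
    }

    MiniRecord<T> processElement(MiniRecord<T> element) {
        // Records without a timestamp report Long.MIN_VALUE as their previous timestamp.
        long previous = element.hasTimestamp ? element.timestamp : Long.MIN_VALUE;
        long newTimestamp = assigner.extractTimestamp(element.value, previous);
        return element.replace(element.value, newTimestamp);
    }

    public static void main(String[] args) {
        // The assigner parses the second comma-separated field as the event timestamp.
        MiniTimestampOperator<String> op = new MiniTimestampOperator<>(
                (line, prev) -> Long.parseLong(line.split(",")[1]));
        MiniRecord<String> out = op.processElement(new MiniRecord<>("sensor_1,1547718199"));
        System.out.println(out.timestamp + " " + out.hasTimestamp);
    }
}
```

Whatever the user function returns becomes the record's timestamp from this point on, which is why downstream windows see the extracted value.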
- The StreamRecord used there wraps the data being passed along
public final class StreamRecord<T> extends StreamElement {
    private T value;
    private long timestamp;
    private boolean hasTimestamp;

    public StreamRecord(T value) {
        this.value = value;
    }

    public StreamRecord(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
        this.hasTimestamp = true;
    }
    ... ...
}
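One detail worth spelling out is why the record carries a separate hasTimestamp flag: a `long` field defaults to 0, which is itself a valid epoch timestamp, so the flag is what distinguishes "no timestamp yet" from "timestamp 0". The sketch below uses a hypothetical `RecordSketch` class that keeps only this bookkeeping. Returning `Long.MIN_VALUE` from `getTimestamp()` when no timestamp is set is an assumption of this sketch, chosen to match the fallback used in the quoted processElement.

```java
// Simplified stand-in for the wrapper above; only the timestamp bookkeeping is kept.
final class RecordSketch<T> {
    private final T value;
    private long timestamp;        // defaults to 0, which is also a valid timestamp
    private boolean hasTimestamp;  // defaults to false

    RecordSketch(T value) {
        this.value = value;
    }

    RecordSketch(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
        this.hasTimestamp = true;
    }

    T getValue() {
        return value;
    }

    boolean hasTimestamp() {
        return hasTimestamp;
    }

    // Assumption of this sketch: fall back to Long.MIN_VALUE when no timestamp was set.
    long getTimestamp() {
        return hasTimestamp ? timestamp : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        RecordSketch<String> untimed = new RecordSketch<>("a");
        RecordSketch<String> atEpoch = new RecordSketch<>("b", 0L);
        // Both hold 0 in the raw field, but only the second one counts as timestamped.
        System.out.println(untimed.hasTimestamp() + " " + atEpoch.hasTimestamp());
    }
}
```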