For background on Operator and Transformation, see: https://www.jianshu.com/p/733a8ed1a9ef
assignTimestampsAndWatermarks
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        WatermarkStrategy<T> watermarkStrategy) {
    final WatermarkStrategy<T> cleanedStrategy = clean(watermarkStrategy);
    // match parallelism to input, to have a 1:1 source -> timestamps/watermarks relationship
    // and chain
    final int inputParallelism = getTransformation().getParallelism();
    final TimestampsAndWatermarksTransformation<T> transformation =
            new TimestampsAndWatermarksTransformation<>(
                    "Timestamps/Watermarks",
                    inputParallelism,
                    getTransformation(),
                    cleanedStrategy);
    getExecutionEnvironment().addOperator(transformation);
    return new SingleOutputStreamOperator<>(getExecutionEnvironment(), transformation);
}
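- assignTimestampsAndWatermarks is itself just another transformation: it wraps the strategy in a TimestampsAndWatermarksTransformation and registers it with the execution environment.
- WatermarkStrategy extends both TimestampAssignerSupplier and WatermarkGeneratorSupplier. The TimestampAssigner extracts the timestamp from each event, and the WatermarkGenerator produces the watermarks.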
WatermarkGenerator
public interface WatermarkGenerator<T> {
    /**
     * Called for every event, allows the watermark generator to examine and remember the event
     * timestamps, or to emit a watermark based on the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks are generated depends on {@link
     * ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}
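- onPeriodicEmit sends the watermark out by calling emitWatermark on the WatermarkOutput. The watermark may be derived from the timestamps that a TimestampAssigner extracted, as in the adapter below. AssignerWithPeriodicWatermarks is a sub-interface of TimestampAssigner.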
public final class AssignerWithPeriodicWatermarksAdapter<T> implements WatermarkGenerator<T> {
    private final AssignerWithPeriodicWatermarks<T> wms;

    public AssignerWithPeriodicWatermarksAdapter(AssignerWithPeriodicWatermarks<T> wms) {
        this.wms = checkNotNull(wms);
    }

    // Nothing to do per event: the wrapped assigner tracks timestamps itself.
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {}

    // Ask the wrapped assigner for its current watermark and forward it if there is one.
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();
        if (next != null) {
            output.emitWatermark(new Watermark(next.getTimestamp()));
        }
    }
}
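To make the onEvent / onPeriodicEmit split concrete, here is a minimal, self-contained sketch of a periodic generator that tolerates a fixed amount of out-of-order data. It does not depend on Flink: SketchWatermarkOutput, SketchWatermarkGenerator, BoundedLatenessGenerator and PeriodicGeneratorDemo are hypothetical stand-ins invented for this illustration, and watermarks are modeled as plain long values. In a real job the periodic call is driven by the auto-watermark interval, whereas the demo simply fires it after every event.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for Flink's WatermarkOutput (assumption: watermarks are plain longs here).
interface SketchWatermarkOutput {
    void emitWatermark(long timestamp);
}

// Simplified stand-in for Flink's WatermarkGenerator.
interface SketchWatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, SketchWatermarkOutput output);

    void onPeriodicEmit(SketchWatermarkOutput output);
}

// Remembers the largest timestamp seen so far and periodically emits it minus a fixed delay.
final class BoundedLatenessGenerator<T> implements SketchWatermarkGenerator<T> {
    private final long maxOutOfOrderness;
    private long maxTimestamp;

    BoundedLatenessGenerator(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
        // Offset the start value so the subtraction in onPeriodicEmit cannot underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrderness + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, SketchWatermarkOutput output) {
        // Only remember the timestamp; nothing is emitted per event.
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(SketchWatermarkOutput output) {
        output.emitWatermark(maxTimestamp - maxOutOfOrderness - 1);
    }
}

final class PeriodicGeneratorDemo {
    // Feeds the timestamps to the generator, firing one periodic emit after every event.
    static List<Long> run(long maxOutOfOrderness, long... timestamps) {
        List<Long> emitted = new ArrayList<>();
        BoundedLatenessGenerator<String> generator =
                new BoundedLatenessGenerator<>(maxOutOfOrderness);
        for (long ts : timestamps) {
            generator.onEvent("event@" + ts, ts, emitted::add);
            generator.onPeriodicEmit(emitted::add);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The late event with timestamp 15 does not move the watermark.
        System.out.println(run(5, 10, 20, 15)); // prints [4, 14, 14]
    }
}
```

Note that the generator happily emits the same value twice (14, 14). Filtering out watermarks that do not advance is left to the output side.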
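- emitWatermark is what actually submits the watermark. The Output interface has a method with the same name, documented as: "Emits a Watermark from an operator. This watermark is broadcast to all downstream operators."
- In TimestampsAndWatermarksOperator, the WatermarkOutput's emitWatermark ends up calling emitWatermark on the operator's Output.
- The class below is nested inside TimestampsAndWatermarksOperator, so it too ends up wrapped in the transformation.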
public static final class WatermarkEmitter implements WatermarkOutput {
    private final Output<?> output;
    private long currentWatermark;
    private boolean idle;

    public WatermarkEmitter(Output<?> output) {
        this.output = output;
        this.currentWatermark = Long.MIN_VALUE;
    }

    @Override
    public void emitWatermark(Watermark watermark) {
        final long ts = watermark.getTimestamp();
        // Watermarks must never go backwards: ignore anything that does not advance.
        if (ts <= currentWatermark) {
            return;
        }
        currentWatermark = ts;
        markActive();
        output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(ts));
    }

    // markIdle() and markActive() are omitted from this excerpt.
}
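The key behavior of this emitter is that only watermarks that advance are forwarded. The following is a small self-contained sketch of that rule; MonotonicEmitter is a hypothetical model written for this note, not Flink code, and it records forwarded values in a list instead of calling a real Output.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the emitter above: forwards a watermark only if it advances.
final class MonotonicEmitter {
    private final List<Long> downstream = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    void emitWatermark(long ts) {
        if (ts <= currentWatermark) {
            return; // equal or older than what was already sent: drop it
        }
        currentWatermark = ts;
        downstream.add(ts); // stands in for output.emitWatermark(...)
    }

    List<Long> forwarded() {
        return downstream;
    }

    public static void main(String[] args) {
        MonotonicEmitter emitter = new MonotonicEmitter();
        for (long ts : new long[] {4, 14, 14, 9, 20}) {
            emitter.emitWatermark(ts);
        }
        // The repeated 14 and the regressing 9 are filtered out.
        System.out.println(emitter.forwarded()); // prints [4, 14, 20]
    }
}
```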
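Emitting downstream
- Different Output implementations handle the watermark differently.
- RecordWriterOutput: the current operator and the downstream operator are not in the same task, so the watermark is broadcast to every output channel.
- ChainingOutput: the current operator and the downstream operator are chained together, so the watermark is handed over directly through processWatermark.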
// RecordWriterOutput.java
public void emitWatermark(Watermark mark) {
    if (announcedStatus.isIdle()) {
        return;
    }
    watermarkGauge.setCurrentWatermark(mark.getTimestamp());
    serializationDelegate.setInstance(mark);
    try {
        // Serialize the watermark and write it to all output channels.
        recordWriter.broadcastEmit(serializationDelegate);
    } catch (IOException e) {
        throw new UncheckedIOException(e.getMessage(), e);
    }
}

// ChainingOutput.java
public void emitWatermark(Watermark mark) {
    if (announcedStatus.isIdle()) {
        return;
    }
    try {
        watermarkGauge.setCurrentWatermark(mark.getTimestamp());
        // Same task: call the next operator in the chain directly.
        input.processWatermark(mark);
    } catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}
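The difference between the two paths can be sketched without Flink. In the model below, all names (SketchOutput, RecordingOperator, ChainedSketchOutput, BroadcastSketchOutput, OutputDemo) are hypothetical, and a channel is just an in-memory queue rather than a network buffer. The chained output is a plain method call, while the broadcast output puts one copy into every channel so that each downstream subtask sees the watermark regardless of how records are partitioned.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the watermark part of Flink's Output interface.
interface SketchOutput {
    void emitWatermark(long timestamp);
}

// Stand-in for a downstream operator that lives in the same chain.
final class RecordingOperator {
    final List<Long> seen = new ArrayList<>();

    void processWatermark(long timestamp) {
        seen.add(timestamp);
    }
}

// Models the chained case: a direct method call, no serialization.
final class ChainedSketchOutput implements SketchOutput {
    private final RecordingOperator next;

    ChainedSketchOutput(RecordingOperator next) {
        this.next = next;
    }

    @Override
    public void emitWatermark(long timestamp) {
        next.processWatermark(timestamp);
    }
}

// Models the cross-task case: one copy of the watermark per output channel.
final class BroadcastSketchOutput implements SketchOutput {
    final List<Queue<Long>> channels = new ArrayList<>();

    BroadcastSketchOutput(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayDeque<>());
        }
    }

    @Override
    public void emitWatermark(long timestamp) {
        for (Queue<Long> channel : channels) {
            channel.add(timestamp);
        }
    }
}

final class OutputDemo {
    public static void main(String[] args) {
        RecordingOperator next = new RecordingOperator();
        new ChainedSketchOutput(next).emitWatermark(14);
        System.out.println(next.seen); // prints [14]

        BroadcastSketchOutput broadcast = new BroadcastSketchOutput(3);
        broadcast.emitWatermark(14);
        System.out.println(broadcast.channels); // prints [[14], [14], [14]]
    }
}
```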
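Triggering
We now know that Watermark objects are sent downstream. How do they fire the timers of downstream operators?
Take WindowOperator as an example. It has a member internalTimerService, which is managed by InternalTimeServiceManagerImpl. That class's advanceWatermark takes the incoming watermark and calls advanceWatermark on every internalTimerService, which then fires all timers that are due.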
// InternalTimeServiceManagerImpl.java
// Class-level Javadoc:
/**
 * An entity keeping all the time-related services. Right now, this is only a {@link
 * InternalTimerServiceImpl timer services}.
 *
 * <p><b>NOTE:</b> These services are only available to keyed operators.
 */
// The method that forwards the watermark to every registered timer service:
public void advanceWatermark(Watermark watermark) throws Exception {
    for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
        service.advanceWatermark(watermark.getTimestamp());
    }
}
// InternalTimerServiceImpl.java
public void advanceWatermark(long time) throws Exception {
    currentWatermark = time;
    InternalTimer<K, N> timer;
    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        keyContext.setCurrentKey(timer.getKey());
        eventTimeTimersQueue.poll();
        triggerTarget.onEventTime(timer);
    }
}
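The loop above is a standard "drain the priority queue up to the watermark" pattern. Here is a minimal self-contained sketch of it, assuming string keys and recording fired timers in a list instead of calling a trigger target. TimerSketch and SketchTimer are hypothetical names for this illustration, and java.util.PriorityQueue stands in for Flink's own timer queue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of an event-time timer service.
final class TimerSketch {
    // A timer registered for a key, ordered by its timestamp.
    static final class SketchTimer {
        final String key;
        final long timestamp;

        SketchTimer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    private final PriorityQueue<SketchTimer> eventTimeTimers =
            new PriorityQueue<>(Comparator.comparingLong((SketchTimer t) -> t.timestamp));
    private final List<String> fired = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerEventTimeTimer(String key, long timestamp) {
        eventTimeTimers.add(new SketchTimer(key, timestamp));
    }

    // Fires every timer whose timestamp is at or below the new watermark, earliest first.
    void advanceWatermark(long time) {
        currentWatermark = time;
        SketchTimer timer;
        while ((timer = eventTimeTimers.peek()) != null && timer.timestamp <= time) {
            eventTimeTimers.poll();
            fired.add(timer.key + "@" + timer.timestamp); // stands in for onEventTime(timer)
        }
    }

    long currentWatermark() {
        return currentWatermark;
    }

    List<String> fired() {
        return fired;
    }

    public static void main(String[] args) {
        TimerSketch service = new TimerSketch();
        service.registerEventTimeTimer("a", 10);
        service.registerEventTimeTimer("b", 25);
        service.registerEventTimeTimer("c", 15);

        service.advanceWatermark(14);
        System.out.println(service.fired()); // prints [a@10]

        service.advanceWatermark(30);
        System.out.println(service.fired()); // prints [a@10, c@15, b@25]
    }
}
```

Because the queue is ordered by timestamp, the loop can stop at the first timer that is still in the future, so advancing the watermark only costs work proportional to the number of timers that actually fire.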