Flink WaterMark

Author: kaiker | Published 2022-03-27 21:02

For Operator/Transformation, see: https://www.jianshu.com/p/733a8ed1a9ef

assignTimestampsAndWatermarks

// DataStream.java
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        WatermarkStrategy<T> watermarkStrategy) {
    final WatermarkStrategy<T> cleanedStrategy = clean(watermarkStrategy);
    // match parallelism to input, to have a 1:1 source -> timestamps/watermarks relationship
    // and chain
    final int inputParallelism = getTransformation().getParallelism();
    final TimestampsAndWatermarksTransformation<T> transformation =
            new TimestampsAndWatermarksTransformation<>(
                    "Timestamps/Watermarks",
                    inputParallelism,
                    getTransformation(),
                    cleanedStrategy);
    getExecutionEnvironment().addOperator(transformation);
    return new SingleOutputStreamOperator<>(getExecutionEnvironment(), transformation);
}
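  • assignTimestampsAndWatermarks is itself implemented as a transformation: it wraps the upstream transformation in a TimestampsAndWatermarksTransformation with the same parallelism as its input, so timestamp/watermark assignment can be chained 1:1 to the upstream operator.
  • WatermarkStrategy extends both TimestampAssignerSupplier and WatermarkGeneratorSupplier. The TimestampAssigner extracts the event timestamp from each record, and the WatermarkGenerator produces watermarks.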
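To make the split between the two suppliers concrete, here is a minimal sketch that models a strategy as a pair of factories. All types are simplified stand-ins invented for illustration (SimpleWatermarkStrategy, MaxTimestampGenerator, StrategyDemo, and a WatermarkGenerator that writes into a plain List<Long>); they are not Flink's API. The driver loop that calls onPeriodicEmit after every event is also an assumption, chosen to keep the output deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical, simplified stand-ins for Flink's interfaces (not the real API).
interface TimestampAssigner<T> {
    long extractTimestamp(T element);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, List<Long> output);

    void onPeriodicEmit(List<Long> output);
}

// Bundles one factory for each half, like WatermarkStrategy extending
// TimestampAssignerSupplier and WatermarkGeneratorSupplier.
final class SimpleWatermarkStrategy<T> {
    private final Supplier<TimestampAssigner<T>> assignerSupplier;
    private final Supplier<WatermarkGenerator<T>> generatorSupplier;

    SimpleWatermarkStrategy(
            Supplier<TimestampAssigner<T>> assignerSupplier,
            Supplier<WatermarkGenerator<T>> generatorSupplier) {
        this.assignerSupplier = assignerSupplier;
        this.generatorSupplier = generatorSupplier;
    }

    TimestampAssigner<T> createTimestampAssigner() {
        return assignerSupplier.get();
    }

    WatermarkGenerator<T> createWatermarkGenerator() {
        return generatorSupplier.get();
    }
}

// Remembers the largest timestamp seen and emits it on every periodic call.
final class MaxTimestampGenerator<T> implements WatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long eventTimestamp, List<Long> output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(List<Long> output) {
        if (maxTimestamp != Long.MIN_VALUE) {
            output.add(maxTimestamp);
        }
    }
}

final class StrategyDemo {
    // Extracts each event's timestamp, feeds it to the generator, then triggers a periodic emit.
    static List<Long> run(SimpleWatermarkStrategy<long[]> strategy, List<long[]> events) {
        TimestampAssigner<long[]> assigner = strategy.createTimestampAssigner();
        WatermarkGenerator<long[]> generator = strategy.createWatermarkGenerator();
        List<Long> emitted = new ArrayList<>();
        for (long[] event : events) {
            long ts = assigner.extractTimestamp(event);
            generator.onEvent(event, ts, emitted);
            generator.onPeriodicEmit(emitted);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Each event is {id, eventTimeMillis}; the assigner reads element 1.
        SimpleWatermarkStrategy<long[]> strategy =
                new SimpleWatermarkStrategy<long[]>(() -> e -> e[1], MaxTimestampGenerator::new);
        List<long[]> events = List.of(new long[] {1, 100}, new long[] {2, 300}, new long[] {3, 200});
        System.out.println(run(strategy, events)); // [100, 300, 300]
    }
}
```

The late event at 200 does not pull the watermark back below 300. Keeping the strategy as a factory rather than a generator instance means each consumer can create its own fresh assigner/generator pair.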

WatermarkGenerator

public interface WatermarkGenerator<T> {

    /**
     * Called for every event, allows the watermark generator to examine and remember the event
     * timestamps, or to emit a watermark based on the event itself.
     */
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    /**
     * Called periodically, and might emit a new watermark, or not.
     *
     * <p>The interval in which this method is called and Watermarks are generated depends on {@link
     * ExecutionConfig#getAutoWatermarkInterval()}.
     */
    void onPeriodicEmit(WatermarkOutput output);
}
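  • onPeriodicEmit emits a watermark through WatermarkOutput.emitWatermark. The watermark is typically derived from the event timestamps that the TimestampAssigner extracted and onEvent received. The adapter below is one example: it wraps a legacy AssignerWithPeriodicWatermarks (a sub-interface of TimestampAssigner) as a WatermarkGenerator and simply forwards whatever getCurrentWatermark() returns.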
public final class AssignerWithPeriodicWatermarksAdapter<T> implements WatermarkGenerator<T> {

    private final AssignerWithPeriodicWatermarks<T> wms;

    public AssignerWithPeriodicWatermarksAdapter(AssignerWithPeriodicWatermarks<T> wms) {
        this.wms = checkNotNull(wms);
    }

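    // The legacy assigner tracks timestamps itself, so nothing needs to happen per event.
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {}

    // Called every auto-watermark interval: forward the legacy assigner's watermark, if any.
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        final org.apache.flink.streaming.api.watermark.Watermark next = wms.getCurrentWatermark();
        if (next != null) {
            output.emitWatermark(new Watermark(next.getTimestamp()));
        }
    }
}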
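The adapter above leaves onEvent empty. A generator that does use the timestamps passed to onEvent could look like the following bounded-out-of-orderness sketch. The interfaces are hypothetical simplifications (WatermarkOutput takes a plain long here), and the "max - delay - 1" rule is an approximation of the idea behind Flink's built-in bounded-out-of-orderness strategy, not a copy of its source.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified output: receives watermark timestamps as plain longs.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// onEvent only records the largest timestamp; onPeriodicEmit turns it into a
// watermark that trails the maximum by the allowed out-of-orderness.
final class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessGenerator(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Chosen so that an emit before any event yields Long.MIN_VALUE instead of underflowing.
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(maxTimestamp - outOfOrdernessMillis - 1);
    }
}

final class OutOfOrdernessDemo {
    // Feeds timestamps in arrival order and simulates one periodic emit after each event.
    static List<Long> run(long outOfOrdernessMillis, long... timestamps) {
        List<Long> emitted = new ArrayList<>();
        WatermarkOutput output = wm -> emitted.add(wm);
        WatermarkGenerator<Long> generator = new BoundedOutOfOrdernessGenerator<>(outOfOrdernessMillis);
        for (long ts : timestamps) {
            generator.onEvent(ts, ts, output);
            generator.onPeriodicEmit(output);
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(50, 100, 180, 140)); // [49, 129, 129]
    }
}
```

With a 50 ms bound and events arriving at 100, 180 and 140, the out-of-order event at 140 is still above the current watermark of 129, so downstream it is not treated as late.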
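  • emitWatermark is where the watermark is actually emitted. The Output interface has a method with the same name, documented as "Emits a Watermark from an operator. This watermark is broadcast to all downstream operators."
  • In TimestampsAndWatermarksOperator, WatermarkOutput.emitWatermark ends up calling Output.emitWatermark.
  • The class below is nested inside TimestampsAndWatermarksOperator, so it is ultimately part of the transformation created by assignTimestampsAndWatermarks.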
// TimestampsAndWatermarksOperator.java (nested class; other members omitted)
public static final class WatermarkEmitter implements WatermarkOutput {

    private final Output<?> output;

    private long currentWatermark;

    private boolean idle;

    public WatermarkEmitter(Output<?> output) {
        this.output = output;
        this.currentWatermark = Long.MIN_VALUE;
    }

    @Override
    public void emitWatermark(Watermark watermark) {
        final long ts = watermark.getTimestamp();

        // Drop watermarks that do not advance event time.
        if (ts <= currentWatermark) {
            return;
        }

        currentWatermark = ts;

        // markActive() (body not shown in this excerpt) uses the idle flag to mark the emitter active again.
        markActive();

        output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(ts));
    }
}
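The core rule of WatermarkEmitter, never forward a watermark that does not advance, can be exercised on its own. The sketch below copies that comparison. RecordingOutput, MonotonicEmitter and markIdle are hypothetical names, and the markActive() behavior (announce ACTIVE once if the emitter was idle) is an assumption, since the excerpt does not show its body.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the downstream Output: records what was emitted.
final class RecordingOutput {
    final List<String> events = new ArrayList<>();

    void emitWatermark(long ts) {
        events.add("WM(" + ts + ")");
    }

    void emitActive() {
        events.add("ACTIVE");
    }
}

// Same comparison as WatermarkEmitter.emitWatermark: only strictly larger watermarks pass.
final class MonotonicEmitter {
    private final RecordingOutput output;
    private long currentWatermark = Long.MIN_VALUE;
    private boolean idle;

    MonotonicEmitter(RecordingOutput output) {
        this.output = output;
    }

    void emitWatermark(long ts) {
        if (ts <= currentWatermark) {
            return; // a watermark must never go backwards or repeat
        }
        currentWatermark = ts;
        markActive();
        output.emitWatermark(ts);
    }

    // Hypothetical helper used to put the emitter into the idle state.
    void markIdle() {
        idle = true;
    }

    // Assumed behavior: leave the idle state and announce it once.
    private void markActive() {
        if (idle) {
            idle = false;
            output.emitActive();
        }
    }
}

final class EmitterDemo {
    public static void main(String[] args) {
        RecordingOutput out = new RecordingOutput();
        MonotonicEmitter emitter = new MonotonicEmitter(out);
        emitter.emitWatermark(5);
        emitter.emitWatermark(3); // dropped: goes backwards
        emitter.emitWatermark(7);
        emitter.markIdle();
        emitter.emitWatermark(9); // re-activates, then forwards
        System.out.println(out.events); // [WM(5), WM(7), ACTIVE, WM(9)]
    }
}
```

Filtering at the source keeps every downstream operator's view of event time monotonic, even if a generator emits a stale or repeated value.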

Emitting watermarks downstream

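  • How the watermark is forwarded depends on the Output implementation.
  • RecordWriterOutput: the current operator and the downstream operator run in different tasks, so the watermark is broadcast to all downstream channels.
  • ChainingOutput: the current operator is chained with the downstream operator, so the watermark is passed straight to the downstream operator's processWatermark.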

https://blog.csdn.net/jon_snow666/article/details/106053774

// RecordWriterOutput.java
public void emitWatermark(Watermark mark) {
    // An idle output forwards no watermarks.
    if (announcedStatus.isIdle()) {
        return;
    }

    watermarkGauge.setCurrentWatermark(mark.getTimestamp());
    serializationDelegate.setInstance(mark);

    try {
        // Different task: send the watermark to every downstream channel.
        recordWriter.broadcastEmit(serializationDelegate);
    } catch (IOException e) {
        throw new UncheckedIOException(e.getMessage(), e);
    }
}

// ChainingOutput.java
public void emitWatermark(Watermark mark) {
    if (announcedStatus.isIdle()) {
        return;
    }
    try {
        watermarkGauge.setCurrentWatermark(mark.getTimestamp());
        // Same task: hand the watermark directly to the chained operator.
        input.processWatermark(mark);
    } catch (Exception e) {
        throw new ExceptionInChainedOperatorException(e);
    }
}
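The two forwarding paths can be compared with a small model. BroadcastingOutput stands in for RecordWriterOutput (one copy per downstream channel, channels modeled as lists) and ChainedOutput for ChainingOutput (a direct call on the next operator). All class names are hypothetical; the shared idle check mirrors announcedStatus.isIdle() in both excerpts.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: both outputs drop watermarks while idle,
// like the announcedStatus.isIdle() check in the excerpts.
abstract class WatermarkForwarder {
    boolean idle;

    final void emitWatermark(long ts) {
        if (idle) {
            return;
        }
        forward(ts);
    }

    abstract void forward(long ts);
}

// Stand-in for a chained operator's processWatermark().
final class DownstreamOperator {
    final List<Long> seen = new ArrayList<>();

    void processWatermark(long ts) {
        seen.add(ts);
    }
}

// Like RecordWriterOutput: the next operator runs in another task, so the
// watermark goes to every downstream channel.
final class BroadcastingOutput extends WatermarkForwarder {
    private final List<List<Long>> channels;

    BroadcastingOutput(List<List<Long>> channels) {
        this.channels = channels;
    }

    @Override
    void forward(long ts) {
        for (List<Long> channel : channels) {
            channel.add(ts);
        }
    }
}

// Like ChainingOutput: same task and thread, so the next operator is called directly.
final class ChainedOutput extends WatermarkForwarder {
    private final DownstreamOperator next;

    ChainedOutput(DownstreamOperator next) {
        this.next = next;
    }

    @Override
    void forward(long ts) {
        next.processWatermark(ts);
    }
}

final class OutputDemo {
    public static void main(String[] args) {
        List<Long> ch0 = new ArrayList<>();
        List<Long> ch1 = new ArrayList<>();
        new BroadcastingOutput(List.of(ch0, ch1)).emitWatermark(42);
        DownstreamOperator next = new DownstreamOperator();
        new ChainedOutput(next).emitWatermark(42);
        System.out.println(ch0 + " " + ch1 + " " + next.seen); // [42] [42] [42]
    }
}
```

Broadcasting matters because a downstream subtask computes its event time from all of its input channels, so every channel needs the watermark, whereas a chained operator has exactly one producer and can simply be called.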

Triggering timers

https://www.jianshu.com/p/913daa6beead

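We now know how Watermark objects are sent downstream. How do they trigger timers in the downstream operators?
Take WindowOperator as an example. It has an internalTimerService member, which is managed by InternalTimeServiceManagerImpl. When the operator receives a watermark, processWatermark in AbstractStreamOperator passes it to the manager's advanceWatermark, which calls advanceWatermark on every InternalTimerServiceImpl. That method fires each event-time timer whose timestamp is at or below the watermark through triggerTarget.onEventTime (for WindowOperator, the trigger target is the operator itself).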

// InternalTimeServiceManagerImpl.java
// Class-level Javadoc:
/**
 * An entity keeping all the time-related services. Right now, this is only a {@link
 * InternalTimerServiceImpl timer services}.
 *
 * <p><b>NOTE:</b> These services are only available to keyed operators.
 */
// Forward the new watermark to every timer service owned by this operator.
public void advanceWatermark(Watermark watermark) throws Exception {
    for (InternalTimerServiceImpl<?, ?> service : timerServices.values()) {
        service.advanceWatermark(watermark.getTimestamp());
    }
}

// InternalTimerServiceImpl.java
public void advanceWatermark(long time) throws Exception {
    currentWatermark = time;

    InternalTimer<K, N> timer;

    // Fire every event-time timer whose timestamp is <= the new watermark, earliest first.
    while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
        keyContext.setCurrentKey(timer.getKey());
        eventTimeTimersQueue.poll();
        triggerTarget.onEventTime(timer);
    }
}
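The firing loop in InternalTimerServiceImpl.advanceWatermark can be reproduced with a java.util.PriorityQueue ordered by timestamp. This is a simplified sketch: EventTimer and SimpleTimerService are hypothetical, keys are plain strings, there are no namespaces, and onEventTime only records which timer fired instead of evaluating a window trigger.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical minimal timer: the key it belongs to and its event-time deadline.
final class EventTimer {
    final String key;
    final long timestamp;

    EventTimer(String key, long timestamp) {
        this.key = key;
        this.timestamp = timestamp;
    }
}

// Timers live in a min-heap by timestamp; advancing the watermark pops and fires
// every timer whose timestamp is <= the new watermark, earliest first.
final class SimpleTimerService {
    private final PriorityQueue<EventTimer> eventTimeTimersQueue =
            new PriorityQueue<>(Comparator.comparingLong((EventTimer t) -> t.timestamp));
    private final List<String> fired = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;
    private String currentKey;

    void registerEventTimeTimer(String key, long timestamp) {
        eventTimeTimersQueue.add(new EventTimer(key, timestamp));
    }

    void advanceWatermark(long time) {
        currentWatermark = time;
        EventTimer timer;
        while ((timer = eventTimeTimersQueue.peek()) != null && timer.timestamp <= time) {
            currentKey = timer.key; // plays the role of keyContext.setCurrentKey(...)
            eventTimeTimersQueue.poll();
            onEventTime(timer); // plays the role of triggerTarget.onEventTime(timer)
        }
    }

    private void onEventTime(EventTimer timer) {
        fired.add(currentKey + "@" + timer.timestamp);
    }

    List<String> fired() {
        return fired;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        SimpleTimerService service = new SimpleTimerService();
        service.registerEventTimeTimer("a", 10);
        service.registerEventTimeTimer("b", 5);
        service.registerEventTimeTimer("c", 20);
        service.advanceWatermark(12);
        System.out.println(service.fired()); // [b@5, a@10]
    }
}
```

The timer at 20 stays queued until a later watermark reaches it. Setting the current key before each callback is what lets keyed state accessed inside onEventTime resolve to the timer's own key.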

      Article link: https://www.haomeiwen.com/subject/obgpjrtx.html