美文网首页Flink源码阅读系列
Flink源码阅读之Timer定时器

Flink源码阅读之Timer定时器

作者: 〇白衣卿相〇 | 来源:发表于2020-06-10 19:55 被阅读0次

概述

window执行过程篇也提到了定时器的注册,在flink中有很多定时器的使用,比如窗口trigger的触发、watermark的周期生成,其实定时器底层是依赖jdk的ScheduledThreadPoolExecutor来调度的。

入口

通过TimerService 接口注册和删除定时器。

public interface TimerService {
    long currentProcessingTime();
    long currentWatermark();

    void registerProcessingTimeTimer(long time);
    void registerEventTimeTimer(long time);

    void deleteProcessingTimeTimer(long time);
    void deleteEventTimeTimer(long time);
}

Flink 内部使用 InternalTimerService,可以设置 timer 关联的 namespace 和 key。
在 InternalTimeService 中注册的 timer 有两种类型,分别为基于系统时间的和基于事件时间的,它使用两个优先级队列分别保存这两种类型的 timer。Timer 则被抽象为接口 InternalTimer,每个 timer 有绑定的 key,namespace 和触发时间 timestamp,TimerHeapInternalTimer 是其具体实现。InternalTimerServiceImpl 内部的两个优先级队列会按照触发时间的大小进行排序。

Timer 只能在 KeyedStream 中使用,例如在KeyedProcessFunction中注册

ctx.timerService().registerEventTimeTimer(time);
ctx.timerService().registerProcessingTimeTimer(time);

执行过程

Processing time timer

public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            // check if we need to re-schedule our timer to earlier
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    nextTimer.cancel(false);
                }
                nextTimer = processingTimeService.registerTimer(time, this::onProcessingTime);
            }
        }
    }

当调用registerProcessingTimeTimer注册后,会尝试将当前time添加到优先级队列中,如果添加失败说明已经存在了同一个key同一个time的定时器已存在。如果当前time时间小于队列头的Timer时间,那么将当前时间注册为下一个待调度的定时器。
具体注册定时器的过程在SystemProcessingTimeService

/**
     * Registers a task to be executed no sooner than time {@code timestamp}, but without strong
     * guarantees of order.
     *
     * @param timestamp Time when the task is to be enabled (in processing time)
     * @param callback    The task to be executed
     * @return The future that represents the scheduled task. This always returns some future,
     *         even if the timer was shut down
     */
    @Override
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback callback) {

        // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
        // T says we won't see elements in the future with a timestamp smaller or equal to T.
        // With processing time, we therefore need to delay firing the timer by one ms.
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

        // we directly try to register the timer and only react to the status on exception
        // that way we save unnecessary volatile accesses for each timer
        try {
            return timerService.schedule(wrapOnTimerCallback(callback, timestamp), delay, TimeUnit.MILLISECONDS);
        }
        catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(delay);
            }
            else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            else {
                // something else happened, so propagate the exception
                throw e;
            }
        }
    }

把callback回调函数封装为ScheduledTask放入线程池中等待调度。
当定时器触发时会回调onProcessingTime方法。触发Triggerable的onProcessingTime方法,执行所有满足条件的定时器,也就是用户实现的udf处理逻辑,并注册下一个定时器。

private void onProcessingTime(long time) throws Exception {
        // null out the timer in case the Triggerable calls registerProcessingTimeTimer()
        // inside the callback.
        nextTimer = null;

        InternalTimer<K, N> timer;

        while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            processingTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onProcessingTime(timer);
        }

        if (timer != null && nextTimer == null) {
            nextTimer = processingTimeService.registerTimer(timer.getTimestamp(), this::onProcessingTime);
        }
    }

Event time timer

Event time的定时器则依赖于水印的流动。

public void registerEventTimeTimer(N namespace, long time) {
        eventTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace));
    }

注册后会添加到eventTime的优先级队列中,我们知道watermark是一直递增的,随着element在datastream中流动,当watermark被处理时

private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
        if (recordOrMark.isRecord()){
            output.emitRecord(recordOrMark.asRecord());
        } else if (recordOrMark.isWatermark()) {
            statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
        } else if (recordOrMark.isLatencyMarker()) {
            output.emitLatencyMarker(recordOrMark.asLatencyMarker());
        } else if (recordOrMark.isStreamStatus()) {
            statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }

之后会调用AbstractStreamOperator#processWatermark

public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark);
        }
        output.emitWatermark(mark);
    }

最终调用到InternalTimerServiceImpl#advanceWatermark

调用链路是StreamTaskNetworkInput#processElement→StatusWatermarkValve#inputWatermark→StatusWatermarkValve#findAndOutputNewMinWatermarkAcrossAlignedChannels→OneInputStreamTask#emitWatermark→ProcessOperator#processWatermark→AbstractStreamOperator#processWatermark→InternalTimeServiceManager#advanceWatermark→InternalTimerServiceImpl#advanceWatermark

public void advanceWatermark(long time) throws Exception {
        currentWatermark = time;

        InternalTimer<K, N> timer;

        while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
            eventTimeTimersQueue.poll();
            keyContext.setCurrentKey(timer.getKey());
            triggerTarget.onEventTime(timer);
        }
    }

这里就和processing time的类似,从优先级队列中取出满足触发条件的timer,调用udf的onEventTime执行具体逻辑。

总结

本文主要分析了定时器Timer在Flink内部的应用,分别分析了基于Processing time和Event time具体的执行逻辑。

相关文章

网友评论

    本文标题:Flink源码阅读之Timer定时器

    本文链接:https://www.haomeiwen.com/subject/bztmtktx.html