A look at Flink's consecutive windowed ope


Author: go4it | Published: 2019-01-09 23:27

    This post takes a look at Flink's consecutive windowed operations.

    Example

    DataStream<Integer> input = ...;
    
    DataStream<Integer> resultsPerKey = input
        .keyBy(<key selector>)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .reduce(new Summer());
    
    DataStream<Integer> globalResults = resultsPerKey
        .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
        .process(new TopKWindowFunction());
    
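    • This example first partitions the stream by key and then sums the values of each key within the given window (reduce with Summer). It then applies windowAll to the resulting DataStream, using the same time WindowAssigner as the first step. The effect is that, within the same time window, data is first aggregated per partition and then aggregated globally, which can solve problems such as finding the top-k elements.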
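    The data flow of this two-stage pattern can be mimicked in plain Java without Flink. The following is only a minimal sketch: the names TwoStageTopK, Event, sumPerKeyPerWindow and topK are hypothetical, events are assumed to be (key, value, timestamp) triples with non-negative timestamps, and the 5-second window size is taken from the example above. It illustrates the "sum per key, then top-k over the same window" idea, not Flink's runtime behaviour.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration of "per-key window sum, then global top-k over the same window".
// All names here are hypothetical; nothing in this class is Flink API.
final class TwoStageTopK {

    static final long WINDOW_SIZE_MS = 5_000L;

    static final class Event {
        final String key;
        final int value;
        final long timestamp; // event time in ms, assumed non-negative

        Event(String key, int value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // Stage 1: windowStart -> (key -> sum of values), similar in spirit to keyBy + window + reduce.
    static Map<Long, Map<String, Integer>> sumPerKeyPerWindow(List<Event> events) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (Event e : events) {
            long windowStart = e.timestamp - (e.timestamp % WINDOW_SIZE_MS);
            result.computeIfAbsent(windowStart, w -> new TreeMap<>())
                  .merge(e.key, e.value, Integer::sum);
        }
        return result;
    }

    // Stage 2: for one window, keep the k largest per-key sums, similar in spirit to windowAll + process.
    static List<Integer> topK(Map<String, Integer> sumsOfOneWindow, int k) {
        List<Integer> sums = new ArrayList<>(sumsOfOneWindow.values());
        sums.sort(Comparator.reverseOrder());
        return new ArrayList<>(sums.subList(0, Math.min(k, sums.size())));
    }

    public static void main(String[] args) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("a", 1, 1_000L));
        events.add(new Event("b", 5, 2_000L));
        events.add(new Event("a", 3, 4_999L));
        events.add(new Event("c", 2, 6_000L)); // falls into the next 5-second window

        Map<Long, Map<String, Integer>> perWindow = sumPerKeyPerWindow(events);
        for (Map.Entry<Long, Map<String, Integer>> w : perWindow.entrySet()) {
            System.out.println("window " + w.getKey() + " top-2 sums: " + topK(w.getValue(), 2));
        }
    }
}
```

    Because both stages use the same window boundaries, the global step only ever sees one partial sum per key for each window.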

    TimestampsAndPeriodicWatermarksOperator

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperator.java

    public class TimestampsAndPeriodicWatermarksOperator<T>
            extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
            implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
    
        private static final long serialVersionUID = 1L;
    
        private transient long watermarkInterval;
    
        private transient long currentWatermark;
    
        public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
            super(assigner);
            this.chainingStrategy = ChainingStrategy.ALWAYS;
        }
    
        @Override
        public void open() throws Exception {
            super.open();
    
            currentWatermark = Long.MIN_VALUE;
            watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
    
            if (watermarkInterval > 0) {
                long now = getProcessingTimeService().getCurrentProcessingTime();
                getProcessingTimeService().registerTimer(now + watermarkInterval, this);
            }
        }
    
        @Override
        public void processElement(StreamRecord<T> element) throws Exception {
            final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                    element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
    
            output.collect(element.replace(element.getValue(), newTimestamp));
        }
    
        @Override
        public void onProcessingTime(long timestamp) throws Exception {
            // register next timer
            Watermark newWatermark = userFunction.getCurrentWatermark();
            if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
                currentWatermark = newWatermark.getTimestamp();
                // emit watermark
                output.emitWatermark(newWatermark);
            }
    
            long now = getProcessingTimeService().getCurrentProcessingTime();
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);
        }
    
        /**
         * Override the base implementation to completely ignore watermarks propagated from
         * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
         * watermarks from here).
         */
        @Override
        public void processWatermark(Watermark mark) throws Exception {
            // if we receive a Long.MAX_VALUE watermark we forward it since it is used
            // to signal the end of input and to not block watermark progress downstream
            if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
                currentWatermark = Long.MAX_VALUE;
                output.emitWatermark(mark);
            }
        }
    
        @Override
        public void close() throws Exception {
            super.close();
    
            // emit a final watermark
            Watermark newWatermark = userFunction.getCurrentWatermark();
            if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
                currentWatermark = newWatermark.getTimestamp();
                // emit watermark
                output.emitWatermark(newWatermark);
            }
        }
    }
    
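    • If assignTimestampsAndWatermarks is given an AssignerWithPeriodicWatermarks, the operator that gets created is TimestampsAndPeriodicWatermarksOperator. In open() it registers a processing-time timer based on the configured watermarkInterval (only when watermarkInterval > 0).
    • When that timer fires, it calls back onProcessingTime, which calls getCurrentWatermark on the AssignerWithPeriodicWatermarks to obtain a watermark and then registers the next timer. The next timer is due at getProcessingTimeService().getCurrentProcessingTime() + watermarkInterval, where watermarkInterval is the value set through env.getConfig().setAutoWatermarkInterval.
    • So onProcessingTime does two things: it re-registers the timer, which gives the periodic effect, and it emits the watermark only if the new watermark is greater than currentWatermark. getCurrentWatermark itself only supplies the candidate value, and processElement never emits watermarks; it only assigns timestamps.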
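    The "emit only when the watermark advances" behaviour described above can be modelled in a few lines of plain Java. This is a rough sketch, not the operator itself: PeriodicWatermarkSketch, onElement and onTimerTick are hypothetical names, and the candidate watermark is assumed to be "largest timestamp seen minus a fixed out-of-orderness bound", which is one common way to implement getCurrentWatermark but is not taken from the source above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of the periodic watermark loop.
final class PeriodicWatermarkSketch {

    private final long maxOutOfOrdernessMs;          // assumed bound on lateness
    private long maxSeenTimestamp = Long.MIN_VALUE;  // tracked by the "assigner"
    private long currentWatermark = Long.MIN_VALUE;  // last watermark actually emitted
    private final List<Long> emitted = new ArrayList<>();

    PeriodicWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    // Counterpart of processElement: records the timestamp, never emits a watermark.
    void onElement(long eventTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
    }

    // Counterpart of onProcessingTime: compute a candidate and emit it only if it advances.
    void onTimerTick() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return; // no element seen yet, so there is no candidate watermark
        }
        long candidate = maxSeenTimestamp - maxOutOfOrdernessMs;
        if (candidate > currentWatermark) {
            currentWatermark = candidate;
            emitted.add(candidate);
        }
    }

    List<Long> emittedWatermarks() {
        return emitted;
    }

    public static void main(String[] args) {
        PeriodicWatermarkSketch op = new PeriodicWatermarkSketch(500L);
        op.onElement(1_000L);
        op.onTimerTick();      // emits 500
        op.onElement(800L);    // out-of-order element, max timestamp unchanged
        op.onTimerTick();      // nothing emitted, the watermark did not advance
        op.onElement(3_000L);
        op.onTimerTick();      // emits 2500
        System.out.println(op.emittedWatermarks()); // prints [500, 2500]
    }
}
```

    In the real operator the ticks come from the re-registered processing-time timer; here they are called by hand so the sequence is easy to follow.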

    SystemProcessingTimeService

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.java

    public class SystemProcessingTimeService extends ProcessingTimeService {
    
        private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);
    
        private static final int STATUS_ALIVE = 0;
        private static final int STATUS_QUIESCED = 1;
        private static final int STATUS_SHUTDOWN = 2;
    
        // ------------------------------------------------------------------------
    
        /** The containing task that owns this time service provider. */
        private final AsyncExceptionHandler task;
    
        /** The lock that timers acquire upon triggering. */
        private final Object checkpointLock;
    
        /** The executor service that schedules and calls the triggers of this task. */
        private final ScheduledThreadPoolExecutor timerService;
    
        private final AtomicInteger status;
    
        public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) {
            this(failureHandler, checkpointLock, null);
        }
    
        public SystemProcessingTimeService(
                AsyncExceptionHandler task,
                Object checkpointLock,
                ThreadFactory threadFactory) {
    
            this.task = checkNotNull(task);
            this.checkpointLock = checkNotNull(checkpointLock);
    
            this.status = new AtomicInteger(STATUS_ALIVE);
    
            if (threadFactory == null) {
                this.timerService = new ScheduledThreadPoolExecutor(1);
            } else {
                this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
            }
    
            // tasks should be removed if the future is canceled
            this.timerService.setRemoveOnCancelPolicy(true);
    
            // make sure shutdown removes all pending tasks
            this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
            this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        }
    
        @Override
        public long getCurrentProcessingTime() {
            return System.currentTimeMillis();
        }
    
        @Override
        public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
    
            // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
            // T says we won't see elements in the future with a timestamp smaller or equal to T.
            // With processing time, we therefore need to delay firing the timer by one ms.
            long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;
    
            // we directly try to register the timer and only react to the status on exception
            // that way we save unnecessary volatile accesses for each timer
            try {
                return timerService.schedule(
                        new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
            }
            catch (RejectedExecutionException e) {
                final int status = this.status.get();
                if (status == STATUS_QUIESCED) {
                    return new NeverCompleteFuture(delay);
                }
                else if (status == STATUS_SHUTDOWN) {
                    throw new IllegalStateException("Timer service is shut down");
                }
                else {
                    // something else happened, so propagate the exception
                    throw e;
                }
            }
        }
    
        //......
    }
    
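    • The registerTimer method of SystemProcessingTimeService schedules a delayed TriggerTask for the given timestamp; the delay is max(timestamp - now, 0) + 1 ms. timerService is the JDK's ScheduledThreadPoolExecutor. When the service status is STATUS_ALIVE, the run method of TriggerTask invokes onProcessingTime on the ProcessingTimeCallback (here, TimestampsAndPeriodicWatermarksOperator).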
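    The combination of a one-shot delayed task and a callback that registers the next one can be tried out with the JDK alone. The sketch below is an illustration under stated assumptions: SelfReschedulingTimerSketch, computeDelay and runTicks are hypothetical names, computeDelay copies the delay formula from registerTimer above, and the checkpoint lock and status handling of the real service are left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical JDK-only model of "one-shot timer whose callback registers the next timer".
final class SelfReschedulingTimerSketch {

    // Same arithmetic as in registerTimer: never negative, plus 1 ms to align with watermark semantics.
    static long computeDelay(long timestamp, long now) {
        return Math.max(timestamp - now, 0) + 1;
    }

    // Runs a callback `ticks` times; every firing except the last schedules the next one-shot timer.
    static int runTicks(int ticks, long intervalMs) throws InterruptedException {
        ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
        timerService.setRemoveOnCancelPolicy(true);
        AtomicInteger fired = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(ticks);

        Runnable callback = new Runnable() {
            @Override
            public void run() {
                fired.incrementAndGet();
                done.countDown();
                if (done.getCount() > 0) {
                    long now = System.currentTimeMillis();
                    timerService.schedule(this, computeDelay(now + intervalMs, now), TimeUnit.MILLISECONDS);
                }
            }
        };

        long now = System.currentTimeMillis();
        timerService.schedule(callback, computeDelay(now + intervalMs, now), TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS); // safety timeout for the demo
        } finally {
            timerService.shutdownNow();
        }
        return fired.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(computeDelay(1_000L, 400L)); // prints 601
        System.out.println(computeDelay(100L, 400L));   // prints 1 (timestamp already in the past)
        System.out.println("callback fired " + runTicks(3, 10L) + " times");
    }
}
```

    Using schedule with re-registration, rather than scheduleAtFixedRate, means each interval is measured from the moment the previous callback ran, which matches how onProcessingTime computes now + watermarkInterval.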

    WindowOperator

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java

    @Internal
    public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {
    
        //......
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            final Collection<W> elementWindows = windowAssigner.assignWindows(
                element.getValue(), element.getTimestamp(), windowAssignerContext);
    
            //if element is handled by none of assigned elementWindows
            boolean isSkippedElement = true;
    
            final K key = this.<K>getKeyedStateBackend().getCurrentKey();
    
            if (windowAssigner instanceof MergingWindowAssigner) {
    
                //......
    
            } else {
                for (W window: elementWindows) {
    
                    // drop if the window is already late
                    if (isWindowLate(window)) {
                        continue;
                    }
                    isSkippedElement = false;
    
                    windowState.setCurrentNamespace(window);
                    windowState.add(element.getValue());
    
                    triggerContext.key = key;
                    triggerContext.window = window;
    
                    TriggerResult triggerResult = triggerContext.onElement(element);
    
                    if (triggerResult.isFire()) {
                        ACC contents = windowState.get();
                        if (contents == null) {
                            continue;
                        }
                        emitWindowContents(window, contents);
                    }
    
                    if (triggerResult.isPurge()) {
                        windowState.clear();
                    }
                    registerCleanupTimer(window);
                }
            }
    
            // side output input event if
            // element not handled by any window
            // late arriving tag has been set
            // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
            if (isSkippedElement && isElementLate(element)) {
                if (lateDataOutputTag != null){
                    sideOutput(element);
                } else {
                    this.numLateRecordsDropped.inc();
                }
            }
        }
    
        /**
         * Emits the contents of the given window using the {@link InternalWindowFunction}.
         */
        @SuppressWarnings("unchecked")
        private void emitWindowContents(W window, ACC contents) throws Exception {
            timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
            processContext.window = window;
            userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
        }
    
        //......
    }
    
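    • The processElement method of WindowOperator adds the element to windowState. With a heap-based state backend this state lives in memory: for the reduce() step of the example it would be a HeapReducingState (HeapAggregatingState is what aggregate() would use). It then calls triggerContext.onElement (which delegates to trigger.onElement; the trigger here is EventTimeTrigger) to obtain a TriggerResult. If the result says fire, emitWindowContents is called; if it says purge, windowState is cleared. emitWindowContents calls userFunction.process to run the user-defined window function, after setting the output timestamp to window.maxTimestamp().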
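    The line timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp()) in emitWindowContents above is what lets two windows with the same assigner line up: a result stamped with the last millisecond of a window is assigned to that same window downstream. The arithmetic can be checked with a small sketch. TumblingWindowSketch and its method names are hypothetical; the start formula is written from my understanding of how Flink's TimeWindow computes tumbling window starts, and maxTimestamp is taken to be end - 1, so treat both as assumptions rather than as quoted source.

```java
// Hypothetical helper showing why a result emitted by one tumbling window
// lands in the same window of a downstream tumbling window of equal size.
final class TumblingWindowSketch {

    // Start of the tumbling window containing `timestamp` (assumes timestamp - offset + windowSize >= 0).
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Largest timestamp that still belongs to the window [windowStart, windowStart + windowSize).
    static long maxTimestamp(long windowStart, long windowSize) {
        return windowStart + windowSize - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L;

        long upstreamStart = windowStart(7_300L, 0L, size);   // 5000
        long resultTimestamp = maxTimestamp(upstreamStart, size); // 9999

        // The upstream result carries timestamp 9999 and is assigned downstream:
        long downstreamStart = windowStart(resultTimestamp, 0L, size); // 5000 again

        System.out.println(upstreamStart + " " + resultTimestamp + " " + downstreamStart);
        // prints 5000 9999 5000
    }
}
```

    Had the result been stamped with the window end (10000 here) instead of end - 1, it would fall into the next downstream window.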

    EventTimeTrigger

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java

    @PublicEvolving
    public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        private EventTimeTrigger() {}
    
        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                // if the watermark is already past the window fire immediately
                return TriggerResult.FIRE;
            } else {
                ctx.registerEventTimeTimer(window.maxTimestamp());
                return TriggerResult.CONTINUE;
            }
        }
    
        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ?
                TriggerResult.FIRE :
                TriggerResult.CONTINUE;
        }
    
        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }
    
        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }
    
        @Override
        public boolean canMerge() {
            return true;
        }
    
        @Override
        public void onMerge(TimeWindow window,
                OnMergeContext ctx) {
            // only register a timer if the watermark is not yet past the end of the merged window
            // this is in line with the logic in onElement(). If the watermark is past the end of
            // the window onElement() will fire and setting a timer here would fire the window twice.
            long windowMaxTimestamp = window.maxTimestamp();
            if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
                ctx.registerEventTimeTimer(windowMaxTimestamp);
            }
        }
    
        @Override
        public String toString() {
            return "EventTimeTrigger()";
        }
    
        public static EventTimeTrigger create() {
            return new EventTimeTrigger();
        }
    }
    
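    • The onElement method of EventTimeTrigger checks whether window.maxTimestamp() <= ctx.getCurrentWatermark(). If so, it returns TriggerResult.FIRE, telling WindowOperator that it may call emitWindowContents. Otherwise it registers an event-time timer for window.maxTimestamp() and returns TriggerResult.CONTINUE; the window then fires from onEventTime once that timer's time equals window.maxTimestamp().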
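    The two paths to FIRE (immediately in onElement, or later through the event-time timer) can be modelled without Flink. This is a simplified sketch: EventTimeTriggerSketch, Result and advanceWatermark are hypothetical names, a window is represented only by its maxTimestamp, and the timer service is reduced to a sorted set that is drained when the watermark advances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical, Flink-free model of the EventTimeTrigger decision.
final class EventTimeTriggerSketch {

    enum Result { FIRE, CONTINUE }

    private long currentWatermark = Long.MIN_VALUE;
    private final TreeSet<Long> eventTimeTimers = new TreeSet<>();

    // Mirrors onElement: fire at once if the watermark already passed the window, else set a timer.
    Result onElement(long windowMaxTimestamp) {
        if (windowMaxTimestamp <= currentWatermark) {
            return Result.FIRE;
        }
        eventTimeTimers.add(windowMaxTimestamp);
        return Result.CONTINUE;
    }

    // Models a watermark arriving: every timer at or below it fires.
    // Returns the maxTimestamp of each window that fires, in ascending order.
    List<Long> advanceWatermark(long newWatermark) {
        List<Long> fired = new ArrayList<>();
        if (newWatermark <= currentWatermark) {
            return fired; // watermarks never move backwards
        }
        currentWatermark = newWatermark;
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.first() <= currentWatermark) {
            fired.add(eventTimeTimers.pollFirst());
        }
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTriggerSketch trigger = new EventTimeTriggerSketch();
        System.out.println(trigger.onElement(4_999L));        // CONTINUE, timer registered
        System.out.println(trigger.advanceWatermark(4_000L)); // []
        System.out.println(trigger.advanceWatermark(4_999L)); // [4999]
        System.out.println(trigger.onElement(4_999L));        // FIRE
    }
}
```

    In the WindowOperator code shown earlier, an element whose window is already late is skipped before the trigger is consulted, so as far as I can tell the immediate FIRE branch is only reached when the window is still being kept around, for example when an allowed lateness is configured.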

    Summary

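    • Flink supports consecutive windowed operations: for example, partition by key, sum the values of each key within a given window, and then apply windowAll to the resulting DataStream with the same time WindowAssigner. Within the same time window, data is first aggregated per partition and then aggregated globally, which can solve problems such as finding the top-k elements.
    • AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks each do two things: they extract a timestamp from the element to use as its event time, and they emit watermarks. Elements do not necessarily arrive in strict event-time order, so the watermark bounds how long a window waits for late elements that may belong to it. It tells the window that all elements with an event time less than or equal to the watermark can be assumed to have arrived; based on its own time range and with the help of the trigger, the window can then decide whether to fire and run its computation. For consecutive windowed operations, the upstream watermark is forwarded to the downstream operations.
    • The Trigger tells WindowOperator when a window's data can be processed (when it returns TriggerResult.FIRE). For EventTimeTrigger, the decision in onElement depends on the watermark: it returns TriggerResult.FIRE if window.maxTimestamp() <= ctx.getCurrentWatermark().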
