美文网首页
Flink ProcessingTime 在事件未到达,但窗口时

Flink ProcessingTime 在事件未到达,但窗口时

作者: zh_harry | 来源:发表于2020-04-27 13:09 被阅读0次
    public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        private ProcessingTimeTrigger() {}
    
    //每个事件进入算子时触发
        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
    //并注册定时器
            ctx.registerProcessingTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    
    @Override
        public void registerProcessingTimeTimer(N namespace, long time) {
            InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
            if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
                long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
                // check if we need to re-schedule our timer to earlier
                if (time < nextTriggerTime) {
                    if (nextTimer != null) {
                        nextTimer.cancel(false);
                    }
    //如果是窗口中的第一个元素由注册定时器
                    nextTimer = processingTimeService.registerTimer(time, this);
                }
            }
        }
    

    根据当前时钟时间与窗口最大时间判断最大delay 时间,所以即使事件未达到时,内部的定时器也会定时触发!

    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
    
            // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
            // T says we won't see elements in the future with a timestamp smaller or equal to T.
            // With processing time, we therefore need to delay firing the timer by one ms.
            long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;
    
            // we directly try to register the timer and only react to the status on exception
            // that way we save unnecessary volatile accesses for each timer
            try {
                return timerService.schedule(
                        new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
            }
            catch (RejectedExecutionException e) {
                final int status = this.status.get();
                if (status == STATUS_QUIESCED) {
                    return new NeverCompleteFuture(delay);
                }
                else if (status == STATUS_SHUTDOWN) {
                    throw new IllegalStateException("Timer service is shut down");
                }
                else {
                    // something else happened, so propagate the exception
                    throw e;
                }
            }
        }
    

    count 窗口为global窗口

    /**
         * Windows this {@code KeyedStream} into tumbling count windows.
         *
         * @param size The size of the windows in number of elements.
         */
        public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
            return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
        }
    

    count 窗口的触发条件如下

    @Override
        public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
            ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
            count.add(1L);
            if (count.get() >= maxCount) {
                count.clear();
                return TriggerResult.FIRE;
            }
            return TriggerResult.CONTINUE;
        }
    

    相关文章

      网友评论

          本文标题:Flink ProcessingTime 在事件未到达,但窗口时

          本文链接:https://www.haomeiwen.com/subject/jheywhtx.html