美文网首页
Flink ProcessingTime 在事件未到达,但窗口时

Flink ProcessingTime 在事件未到达,但窗口时

作者: zh_harry | 来源:发表于2020-04-27 13:09 被阅读0次
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    private ProcessingTimeTrigger() {}

//每个事件进入算子时触发
    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
//并注册定时器
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        return TriggerResult.CONTINUE;
    }
@Override
    public void registerProcessingTimeTimer(N namespace, long time) {
        InternalTimer<K, N> oldHead = processingTimeTimersQueue.peek();
        if (processingTimeTimersQueue.add(new TimerHeapInternalTimer<>(time, (K) keyContext.getCurrentKey(), namespace))) {
            long nextTriggerTime = oldHead != null ? oldHead.getTimestamp() : Long.MAX_VALUE;
            // check if we need to re-schedule our timer to earlier
            if (time < nextTriggerTime) {
                if (nextTimer != null) {
                    nextTimer.cancel(false);
                }
//如果是窗口中的第一个元素由注册定时器
                nextTimer = processingTimeService.registerTimer(time, this);
            }
        }
    }

根据当前时钟时间与窗口最大时间判断最大delay 时间,所以即使事件未达到时,内部的定时器也会定时触发!

public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {

        // delay the firing of the timer by 1 ms to align the semantics with watermark. A watermark
        // T says we won't see elements in the future with a timestamp smaller or equal to T.
        // With processing time, we therefore need to delay firing the timer by one ms.
        long delay = Math.max(timestamp - getCurrentProcessingTime(), 0) + 1;

        // we directly try to register the timer and only react to the status on exception
        // that way we save unnecessary volatile accesses for each timer
        try {
            return timerService.schedule(
                    new TriggerTask(status, task, checkpointLock, target, timestamp), delay, TimeUnit.MILLISECONDS);
        }
        catch (RejectedExecutionException e) {
            final int status = this.status.get();
            if (status == STATUS_QUIESCED) {
                return new NeverCompleteFuture(delay);
            }
            else if (status == STATUS_SHUTDOWN) {
                throw new IllegalStateException("Timer service is shut down");
            }
            else {
                // something else happened, so propagate the exception
                throw e;
            }
        }
    }

count 窗口为global窗口

/**
     * Windows this {@code KeyedStream} into tumbling count windows.
     *
     * @param size The size of the windows in number of elements.
     */
    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }

count 窗口的触发条件如下

@Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            count.clear();
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

相关文章

  • Flink ProcessingTime 在事件未到达,但窗口时

    根据当前时钟时间与窗口最大时间判断最大delay 时间,所以即使事件未达到时,内部的定时器也会定时触发! coun...

  • flink IngestionTime介绍

    flink的窗口时间属性TimeCharacteristic分为三种:ProcessingTime,Ingesti...

  • Flink Eventtime 和 Watermark

    Flink 中提供了3种时间模型:EventTime、ProcessingTime、IngestionTime。底...

  • flink的TimeCharacteristic

    概述: flink的TimeCharacteristic枚举定义了三类值,分别是ProcessingTime、In...

  • Flink学习笔记:时间与窗口

    Flink学习笔记:时间与窗口 1. 时间 在flink中定义了三类时间: 事件时间(Event Time):即事...

  • 问题定位:Flink水位线不触发问题

    Flink水位线不触发问题 窗口计算时遇到好几次水位线不触发的情况,简单总结下。 首先,介绍下Flink的事件时间...

  • Flink 概述

    Flink 相关概念 Flink 支持哪些流式特性 exactly-once 语义。 支持基于事件发生时间的窗口计...

  • Flink与storm的主要区别译文。

    Qestion: Flink被用来和Spark相比,但是我认为这样的比较不太合适,把Flink窗口事件和Spark...

  • 05 Flink窗口

    Flink窗口按行为分有滑动滚动窗口,按划分标准有事件时间窗口。本节将演示各个窗口的使用。 1、 系统、软件以及前...

  • Flink和Spark比较

    Flink被用来和Spark相比,但是我认为这样的比较不太合适,把Flink窗口事件和Spark微批处理进行比较,...

网友评论

      本文标题:Flink ProcessingTime 在事件未到达,但窗口时

      本文链接:https://www.haomeiwen.com/subject/jheywhtx.html