Flink Source Code: Table early fire and lat

Author: AlienPaul | Published: 2022-03-14 14:09

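    Background

    Besides firing when a window closes, Flink SQL window aggregation can also emit intermediate results while the window is still open, and revise a window's result when late data arrives. Revisions are emitted as retractions: an UPDATE_BEFORE row carrying the old value, followed by an UPDATE_AFTER row carrying the new value.

    In the Flink 1.15 master branch these are still experimental features. Starting from the configuration options, this article walks through how they are implemented.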

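    WindowEmitStrategy

    WindowEmitStrategy reads the emit-related options from the table configuration and builds the corresponding trigger.

    It defines the following experimental options: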

    @Experimental
    val TABLE_EXEC_EMIT_EARLY_FIRE_ENABLED: ConfigOption[JBoolean] =
    key("table.exec.emit.early-fire.enabled")
    .booleanType()
    .defaultValue(Boolean.box(false))
    .withDescription("Specifies whether to enable early-fire emit." +
                     "Early-fire is an emit strategy before watermark advanced to end of window.")
    
    // It is a experimental config, will may be removed later.
    @Experimental
    val TABLE_EXEC_EMIT_EARLY_FIRE_DELAY: ConfigOption[Duration] =
    key("table.exec.emit.early-fire.delay")
    .durationType()
    .noDefaultValue()
    .withDescription("The early firing delay in milli second, early fire is " +
                     "the emit strategy before watermark advanced to end of window. " +
                     "< 0 is illegal configuration. " +
                     "0 means no delay (fire on every element). " +
                     "> 0 means the fire interval. ")
    
    // It is a experimental config, will may be removed later.
    @Experimental
    val TABLE_EXEC_EMIT_LATE_FIRE_ENABLED: ConfigOption[JBoolean] =
    key("table.exec.emit.late-fire.enabled")
    .booleanType()
    .defaultValue(Boolean.box(false))
    .withDescription("Specifies whether to enable late-fire emit. " +
                     "Late-fire is an emit strategy after watermark advanced to end of window.")
    
    // It is a experimental config, will may be removed later.
    @Experimental
    val TABLE_EXEC_EMIT_LATE_FIRE_DELAY: ConfigOption[Duration] =
    key("table.exec.emit.late-fire.delay")
    .durationType()
    .noDefaultValue()
    .withDescription("The late firing delay in milli second, late fire is " +
                     "the emit strategy after watermark advanced to end of window. " +
                     "< 0 is illegal configuration. " +
                     "0 means no delay (fire on every element). " +
                     "> 0 means the fire interval.")
    
    // It is a experimental config, will may be removed later.
    @Experimental
    val TABLE_EXEC_EMIT_ALLOW_LATENESS: ConfigOption[Duration] =
    key("table.exec.emit.allow-lateness")
    .durationType()
    .noDefaultValue()
    .withDescription("Sets the time by which elements are allowed to be late. " +
                     "Elements that arrive behind the watermark by more than the specified time " +
                     "will be dropped. " +
                     "Note: use the value if it is set, else use 'minIdleStateRetentionTime' in table config." +
                     "< 0 is illegal configuration. " +
                     "0 means disable allow lateness. " +
                     "> 0 means allow-lateness.")
    

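    What each option does:

    • table.exec.emit.early-fire.enabled: whether early fire is enabled. Early fire emits results before the watermark reaches the end of the window.
    • table.exec.emit.early-fire.delay: the interval between early-fire emissions. A value greater than 0 emits a result once per interval; a value of 0 emits a result for every incoming element.
    • table.exec.emit.late-fire.enabled: whether late fire is enabled. Late fire emits results after the watermark has passed the end of the window.
    • table.exec.emit.late-fire.delay: the interval between late-fire emissions, with the same semantics as the early-fire delay. A value greater than 0 emits a result once per interval; a value of 0 emits a result for every incoming element.
    • table.exec.emit.allow-lateness: how long window state is kept after the window ends. Once this time has elapsed, the window state is cleared. Note how this differs from the watermark. The watermark also tolerates out-of-order elements, but elements that arrive within the watermark's bound do not affect correctness: the computation is simply deferred and only fires once the watermark reaches the window end. allow-lateness literally also means that elements may be late, but the on-time firing (driven by the watermark) does not wait for it. Instead, an element that arrives after the watermark has passed the window end but before allow-lateness has expired produces a retraction: Flink first invalidates the previously emitted result (UPDATE_BEFORE) and then emits the corrected result (UPDATE_AFTER).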
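
    The difference between the watermark and allow-lateness can be sketched in plain Java. This is a simplified model, not Flink code: the class LatenessSketch, the enum Fate and the method classify are hypothetical names, and the two comparisons are an assumption modelled on the trigger and cleanup-time logic analysed later in this article.

```java
import java.time.Duration;

/** Simplified model of how a record for a given window is treated; not Flink code. */
final class LatenessSketch {
    enum Fate { ON_TIME, LATE_UPDATE, DROPPED }

    /**
     * @param windowMaxTimestamp last timestamp that belongs to the window (window end - 1)
     * @param watermark          current watermark in epoch milliseconds
     * @param allowedLateness    value of table.exec.emit.allow-lateness
     */
    static Fate classify(long windowMaxTimestamp, long watermark, Duration allowedLateness) {
        // Assumed rule: window state is kept until maxTimestamp + allowedLateness.
        long cleanupTime = windowMaxTimestamp + allowedLateness.toMillis();
        if (cleanupTime <= watermark) {
            return Fate.DROPPED;      // state already cleaned up, the record is lost
        }
        if (windowMaxTimestamp <= watermark) {
            return Fate.LATE_UPDATE;  // window already fired, the result is revised by retraction
        }
        return Fate.ON_TIME;          // window has not fired yet
    }

    public static void main(String[] args) {
        Duration lateness = Duration.ofSeconds(5);
        long maxTs = 9_999L; // window [0, 10000)
        System.out.println(classify(maxTs, 8_000L, lateness));
        System.out.println(classify(maxTs, 12_000L, lateness));
        System.out.println(classify(maxTs, 15_000L, lateness));
    }
}
```

    With allow-lateness set to 0 the middle case disappears in this model: a record is either on time or dropped.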

    The getTrigger method creates the triggers for early fire and late fire. It is called from the createWindowOperator method of StreamExecGroupWindowAggregate.

    Its logic is as follows:

    def getTrigger: Trigger[TimeWindow] = {
        // Create earlyTrigger and lateTrigger
        val earlyTrigger = createTriggerFromFireDelay(earlyFireDelayEnabled, earlyFireDelay)
        val lateTrigger = createTriggerFromFireDelay(lateFireDelayEnabled, lateFireDelay)
    
        // Build a different composite trigger depending on the time attribute
        if (isEventTime) {
            val trigger = EventTimeTriggers.afterEndOfWindow[TimeWindow]()
    
            (earlyTrigger, lateTrigger) match {
                // Yields an EventTimeTriggers.AfterEndOfWindowEarlyAndLate trigger
                case (Some(early), Some(late)) => trigger.withEarlyFirings(early).withLateFirings(late)
                // Yields an EventTimeTriggers.AfterEndOfWindowNoLate trigger
                case (Some(early), None) => trigger.withEarlyFirings(early)
                // Yields EventTimeTriggers.AfterEndOfWindow (if the late trigger is an ElementTriggers.EveryElement) or EventTimeTriggers.AfterEndOfWindowEarlyAndLate
                case (None, Some(late)) => trigger.withLateFirings(late)
                // Yields EventTimeTriggers.AfterEndOfWindow
                case (None, None) => trigger
            }
        } else {
            val trigger = ProcessingTimeTriggers.afterEndOfWindow[TimeWindow]()
    
            // late trigger is ignored, as no late element in processing time
            earlyTrigger match {
                case Some(early) => trigger.withEarlyFirings(early)
                case None => trigger
            }
        }
    }
    

    EventTimeTriggers is a composite trigger that supports both early fire and late fire; it is analysed below.

    Next, createTriggerFromFireDelay, the method that creates earlyTrigger and lateTrigger:

    private def createTriggerFromFireDelay(
        enableDelayEmit: JBoolean,
        fireDelay: Duration): Option[Trigger[TimeWindow]] = {
        // Return None if this emit strategy is disabled
        if (!enableDelayEmit) {
            None
        } else {
            if (fireDelay.toMillis > 0) {
                // A delay greater than 0 yields a periodically firing processing-time trigger
                // of type ProcessingTimeTriggers.AfterFirstElementPeriodic
                Some(ProcessingTimeTriggers.every(fireDelay))
            } else {
                // Otherwise return ElementTriggers.EveryElement, which fires only when an element arrives
                Some(ElementTriggers.every())
            }
        }
    }
    

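    This matches the option descriptions above: a delay of 0 fires on every incoming element, while a delay greater than 0 produces a trigger that fires periodically in processing time, with the delay as the interval.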
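
    The same decision can be written as a small plain-Java sketch. It only models which kind of trigger is chosen; FireDelaySketch, Kind and fromFireDelay are illustrative names that do not exist in Flink.

```java
import java.time.Duration;
import java.util.Optional;

/** Plain-Java model of the decision in createTriggerFromFireDelay; names are illustrative. */
final class FireDelaySketch {
    enum Kind { PERIODIC_PROCESSING_TIME, EVERY_ELEMENT }

    static Optional<Kind> fromFireDelay(boolean enabled, Duration fireDelay) {
        if (!enabled) {
            // Strategy disabled: no trigger at all
            return Optional.empty();
        }
        // delay > 0: periodic processing-time firing; otherwise fire on every element
        return Optional.of(
                fireDelay.toMillis() > 0 ? Kind.PERIODIC_PROCESSING_TIME : Kind.EVERY_ELEMENT);
    }

    public static void main(String[] args) {
        System.out.println(fromFireDelay(false, Duration.ofSeconds(1)));
        System.out.println(fromFireDelay(true, Duration.ofSeconds(1)));
        System.out.println(fromFireDelay(true, Duration.ZERO));
    }
}
```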

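    EventTimeTriggers

    EventTimeTriggers contains several event-time trigger implementations that combine an early trigger, a late trigger, or both. We analyse the most complex one, AfterEndOfWindowEarlyAndLate, which holds both an early trigger and a late trigger.

    Its three key methods are onElement, onProcessingTime and onEventTime. The trigger keeps a piece of state, hasFired, that records whether the window has entered the late-fire phase, and uses it to decide whether to delegate to the early trigger or the late trigger.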

    @Override
    public boolean onElement(Object element, long timestamp, W window) throws Exception {
        // Check whether the late trigger should handle this element
        Boolean hasFired = ctx.getPartitionedState(hasFiredOnTimeStateDesc).value();
        if (hasFired != null && hasFired) {
            // this is to cover the case where we recover from a failure and the watermark
            // is Long.MIN_VALUE but the window is already in the late phase.
            // Delegate to lateTrigger.onElement
            return lateTrigger != null && lateTrigger.onElement(element, timestamp, window);
        } else {
            if (triggerTime(window) <= ctx.getCurrentWatermark()) {
                // we are in the late phase
    
                // if there is no late trigger then we fire on every late element
                // This also covers the case of recovery after a failure
                // where the currentWatermark will be Long.MIN_VALUE
                return true;
            } else {
                // we are in the early phase
                // Register an event-time timer that fires at the end of the window
                ctx.registerEventTimeTimer(triggerTime(window));
                return earlyTrigger != null
                        && earlyTrigger.onElement(element, timestamp, window);
            }
        }
    }
    
    @Override
    public boolean onProcessingTime(long time, W window) throws Exception {
        Boolean hasFired = ctx.getPartitionedState(hasFiredOnTimeStateDesc).value();
        if (hasFired != null && hasFired) {
            // late fire
            return lateTrigger != null && lateTrigger.onProcessingTime(time, window);
        } else {
            // early fire
            return earlyTrigger != null && earlyTrigger.onProcessingTime(time, window);
        }
    }
    
    @Override
    public boolean onEventTime(long time, W window) throws Exception {
        ValueState<Boolean> hasFiredState = ctx.getPartitionedState(hasFiredOnTimeStateDesc);
        Boolean hasFired = hasFiredState.value();
        if (hasFired != null && hasFired) {
            // late fire
            return lateTrigger != null && lateTrigger.onEventTime(time, window);
        } else {
            if (time == triggerTime(window)) {
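                // Every element of the window registers an event-time timer for the window end,
                // so reaching this branch means event time has arrived at the window end.
                // Update the state so that subsequent firings go through the late trigger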
                // fire on time and update state
                hasFiredState.update(true);
                return true;
            } else {
                // early fire
                return earlyTrigger != null && earlyTrigger.onEventTime(time, window);
            }
        }
    }
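
    To make the early/on-time/late phases easier to follow, here is a minimal plain-Java simulation of the state machine. It is a sketch under simplifying assumptions, not the Flink class: EarlyLateSketch and advanceWatermark are hypothetical names, the early and late triggers are reduced to booleans meaning "fire on every element" or "no trigger", and periodic processing-time triggers are not modelled.

```java
/** Simplified simulation of the early / on-time / late phases; not Flink code. */
final class EarlyLateSketch {
    private final long triggerTime;           // stands in for triggerTime(window)
    private final boolean earlyEveryElement;  // true: early trigger fires on every element
    private final boolean lateEveryElement;   // true: late trigger fires on every element
    private long watermark = Long.MIN_VALUE;
    private boolean timerRegistered = false;  // event-time timer for the window end
    private boolean hasFired = false;         // set once the on-time firing has happened

    EarlyLateSketch(long triggerTime, boolean earlyEveryElement, boolean lateEveryElement) {
        this.triggerTime = triggerTime;
        this.earlyEveryElement = earlyEveryElement;
        this.lateEveryElement = lateEveryElement;
    }

    /** Returns true if the window result should be emitted for this element. */
    boolean onElement() {
        if (hasFired) {
            return lateEveryElement;          // late phase: delegate to the late trigger
        }
        if (triggerTime <= watermark) {
            return true;                      // late phase without a recorded on-time firing
        }
        timerRegistered = true;               // early phase: make sure the window-end timer exists
        return earlyEveryElement;
    }

    /** Advances the watermark; returns true if the on-time firing happens now. */
    boolean advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        if (!hasFired && timerRegistered && triggerTime <= watermark) {
            hasFired = true;                  // from now on the late trigger decides
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        EarlyLateSketch t = new EarlyLateSketch(9_999L, true, true);
        System.out.println("early element fires: " + t.onElement());
        System.out.println("on-time firing: " + t.advanceWatermark(10_000L));
        System.out.println("late element fires: " + t.onElement());
    }
}
```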
    

    WindowOperator

    The WindowOperator used by the Table runtime lives at flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/WindowOperator.java. It holds the logic Flink SQL uses to store and aggregate window data.

    WindowOperator has two subclasses: AggregateWindowOperator (one result per aggregation group) and TableAggregateWindowOperator (multiple results per aggregation group).

    When a record arrives, processElement is called. Its logic, with annotations:

    @Override
    public void processElement(StreamRecord<RowData> record) throws Exception {
        RowData inputRow = record.getValue();
        long timestamp;
        // Get the element's event time, or the current processing time
        if (windowAssigner.isEventTime()) {
            timestamp = inputRow.getLong(rowtimeIndex);
        } else {
            timestamp = internalTimerService.currentProcessingTime();
        }
    
        // Convert to a UTC timestamp
        timestamp = TimeWindowUtil.toUtcTimestampMills(timestamp, shiftTimeZone);
    
        // the windows which the input row should be placed into
        // Find the windows whose state this record affects.
        // For GeneralWindowProcessFunction and MergingWindowProcessFunction, assignStateNamespace and assignActualWindows return the same windows.
        // For PanedWindowProcessFunction, assignStateNamespace returns the sliced panes of the window.
        Collection<W> affectedWindows = windowFunction.assignStateNamespace(inputRow, timestamp);
        // Tracks whether the element is dropped
        boolean isElementDropped = true;
        for (W window : affectedWindows) {
            // The element was assigned to a window, so it is not dropped
            isElementDropped = false;
    
            windowState.setCurrentNamespace(window);
            // Get the current accumulator
            RowData acc = windowState.value();
            if (acc == null) {
                // No accumulator yet: create a fresh one
                acc = windowAggregator.createAccumulators();
            }
            windowAggregator.setAccumulators(window, acc);
    
            // Accumulate the new value, or retract it
            if (RowDataUtil.isAccumulateMsg(inputRow)) {
                windowAggregator.accumulate(inputRow);
            } else {
                windowAggregator.retract(inputRow);
            }
            acc = windowAggregator.getAccumulators();
            // Store the updated accumulator
            windowState.update(acc);
        }
    
        // the actual window which the input row is belongs to
        // Find the windows the record actually belongs to
        Collection<W> actualWindows = windowFunction.assignActualWindows(inputRow, timestamp);
        for (W window : actualWindows) {
            // The element was assigned to a window, so it is not dropped
            isElementDropped = false;
            triggerContext.window = window;
            // If the trigger fires on element arrival, emit the window's aggregation result
            boolean triggerResult = triggerContext.onElement(inputRow, timestamp);
            if (triggerResult) {
                emitWindowResult(window);
            }
            // register a clean up timer for the window
            registerCleanupTimer(window);
        }
    
        if (isElementDropped) {
            // markEvent will increase numLateRecordsDropped
            // Mark the dropped element; this feeds the dropped-records rate metric
            lateRecordsDroppedRate.markEvent();
        }
    }
    

    registerCleanupTimer registers a timer that cleans up the window.

    private void registerCleanupTimer(W window) {
        long cleanupTime = toEpochMillsForTimer(cleanupTime(window), shiftTimeZone);
        if (cleanupTime == Long.MAX_VALUE) {
            // don't set a GC timer for "end of time"
            return;
        }
    
        // Register a different kind of timer depending on the time attribute
        if (windowAssigner.isEventTime()) {
            triggerContext.registerEventTimeTimer(cleanupTime);
        } else {
            triggerContext.registerProcessingTimeTimer(cleanupTime);
        }
    }
    

    The firing time of the timer registered by registerCleanupTimer is determined by cleanupTime, which computes the point in time at which the window must be cleaned up.

    private long cleanupTime(W window) {
        if (windowAssigner.isEventTime()) {
            // With event time, the configured allowedLateness must be taken into account,
            // so window state is retained for an extra allowedLateness
            long cleanupTime = Math.max(0, window.maxTimestamp() + allowedLateness);
            return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
        } else {
            return Math.max(0, window.maxTimestamp());
        }
    }
    

    So for event time, the cleanup time of a window is normally window.maxTimestamp() + allowedLateness. This agrees with the description of table.exec.emit.allow-lateness above: window state is kept for an extra allow-lateness period after the window ends.
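
    The word "normally" matters because of the overflow guard in cleanupTime. The arithmetic can be reproduced in isolation with the following sketch; CleanupTimeSketch is a hypothetical class that copies the formula, with the window reduced to its maxTimestamp and the time attribute passed as a boolean.

```java
/** Stand-alone copy of the cleanupTime arithmetic; the class and parameter names are illustrative. */
final class CleanupTimeSketch {
    static long cleanupTime(long windowMaxTimestamp, long allowedLatenessMillis, boolean eventTime) {
        if (!eventTime) {
            // Processing time: no lateness, clean up right at the window end
            return Math.max(0, windowMaxTimestamp);
        }
        long cleanup = Math.max(0, windowMaxTimestamp + allowedLatenessMillis);
        // If the addition overflowed, cleanup ends up below maxTimestamp:
        // treat that as "end of time" so that no cleanup timer is registered.
        return cleanup >= windowMaxTimestamp ? cleanup : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        // Window [0, 10000) with 5 s of allowed lateness
        System.out.println(cleanupTime(9_999L, 5_000L, true));
        // A window reaching almost to Long.MAX_VALUE: the sum overflows
        System.out.println(cleanupTime(Long.MAX_VALUE - 1, 5_000L, true) == Long.MAX_VALUE);
        // Processing time ignores allowed lateness
        System.out.println(cleanupTime(9_999L, 5_000L, false));
    }
}
```

    In the first call the state is kept until timestamp 14999, that is, 5 seconds beyond the last timestamp of the window.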

    Next, the onEventTime method, which runs when an event-time timer fires:

    @Override
    public void onEventTime(InternalTimer<K, W> timer) throws Exception {
        setCurrentKey(timer.getKey());
    
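        // Get the window associated with the timer
        triggerContext.window = timer.getNamespace();
        if (triggerContext.onEventTime(timer.getTimestamp())) {
            // fire
            // Trigger the computation and emit the window's accumulated result
            emitWindowResult(triggerContext.window);
        }
    
        // Clean up the window state once the window's cleanup time has been reached.
        // The cleanup time is computed the same way as in cleanupTime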
        if (windowAssigner.isEventTime()) {
            windowFunction.cleanWindowIfNeeded(triggerContext.window, timer.getTimestamp());
        }
    }
    

    The onProcessingTime method is shown below. It mirrors onEventTime, so it is not discussed further.

    @Override
    public void onProcessingTime(InternalTimer<K, W> timer) throws Exception {
        setCurrentKey(timer.getKey());
    
        triggerContext.window = timer.getNamespace();
        if (triggerContext.onProcessingTime(timer.getTimestamp())) {
            // fire
            emitWindowResult(triggerContext.window);
        }
    
        if (!windowAssigner.isEventTime()) {
            windowFunction.cleanWindowIfNeeded(triggerContext.window, timer.getTimestamp());
        }
    }
    

    AggregateWindowOperator: generating retract data

    emitWindowResult is an abstract method of WindowOperator. The logic that generates retract data lives in its subclass AggregateWindowOperator, whose code is analysed next.

    @Override
    protected void emitWindowResult(W window) throws Exception {
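        // Get the aggregation result of the window
        windowFunction.prepareAggregateAccumulatorForEmit(window);
        RowData acc = aggWindowAggregator.getAccumulators();
        RowData aggResult = aggWindowAggregator.getValue(window);
        // If update (changelog) rows should be produced
        if (produceUpdates) {
            // Get the previously emitted result
            previousState.setCurrentNamespace(window);
            RowData previousAggResult = previousState.value();
    
            // recordCounter is initialised in the WindowOperator constructor from inputCountIndex.
            // inputCountIndex is the position of COUNT(*) in the SQL aggregate, or -1 if there is no COUNT(*);
            // see the documentation of RecordCounter.of.
            // recordCounter.recordCountIsZero reports whether the COUNT(*) field of the given RowData is 0.
            // If it is not 0: the first emission is an INSERT; later emissions are an UPDATE_BEFORE/UPDATE_AFTER
            // pair when the new result differs from the previous one, and nothing when they are equal.
            // If it is 0: a DELETE retraction is emitted.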
            if (!recordCounter.recordCountIsZero(acc)) {
                // has emitted result for the window
                if (previousAggResult != null) {
                    // current agg is not equal to the previous emitted, should emit retract
                    // Results were emitted before and the new result differs from them,
                    // so a retraction has to be generated
                    if (!equaliser.equals(aggResult, previousAggResult)) {
                        // send UPDATE_BEFORE
                        collect(
                                RowKind.UPDATE_BEFORE,
                                (RowData) getCurrentKey(),
                                previousAggResult);
                        // send UPDATE_AFTER
                        collect(RowKind.UPDATE_AFTER, (RowData) getCurrentKey(), aggResult);
                        // update previousState
                        previousState.update(aggResult);
                    }
                    // if the previous agg equals to the current agg, no need to send retract and
                    // accumulate
                }
                // the first fire for the window, only send INSERT
                else {
                    // send INSERT
                    collect(RowKind.INSERT, (RowData) getCurrentKey(), aggResult);
                    // update previousState
                    previousState.update(aggResult);
                }
            } else {
                // Emit a DELETE retraction
                // has emitted result for the window
                // we retracted the last record for this key
                if (previousAggResult != null) {
                    // send DELETE
                    collect(RowKind.DELETE, (RowData) getCurrentKey(), previousAggResult);
                    // clear previousState
                    previousState.clear();
                }
                // if the counter is zero, no need to send accumulate
            }
        } else {
            // Update rows are not produced
            if (!recordCounter.recordCountIsZero(acc)) {
                // If the COUNT result is not 0, emit an INSERT row
                // send INSERT
                collect(RowKind.INSERT, (RowData) getCurrentKey(), aggResult);
            }
            // Otherwise nothing is emitted
            // if the counter is zero, no need to send accumulate
            // there is no possible skip `if` branch when `produceUpdates` is false
        }
    }
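
    The produceUpdates branch is easier to follow with concrete values. The sketch below replays the same decision tree on plain longs: EmitSketch is a hypothetical class, previous stands in for previousState, recordCount for the COUNT(*) field of the accumulator, and the row kinds are rendered with their short strings (+I, -U, +U, -D). It does not cover the case where produceUpdates is false.

```java
import java.util.ArrayList;
import java.util.List;

/** Replays the produceUpdates branch of emitWindowResult on plain values; not Flink code. */
final class EmitSketch {
    private Long previous;                        // stands in for previousState
    final List<String> out = new ArrayList<>();   // collected changelog rows

    void emit(long recordCount, long aggResult) {
        if (recordCount != 0) {
            if (previous != null) {
                if (previous.longValue() != aggResult) {
                    out.add("-U " + previous);    // UPDATE_BEFORE with the old value
                    out.add("+U " + aggResult);   // UPDATE_AFTER with the new value
                    previous = aggResult;
                }
                // equal results: nothing is emitted
            } else {
                out.add("+I " + aggResult);       // first firing of the window: INSERT
                previous = aggResult;
            }
        } else if (previous != null) {
            out.add("-D " + previous);            // everything was retracted: DELETE
            previous = null;
        }
    }

    public static void main(String[] args) {
        EmitSketch sketch = new EmitSketch();
        sketch.emit(1, 10);  // first firing
        sketch.emit(2, 10);  // result unchanged
        sketch.emit(3, 25);  // result changed
        sketch.emit(0, 0);   // all input rows retracted
        System.out.println(sketch.out);
    }
}
```

    The second firing emits nothing because the result is unchanged, which is why repeated early or late firings do not flood the downstream operator with identical rows.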
    

    This post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.
