A Look at Flink's Allowed Lateness

Author: go4it | Published 2019-01-08 19:54

    This article takes a look at Flink's Allowed Lateness.

    WindowedStream

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/WindowedStream.java

    @Public
    public class WindowedStream<T, K, W extends Window> {
    
        /** The keyed data stream that is windowed by this stream. */
        private final KeyedStream<T, K> input;
    
        /** The window assigner. */
        private final WindowAssigner<? super T, W> windowAssigner;
    
        /** The trigger that is used for window evaluation/emission. */
        private Trigger<? super T, ? super W> trigger;
    
        /** The evictor that is used for evicting elements before window evaluation. */
        private Evictor<? super T, ? super W> evictor;
    
        /** The user-specified allowed lateness. */
        private long allowedLateness = 0L;
    
        /**
         * Side output {@code OutputTag} for late data. If no tag is set late data will simply be
         * dropped.
         */
        private OutputTag<T> lateDataOutputTag;
    
        @PublicEvolving
        public WindowedStream<T, K, W> allowedLateness(Time lateness) {
            final long millis = lateness.toMilliseconds();
            checkArgument(millis >= 0, "The allowed lateness cannot be negative.");
    
            this.allowedLateness = millis;
            return this;
        }
    
        @PublicEvolving
        public WindowedStream<T, K, W> sideOutputLateData(OutputTag<T> outputTag) {
            Preconditions.checkNotNull(outputTag, "Side output tag must not be null.");
            this.lateDataOutputTag = input.getExecutionEnvironment().clean(outputTag);
            return this;
        }
    
        //......
    
        public <R> SingleOutputStreamOperator<R> reduce(
                ReduceFunction<T> reduceFunction,
                WindowFunction<T, R, K, W> function,
                TypeInformation<R> resultType) {
    
            if (reduceFunction instanceof RichFunction) {
                throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction.");
            }
    
            //clean the closures
            function = input.getExecutionEnvironment().clean(function);
            reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
    
            final String opName = generateOperatorName(windowAssigner, trigger, evictor, reduceFunction, function);
            KeySelector<T, K> keySel = input.getKeySelector();
    
            OneInputStreamOperator<T, R> operator;
    
            if (evictor != null) {
                @SuppressWarnings({"unchecked", "rawtypes"})
                TypeSerializer<StreamRecord<T>> streamRecordSerializer =
                    (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
    
                ListStateDescriptor<StreamRecord<T>> stateDesc =
                    new ListStateDescriptor<>("window-contents", streamRecordSerializer);
    
                operator =
                    new EvictingWindowOperator<>(windowAssigner,
                        windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                        keySel,
                        input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                        stateDesc,
                        new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction<>(reduceFunction, function)),
                        trigger,
                        evictor,
                        allowedLateness,
                        lateDataOutputTag);
    
            } else {
                ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor<>("window-contents",
                    reduceFunction,
                    input.getType().createSerializer(getExecutionEnvironment().getConfig()));
    
                operator =
                    new WindowOperator<>(windowAssigner,
                        windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                        keySel,
                        input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                        stateDesc,
                        new InternalSingleValueWindowFunction<>(function),
                        trigger,
                        allowedLateness,
                        lateDataOutputTag);
            }
    
            return input.transform(opName, resultType, operator);
        }
    
        //......
    }
    
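    • WindowedStream has two fields related to allowed lateness: allowedLateness, which specifies how long (in milliseconds) elements may arrive late, and lateDataOutputTag, which configures the side output that receives late elements
    • Window operations on WindowedStream such as reduce, aggregate, fold, and process create a different operator depending on whether evictor is null: an EvictingWindowOperator when an evictor is set, otherwise a WindowOperator
    • EvictingWindowOperator extends WindowOperator and its constructor takes an extra Evictor argument; both constructors take the Trigger, allowedLateness, and lateDataOutputTag arguments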
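    To make the two settings and the operator choice concrete, here is a minimal plain-Java sketch that mirrors the excerpt without depending on Flink. The class WindowedStreamSketch and its methods are hypothetical stand-ins, not Flink's API (the tag is reduced to a String id and the evictor to a plain Object); only the checks themselves (non-negative lateness, non-null tag, evictor-based operator choice) follow the source shown above.

```java
import java.util.Objects;

/** Hypothetical, Flink-free model of the lateness-related settings in WindowedStream. */
class WindowedStreamSketch {
    private long allowedLateness = 0L;   // default, as in WindowedStream
    private String lateDataOutputTag;    // null: late data is simply dropped
    private final Object evictor;        // stand-in for an Evictor; may be null

    WindowedStreamSketch(Object evictor) {
        this.evictor = evictor;
    }

    /** Like allowedLateness(Time): rejects negative values and stores milliseconds. */
    WindowedStreamSketch allowedLateness(long millis) {
        if (millis < 0) {
            throw new IllegalArgumentException("The allowed lateness cannot be negative.");
        }
        this.allowedLateness = millis;
        return this;
    }

    /** Like sideOutputLateData(OutputTag): the tag must not be null. */
    WindowedStreamSketch sideOutputLateData(String tagId) {
        this.lateDataOutputTag = Objects.requireNonNull(tagId, "Side output tag must not be null.");
        return this;
    }

    /** Like the branch in reduce(): a non-null evictor selects EvictingWindowOperator. */
    String operatorKind() {
        return evictor != null ? "EvictingWindowOperator" : "WindowOperator";
    }

    long allowedLatenessMillis() { return allowedLateness; }

    String lateDataOutputTag() { return lateDataOutputTag; }

    public static void main(String[] args) {
        WindowedStreamSketch s = new WindowedStreamSketch(null)
                .allowedLateness(5_000L)
                .sideOutputLateData("late-data");
        // prints "WindowOperator, lateness=5000, tag=late-data"
        System.out.println(s.operatorKind() + ", lateness=" + s.allowedLatenessMillis()
                + ", tag=" + s.lateDataOutputTag());
    }
}
```

    In real Flink code the same chain reads .window(...).allowedLateness(Time.seconds(5)).sideOutputLateData(lateTag) before calling reduce, aggregate, or process.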

    OutputTag

    flink-core-1.7.0-sources.jar!/org/apache/flink/util/OutputTag.java

    @PublicEvolving
    public class OutputTag<T> implements Serializable {
    
        private static final long serialVersionUID = 2L;
    
        private final String id;
    
        private final TypeInformation<T> typeInfo;
    
        public OutputTag(String id) {
            Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
            Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
            this.id = id;
    
            try {
                this.typeInfo = TypeExtractor.createTypeInfo(this, OutputTag.class, getClass(), 0);
            }
            catch (InvalidTypesException e) {
                throw new InvalidTypesException("Could not determine TypeInformation for the OutputTag type. " +
                        "The most common reason is forgetting to make the OutputTag an anonymous inner class. " +
                        "It is also not possible to use generic type variables with OutputTags, such as 'Tuple2<A, B>'.", e);
            }
        }
    
        public OutputTag(String id, TypeInformation<T> typeInfo) {
            Preconditions.checkNotNull(id, "OutputTag id cannot be null.");
            Preconditions.checkArgument(!id.isEmpty(), "OutputTag id must not be empty.");
            this.id = id;
            this.typeInfo = Preconditions.checkNotNull(typeInfo, "TypeInformation cannot be null.");
        }
    
        // ------------------------------------------------------------------------
    
        public String getId() {
            return id;
        }
    
        public TypeInformation<T> getTypeInfo() {
            return typeInfo;
        }
    
        // ------------------------------------------------------------------------
    
        @Override
        public boolean equals(Object obj) {
            return obj instanceof OutputTag
                && ((OutputTag) obj).id.equals(this.id);
        }
    
        @Override
        public int hashCode() {
            return id.hashCode();
        }
    
        @Override
        public String toString() {
            return "OutputTag(" + getTypeInfo() + ", " + id + ")";
        }
    }
    
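    • OutputTag is a side output identifier that carries a name (id) and type information; its equals and hashCode only consider the id. Flink lets ProcessFunction, CoProcessFunction, ProcessWindowFunction, and ProcessAllWindowFunction emit side outputs: the Context of these functions has an output(OutputTag<X> outputTag, X value) method for emitting elements to a side output. When using the single-argument constructor, create the tag as an anonymous subclass (for example new OutputTag<String>("late-data"){}) so that its type can be extracted, as the constructor's error message points out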
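    The identity rule (only the id matters) and the Context.output(tag, value) pattern can be illustrated with a small Flink-free sketch. SideOutputSketch, its nested Tag class, and the collected method are hypothetical names; Tag only imitates OutputTag's constructor checks and equals/hashCode, and output stands in for the Context method of the same name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical, Flink-free model of OutputTag identity and Context.output(...). */
class SideOutputSketch {

    /** Stand-in for OutputTag: identity is the id, the type only rides along. */
    static final class Tag<T> {
        final String id;
        final Class<T> type;

        Tag(String id, Class<T> type) {
            this.id = Objects.requireNonNull(id, "Tag id cannot be null.");
            if (id.isEmpty()) {
                throw new IllegalArgumentException("Tag id must not be empty.");
            }
            this.type = type;
        }

        @Override
        public boolean equals(Object obj) {
            // Like OutputTag.equals: only the id is compared, not the type.
            return obj instanceof Tag && ((Tag<?>) obj).id.equals(this.id);
        }

        @Override
        public int hashCode() {
            return id.hashCode();
        }
    }

    private final Map<Tag<?>, List<Object>> sideOutputs = new HashMap<>();

    /** Stand-in for Context.output(OutputTag<X>, X): appends the value to that side output. */
    <X> void output(Tag<X> tag, X value) {
        sideOutputs.computeIfAbsent(tag, t -> new ArrayList<>()).add(value);
    }

    List<Object> collected(Tag<?> tag) {
        return sideOutputs.getOrDefault(tag, List.of());
    }

    public static void main(String[] args) {
        SideOutputSketch ctx = new SideOutputSketch();
        Tag<String> late = new Tag<>("late-data", String.class);
        ctx.output(late, "event-1");
        ctx.output(late, "event-2");
        // A different Tag instance with the same id addresses the same side output.
        System.out.println(ctx.collected(new Tag<>("late-data", String.class)));
        // prints "[event-1, event-2]"
    }
}
```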

    SingleOutputStreamOperator

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java

    @Public
    public class SingleOutputStreamOperator<T> extends DataStream<T> {
    
        protected boolean nonParallel = false;
    
        private Map<OutputTag<?>, TypeInformation> requestedSideOutputs = new HashMap<>();
    
        private boolean wasSplitApplied = false;
    
        //......
    
        public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
            if (wasSplitApplied) {
                throw new UnsupportedOperationException("getSideOutput() and split() may not be called on the same DataStream. " +
                    "As a work-around, please add a no-op map function before the split() call.");
            }
    
            sideOutputTag = clean(requireNonNull(sideOutputTag));
    
            // make a defensive copy
            sideOutputTag = new OutputTag<X>(sideOutputTag.getId(), sideOutputTag.getTypeInfo());
    
            TypeInformation<?> type = requestedSideOutputs.get(sideOutputTag);
            if (type != null && !type.equals(sideOutputTag.getTypeInfo())) {
                throw new UnsupportedOperationException("A side output with a matching id was " +
                        "already requested with a different type. This is not allowed, side output " +
                        "ids need to be unique.");
            }
    
            requestedSideOutputs.put(sideOutputTag, sideOutputTag.getTypeInfo());
    
            SideOutputTransformation<X> sideOutputTransformation = new SideOutputTransformation<>(this.getTransformation(), sideOutputTag);
            return new DataStream<>(this.getExecutionEnvironment(), sideOutputTransformation);
        }
    }
    
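    • SingleOutputStreamOperator provides the getSideOutput method, which returns, for a given OutputTag, the side output that a function emitted earlier. Requesting the same id twice with different types throws an UnsupportedOperationException, and getSideOutput cannot be combined with split() on the same DataStream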
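    Because OutputTag equality only looks at the id, getSideOutput keeps track of the type first requested for each id and rejects a later request that reuses the id with another type. A hypothetical Flink-free sketch of just that check, with the type reduced to a Class object (SideOutputRequests and request are illustrative names):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, Flink-free model of the uniqueness check in getSideOutput(). */
class SideOutputRequests {
    // Keyed by id, because OutputTag.equals/hashCode only look at the id.
    private final Map<String, Class<?>> requested = new HashMap<>();

    /** Records a request; fails if the same id was already requested with another type. */
    void request(String id, Class<?> type) {
        Class<?> previous = requested.get(id);
        if (previous != null && !previous.equals(type)) {
            throw new UnsupportedOperationException("A side output with a matching id was "
                    + "already requested with a different type. Side output ids need to be unique.");
        }
        requested.put(id, type);
    }

    public static void main(String[] args) {
        SideOutputRequests r = new SideOutputRequests();
        r.request("late-data", String.class);
        r.request("late-data", String.class);     // same id, same type: allowed
        try {
            r.request("late-data", Long.class);   // same id, different type: rejected
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

    In a real job the late elements are read back with result.getSideOutput(lateTag), where result is the SingleOutputStreamOperator returned by the window operation and lateTag is the same tag passed to sideOutputLateData.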

    WindowOperator

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java

    @Internal
    public class WindowOperator<K, IN, ACC, OUT, W extends Window>
        extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {
    
        //......
    
        public void processElement(StreamRecord<IN> element) throws Exception {
            final Collection<W> elementWindows = windowAssigner.assignWindows(
                element.getValue(), element.getTimestamp(), windowAssignerContext);
    
            //if element is handled by none of assigned elementWindows
            boolean isSkippedElement = true;
    
            final K key = this.<K>getKeyedStateBackend().getCurrentKey();
    
            if (windowAssigner instanceof MergingWindowAssigner) {
                MergingWindowSet<W> mergingWindows = getMergingWindowSet();
    
                for (W window: elementWindows) {
    
                    // adding the new window might result in a merge, in that case the actualWindow
                    // is the merged window and we work with that. If we don't merge then
                    // actualWindow == window
                    W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
                        @Override
                        public void merge(W mergeResult,
                                Collection<W> mergedWindows, W stateWindowResult,
                                Collection<W> mergedStateWindows) throws Exception {
    
                            if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
                                throw new UnsupportedOperationException("The end timestamp of an " +
                                        "event-time window cannot become earlier than the current watermark " +
                                        "by merging. Current watermark: " + internalTimerService.currentWatermark() +
                                        " window: " + mergeResult);
                            } else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
                                throw new UnsupportedOperationException("The end timestamp of a " +
                                        "processing-time window cannot become earlier than the current processing time " +
                                        "by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
                                        " window: " + mergeResult);
                            }
    
                            triggerContext.key = key;
                            triggerContext.window = mergeResult;
    
                            triggerContext.onMerge(mergedWindows);
    
                            for (W m: mergedWindows) {
                                triggerContext.window = m;
                                triggerContext.clear();
                                deleteCleanupTimer(m);
                            }
    
                            // merge the merged state windows into the newly resulting state window
                            windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
                        }
                    });
    
                    // drop if the window is already late
                    if (isWindowLate(actualWindow)) {
                        mergingWindows.retireWindow(actualWindow);
                        continue;
                    }
                    isSkippedElement = false;
    
                    W stateWindow = mergingWindows.getStateWindow(actualWindow);
                    if (stateWindow == null) {
                        throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
                    }
    
                    windowState.setCurrentNamespace(stateWindow);
                    windowState.add(element.getValue());
    
                    triggerContext.key = key;
                    triggerContext.window = actualWindow;
    
                    TriggerResult triggerResult = triggerContext.onElement(element);
    
                    if (triggerResult.isFire()) {
                        ACC contents = windowState.get();
                        if (contents == null) {
                            continue;
                        }
                        emitWindowContents(actualWindow, contents);
                    }
    
                    if (triggerResult.isPurge()) {
                        windowState.clear();
                    }
                    registerCleanupTimer(actualWindow);
                }
    
                // need to make sure to update the merging state in state
                mergingWindows.persist();
            } else {
                for (W window: elementWindows) {
    
                    // drop if the window is already late
                    if (isWindowLate(window)) {
                        continue;
                    }
                    isSkippedElement = false;
    
                    windowState.setCurrentNamespace(window);
                    windowState.add(element.getValue());
    
                    triggerContext.key = key;
                    triggerContext.window = window;
    
                    TriggerResult triggerResult = triggerContext.onElement(element);
    
                    if (triggerResult.isFire()) {
                        ACC contents = windowState.get();
                        if (contents == null) {
                            continue;
                        }
                        emitWindowContents(window, contents);
                    }
    
                    if (triggerResult.isPurge()) {
                        windowState.clear();
                    }
                    registerCleanupTimer(window);
                }
            }
    
            // side output input event if
            // element not handled by any window
            // late arriving tag has been set
            // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
            if (isSkippedElement && isElementLate(element)) {
                if (lateDataOutputTag != null){
                    sideOutput(element);
                } else {
                    this.numLateRecordsDropped.inc();
                }
            }
        }
    
        protected boolean isElementLate(StreamRecord<IN> element){
            return (windowAssigner.isEventTime()) &&
                (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
        }
    
        private long cleanupTime(W window) {
            if (windowAssigner.isEventTime()) {
                long cleanupTime = window.maxTimestamp() + allowedLateness;
                return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
            } else {
                return window.maxTimestamp();
            }
        }
    
        //......
    }
    
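    • WindowOperator has an isElementLate method that uses allowedLateness to decide whether an element is late: for event-time windows, an element is late when its timestamp + allowedLateness <= the current watermark. At the end of processElement, if isSkippedElement is true (every window assigned to the element was already late) and isElementLate is also true, the element is emitted to the side output when lateDataOutputTag is not null; otherwise numLateRecordsDropped.inc() increments the numLateRecordsDropped counter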
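    The decision at the end of processElement can be traced with a small Flink-free sketch for tumbling event-time windows. Assumptions, all hypothetical simplifications: each element falls into exactly one window [start, start + size); isWindowLate, which is not part of the excerpt, is modeled as cleanupTime(window) <= current watermark; triggers and window state are left out. LateElementRouter and Outcome are illustrative names.

```java
/** Hypothetical, Flink-free trace of the end of WindowOperator.processElement. */
class LateElementRouter {
    enum Outcome { ADDED_TO_WINDOW, SIDE_OUTPUT, DROPPED }

    private final long windowSize;
    private final long allowedLateness;
    private final boolean hasLateDataOutputTag;  // stands in for lateDataOutputTag != null
    long numLateRecordsDropped = 0;              // stands in for the numLateRecordsDropped metric

    LateElementRouter(long windowSize, long allowedLateness, boolean hasLateDataOutputTag) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
        this.hasLateDataOutputTag = hasLateDataOutputTag;
    }

    /** Same arithmetic as cleanupTime() for event-time windows. */
    private long cleanupTime(long windowMaxTimestamp) {
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        return cleanupTime >= windowMaxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    Outcome process(long timestamp, long currentWatermark) {
        // Tumbling window [start, start + size); maxTimestamp() is end - 1.
        long start = timestamp - Math.floorMod(timestamp, windowSize);
        long maxTimestamp = start + windowSize - 1;

        // Assumed isWindowLate(): the watermark has reached the window's cleanup time.
        boolean isSkippedElement = cleanupTime(maxTimestamp) <= currentWatermark;
        if (!isSkippedElement) {
            return Outcome.ADDED_TO_WINDOW;
        }

        // isElementLate(): timestamp + allowedLateness <= current watermark.
        if (timestamp + allowedLateness <= currentWatermark) {
            if (hasLateDataOutputTag) {
                return Outcome.SIDE_OUTPUT;
            }
            numLateRecordsDropped++;
        }
        // Skipped elements that reach this point end up neither in a window nor in a side output.
        return Outcome.DROPPED;
    }

    public static void main(String[] args) {
        // 10 s tumbling windows, 5 s allowed lateness, no late-data side output.
        LateElementRouter router = new LateElementRouter(10_000L, 5_000L, false);
        System.out.println(router.process(9_000L, 12_000L)); // ADDED_TO_WINDOW: 14999 > 12000
        System.out.println(router.process(9_000L, 15_000L)); // DROPPED: 14999 <= 15000
        System.out.println(router.numLateRecordsDropped);    // 1
    }
}
```

    Note that the drop decision is driven by the window's cleanup time, not by the element alone: with a watermark of 14500 the element at 9000 already satisfies isElementLate, yet it is still added because its window [0, 10000) is not late until the watermark reaches 14999.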

    Summary

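    • For event-time windows, Flink provides the allowedLateness method to configure how late elements may arrive. Elements that arrive before the watermark reaches the window's end timestamp plus the allowed lateness are still added to the window; later ones are dropped (or sent to the side output, if one is configured). The default is 0. For window assigners such as GlobalWindows, whose end timestamp is Long.MAX_VALUE, elements are effectively never late
    • OutputTag is a side output identifier carrying a name and type information; Flink lets ProcessFunction, CoProcessFunction, ProcessWindowFunction, and ProcessAllWindowFunction emit side outputs, and the Context of these functions has an output(OutputTag<X> outputTag, X value) method for emitting elements to a side output
    • SingleOutputStreamOperator provides the getSideOutput method to retrieve, by OutputTag, the side output that a function emitted earlier. At the end of WindowOperator.processElement, if both isSkippedElement and isElementLate are true, the late element is emitted to the side output when lateDataOutputTag is not null, and is otherwise counted in numLateRecordsDropped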
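    The boundary cases in the first point follow from the cleanupTime arithmetic in the WindowOperator excerpt. The plain-Java sketch below (CleanupTimeExample is a hypothetical name) evaluates it for a 10-second window [0, 10000) and for a window whose end is Long.MAX_VALUE, as with GlobalWindows. With the default lateness of 0 the window stops accepting elements as soon as the watermark reaches its max timestamp; for the global window the overflow guard pins the cleanup time at Long.MAX_VALUE instead of wrapping around to a negative value.

```java
/** Hypothetical worked example of the cleanupTime() arithmetic from WindowOperator. */
class CleanupTimeExample {
    /** Same computation as cleanupTime() for event-time windows. */
    static long cleanupTime(long windowMaxTimestamp, long allowedLateness) {
        long cleanupTime = windowMaxTimestamp + allowedLateness;
        // If the addition overflowed, fall back to Long.MAX_VALUE.
        return cleanupTime >= windowMaxTimestamp ? cleanupTime : Long.MAX_VALUE;
    }

    public static void main(String[] args) {
        long tumblingMax = 9_999L;         // time window [0, 10000): maxTimestamp() = end - 1
        long globalMax = Long.MAX_VALUE;   // end timestamp of a global window

        System.out.println(cleanupTime(tumblingMax, 0L));      // 9999: default lateness
        System.out.println(cleanupTime(tumblingMax, 5_000L));  // 14999: kept 5 s longer
        System.out.println(cleanupTime(globalMax, 5_000L) == Long.MAX_VALUE); // true
    }
}
```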

    Article link: https://www.haomeiwen.com/subject/xpgbrqtx.html