美文网首页Flink入门到精通
Flink window窗口机制探究--以tumbling wi

Flink window窗口机制探究--以tumbling wi

作者: 卧雪听月 | 来源:发表于2019-06-25 20:59 被阅读0次

    零、序言

    本篇文章探究Flink Window窗口机制,首先介绍窗口机制使用的总纲,涉及的所有组件进行介绍,心中有一个大体的蓝图和认识。之后基于keyBy方法返回的Keyed Window入手,分析window方法,并依次进行WindowAssigner、Trigger类介绍。篇幅所限,计划在其他文章中继续介绍evictor、reduce/aggregate等聚合方法,以及allowedLateness方法等使用。

    一、背景&目标

    为了实现项目场景中自定义窗口功能,还是要先把目前Flink提供的窗口机制剖析一下,先从简单好理解的入手,以tumbling windows为例,sliding windows思路也相似。

    二、窗口机制

    考虑keyed Windows ,从官网介绍可以通过窗口机制整个使用方法,提纲挈领了解所涉及的组件,虽然有些组件使用的是时候是可以使用默认而不用指定。

    stream
           .keyBy(...)               <-  keyed versus non-keyed windows
           .window(...)              <-  required: "assigner"
          [.trigger(...)]            <-  optional: "trigger" (else default trigger)
          [.evictor(...)]            <-  optional: "evictor" (else no evictor)
          [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
          [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
           .reduce/aggregate/fold/apply()      <-  required: "function"
          [.getSideOutput(...)]      <-  optional: "output tag"
    
    组件 作用
    keyBy 数据流按照key分流
    window 需要传入WindowAssigner类,用来进行Event元素时间窗口分配。滚动窗口和session窗口一个Event对应一个时间窗口,滑动窗口一个Eevent可以对应多个时间窗口。
    trigger 用来决定触发针对特定时间窗口进行运算的window function执行。
    evictor 用来在trigger触发后、window function执行之前进行event过滤。
    allowedLatteness 允许event延迟时间。
    sideOutputLateData 设置迟到的event 标签
    getSideOutput 获取迟到event
    reduce/aggregate/fold/apply window function窗口计算函数,对时间窗口中的event 元素进行计算。

    先从最基础的WindowAssigner类开始,本篇重点以tumbling windows为例。

    三、探究剖析--溯源

    Flink1.7.2版本Java代码。

    一切都是始于stream.keyBy().window()
    stream.keyBy()返回的是一个DataStream类子类:KeyedStream类对象

    DataStream类的相关知识已经在通过Flink 程序模板来学习StreamExecutionEnvironment 、DataStream 、StreamTransformation类文章中探究过了。

    来看看KeyedStream类对象中:window()

    public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
            return new WindowedStream<>(this, assigner);
        }
    

    返回的是一个WindowedStream类:

    public class WindowedStream<T, K, W extends Window> {
    
        /** The keyed data stream that is windowed by this stream. */
        private final KeyedStream<T, K> input;
    
        /** The window assigner. */
        private final WindowAssigner<? super T, W> windowAssigner;
    
        /** The trigger that is used for window evaluation/emission. */
        private Trigger<? super T, ? super W> trigger;
    
        /** The evictor that is used for evicting elements before window evaluation. */
        private Evictor<? super T, ? super W> evictor;
    
        /** The user-specified allowed lateness. */
        private long allowedLateness = 0L;
        // 其他省略
        public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
            function = input.getExecutionEnvironment().clean(function);
            return apply(new InternalIterableWindowFunction<>(function), resultType, function);
        }
            
    

    第一反应,WindowedStream虽然名字叫WindowedStream但是他不是DataStream类(虽然源代码中也在datastream包中)!但是呢,他提供了一系列计算操作function,返回的可都是DataStream类的子类:SingleOutputStreamOperator类。

    之后呢,看到了WindowedStream的成员和方法,可以看到窗口机制的组件 windowAssigner、trigger、evicto和窗口计算函数都在,开心啊,按图索骥即可!

    根据实际运行时的dataflow来看,最终Flink拓扑会被转换为一个有一个包含算子的处理结构。Flink怎么把窗口机制所有的组件都调动起来呢?通过观察窗口计算函数返回值都是DataStream类,整个拓扑就串起来了,对应有就有相应的Transformation,也就有相应的operator(StreamOperator真正在底层处理一个一个元素的操作类)。WindowStream 的apply方法对应调用一个private apply方法:

        private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {
    
            final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
            KeySelector<T, K> keySel = input.getKeySelector();
    
            WindowOperator<K, T, Iterable<T>, R, W> operator;
    
            if (evictor != null) {
                @SuppressWarnings({"unchecked", "rawtypes"})
                TypeSerializer<StreamRecord<T>> streamRecordSerializer =
                        (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));
    
                ListStateDescriptor<StreamRecord<T>> stateDesc =
                        new ListStateDescriptor<>("window-contents", streamRecordSerializer);
    
                operator =
                    new EvictingWindowOperator<>(windowAssigner,
                        windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                        keySel,
                        input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                        stateDesc,
                        function,
                        trigger,
                        evictor,
                        allowedLateness,
                        lateDataOutputTag);
    
            } else {
                ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
                    input.getType().createSerializer(getExecutionEnvironment().getConfig()));
    
                operator =
                    new WindowOperator<>(windowAssigner,
                        windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                        keySel,
                        input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                        stateDesc,
                        function,
                        trigger,
                        allowedLateness,
                        lateDataOutputTag);
            }
    
            return input.transform(opName, resultType, operator);
        }
    

    全程都在构建operator啊! 最后通过 return input.transform(opName, resultType, operator); 也就是说还是串在 KeyedStream上的哦!所以说WindowStream看似是一个 Stream其实只是为了构建Window机制而提供的API,到真正Flink 运行的时候,所有在KeyedStream定义的时间窗口,最终都会因为window function的调用返回一个DataStream,一个新的 Transformation被创建,窗口中的各种组件 windowAssigner 、trigger、evictor都会被打包在EvictingWindowOperator或者WindowOperator传给这个Transformation,Transformation 为王啊!

    我们来看看Transformation的operator对象,以WindowOperator类(AbstractUdfStreamOperator的子类)为例,看看他的processElement方法,代码很长100多行,先看骨架:

        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
    
            final Collection<W> elementWindows = windowAssigner.assignWindows(
                element.getValue(), element.getTimestamp(), windowAssignerContext);
    
            //if element is handled by none of assigned elementWindows
            boolean isSkippedElement = true;
    
            final K key = this.<K>getKeyedStateBackend().getCurrentKey();
    
            if (windowAssigner instanceof MergingWindowAssigner) {
            // 代码块1,windowAssigner可以merge
            } else {
            // 代码块2,windowAssigner不可以merge
            }
            // side output input event if
            // element not handled by any window
            // late arriving tag has been set
            // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
            if (isSkippedElement && isElementLate(element)) {
                if (lateDataOutputTag != null){
                    sideOutput(element);
                } else {
                    this.numLateRecordsDropped.inc();
                }
            }
        }
    
    

    第一步,调用windowAssigner.assignWindows给当前event element分配Window。
    第二步,核心,会针对第一步window集合,一个window一个window一次处理,调用trigger.onElement方法,如果fire的话就会调用window function进行计算。
    第三部,处理side ouput,迟到的元素。

    四、探究剖析--WindowAssigner类

    Flink源码中,WindowAssigner类对应滚动窗口的类有TumblingEventTimeWindows和TumblingProcessingTimeWindows,我们以TumblingEventTimeWindows为例,二者区别主要是窗口时间使用Event Time还是Process Time。

    先看一下WindowAssigner类源码,可以看出主要包含四个抽象方法。

    /**
     *  WindowAssigner可以分配 0个或者多个 Windows 给 Event 元素.
     * @param <T> Event 元素类别.
     * @param <W> Window类别.
     */
    public abstract class WindowAssigner<T, W extends Window> implements Serializable {
        private static final long serialVersionUID = 1L;
    
        /**
         * 返回Event  element 应该被分配的 window的集合
         * @param timestamp :event 的时间戳.
         */
        public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
        /**
            * 返回WindowAssigner默认的trigger
         */
        public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
        /**
            * 返回Window 的 TypeSerializer
         */
        public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
           /**
            * 是否是 event time
         */
        public abstract boolean isEventTime();
          /**
            *其他省略
         */
    }
    
    
    方法名称 方法用途
    assignWindows 返回Event element 应该被分配的 window的集合
    getDefaultTrigger 返回WindowAssigner默认的trigger
    getWindowSerializer 返回Window 的 TypeSerializer
    isEventTime 是否是 event time

    接下来 我们来看 继承WindowAssigner类 的 TumblingEventTimeWindows类的具体实现。

    /**
     *   示例:keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1)));
     */
    @PublicEvolving
    public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
      // 窗口大小
        private final long size;
      // 偏移量
        private final long offset;
    
        protected TumblingEventTimeWindows(long size, long offset) {
            if (offset < 0 || offset >= size) {
                throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
            }
            this.size = size;
            this.offset = offset;
        }
    
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            if (timestamp > Long.MIN_VALUE) {
                            // 根据滚动窗口机制,按照当前timestamp,计算对应窗口的start时间,并返回对应窗口                
                long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
                return Collections.singletonList(new TimeWindow(start, start + size));
            } else {
                throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                        "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                        "'DataStream.assignTimestampsAndWatermarks(...)'?");
            }
        }
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
    // 其他省略
        @Override
        public boolean isEventTime() {
            return true;
        }
    }
    
    

    我们可以看到,主要区别就是包含了size 和 offset两个变量,使得 assignWindows 实现的时候可以返回想要的翻滚窗口。另外上面代码可以看出getDefaultTrigger返回的是 EventTimeTrigger类。

    接下来我们看一下Trigger类。

    Trigger方法名称 方法用途
    onElement 每当有event element 被加到window中,会触发。结果返回事件元素对应的window是否可以进行window function计算。
    onProcessingTime timer 计时器触发调用,使用的是process time 。
    onEventTime 同上,使用的是 event time。
    canMerge 是否支持 窗口合并,如果返回true,必须实现onMerge方法
    onMerge 当多个window被WindowAssigner合并的时候的调用。
    clear 清理相关window 的state
    TriggerContext TriggerContext接口,给Trigger提供state 处理和注册Timer callback
    OnMergeContext TriggerContext子接口,onMerge方法使用,增加了mergePartitionedState方法。

    单独整理TriggerContext 接口方法

    TriggerContext方法名称 方法用途
    getCurrentProcessingTime 返回当前processing time
    getMetricGroup 返回MetricGroup类对象
    getCurrentWatermark 返回当前Watermark time
    registerProcessingTimeTimer 注册time callback ,一旦到达time,Trigger的onProcessingTime会被调用
    registerEventTimeTimer 同上,当watermark 达到time,会触发Trigger的onEventTime方法。
    deleteProcessingTimeTimer 删除指定时间的processing time trigger
    deleteEventTimeTimer 删除指定时间的event time trigger
    getPartitionedState 返回 State对象
    getKeyValueState 返回ValueState对象
    /**
     * @param <T> Event 元素类别.
     * @param <W> Window类别.
     */
    @PublicEvolving
    public abstract class Trigger<T, W extends Window> implements Serializable {
    
        private static final long serialVersionUID = -4104633972991191369L;
    
        public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
    
        public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
    
        public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
    
        public boolean canMerge() {
            return false;
        }
    
        public void onMerge(W window, OnMergeContext ctx) throws Exception {
            throw new UnsupportedOperationException("This trigger does not support merging.");
        }
    
        public abstract void clear(W window, TriggerContext ctx) throws Exception;
    
        // ------------------------------------------------------------------------
    
        /**
         * A context object that is given to {@link Trigger} methods to allow them to register timer
         * callbacks and deal with state.
         */
        public interface TriggerContext {
    
            long getCurrentProcessingTime();
    
            MetricGroup getMetricGroup();
    
            long getCurrentWatermark();
    
            void registerProcessingTimeTimer(long time);
    
            void registerEventTimeTimer(long time);
    
            void deleteProcessingTimeTimer(long time);
    
            void deleteEventTimeTimer(long time);
    
            <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
    
            <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
    
            <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
        }
    
        public interface OnMergeContext extends TriggerContext {
            <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
        }
    }
    

    接下来我们看继承Trigger的 EventTimeTrigger类的实现:

    /**
     * EventTime使用watermark,一旦 watermark 超过 the end of the window,EventTimeTrigger触发
     */
    @PublicEvolving
    public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        private EventTimeTrigger() {}
        /**
         * 判断window的最大时间戳是否小于目前的watermark,小于的话返回TriggerResult.FIRE,否则的话为这个window注册一个trimer,返回TriggerResult.CONTINUE。TriggerResult是个枚举类型
         */    
        @Override
        public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
            if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
                // if the watermark is already past the window fire immediately
                return TriggerResult.FIRE;
            } else {
                ctx.registerEventTimeTimer(window.maxTimestamp());
                return TriggerResult.CONTINUE;
            }
        }
    
        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
            return time == window.maxTimestamp() ?
                TriggerResult.FIRE :
                TriggerResult.CONTINUE;
        }
    
        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
            return TriggerResult.CONTINUE;
        }
    
        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            ctx.deleteEventTimeTimer(window.maxTimestamp());
        }
    
        @Override
        public boolean canMerge() {
            return true;
        }
    
        @Override
        public void onMerge(TimeWindow window,
                OnMergeContext ctx) {
            long windowMaxTimestamp = window.maxTimestamp();
            if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
                ctx.registerEventTimeTimer(windowMaxTimestamp);
            }
        }
    
        public static EventTimeTrigger create() {
            return new EventTimeTrigger();
        }
    }
    

    顺便看一下枚举类TriggerResult:
    TriggerResult决定window将会发生什么,如window function是否会调用,或者window是否被丢弃。当然如果window里面没有任何数据,什么都不会发生。

    TriggerResult 值 解释
    CONTINUE 对于window来说什么都不会发生
    FIRE_AND_PURGE 触发window function ,而且 purge
    FIRE 触发window function, window不会被purged
    PURGE window会被丢弃,里面所有的元素都被清理
    public enum TriggerResult {
    
        CONTINUE(false, false),
    
        FIRE_AND_PURGE(true, true),
    
        FIRE(true, false),
    
        PURGE(false, true);
    
        private final boolean fire;
        private final boolean purge;
    
        TriggerResult(boolean fire, boolean purge) {
            this.purge = purge;
            this.fire = fire;
        }
    
        public boolean isFire() {
            return fire;
        }
    
        public boolean isPurge() {
            return purge;
        }
    }
    

    相关文章

      网友评论

        本文标题:Flink window窗口机制探究--以tumbling wi

        本文链接:https://www.haomeiwen.com/subject/bekpfctx.html