Flink Source Code: Window


Author: AlienPaul | Published: 2019-12-02 17:11

    Flink source code analysis series: table of contents

    See the series index: Flink Source Code Analysis Series: Table of Contents

    Preface

    This article analyzes the window-related source code of Flink, covering:

    • Window类
    • GlobalWindow和TimeWindow
    • WindowAssigner
    • GlobalWindows
    • TumblingEventTimeWindows
    • SlidingEventTimeWindows
    • EventTimeSessionWindows
    • DynamicEventTimeSessionWindows
    • The processing-time versions of the windows above

    The Window class

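    Here, a window can be understood as a way of grouping elements.
    Window is an abstract class that defines a single method, maxTimestamp(), which returns the largest point in time (as a timestamp) that still belongs to the window.
    Window has two subclasses: GlobalWindow and TimeWindow. GlobalWindow is the global window; TimeWindow is a window that covers a time interval with a start and an end.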

    The code of Window is as follows:

    public abstract class Window {
    
        /**
         * Gets the largest timestamp that still belongs to this window.
         *
         * @return The largest timestamp that still belongs to this window.
         */
        public abstract long maxTimestamp();
    }
    

    GlobalWindow

    The first window type is GlobalWindow. As the name suggests, GlobalWindow is a global window. Only one instance of it exists (singleton pattern).

    Part of the GlobalWindow code is shown below:

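    public class GlobalWindow extends Window {
    
        private static final GlobalWindow INSTANCE = new GlobalWindow();
    
        private GlobalWindow() { }
    
        // The private constructor plus the static INSTANCE make GlobalWindow a singleton
        public static GlobalWindow get() {
            return INSTANCE;
        }
    
        // GlobalWindow is a single instance and, by definition, every element belongs to it,
        // so maxTimestamp returns Long.MAX_VALUE
        @Override
        public long maxTimestamp() {
            return Long.MAX_VALUE;
        }
    
        // Other members (such as the nested Serializer) are omitted
    }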
    

    TimeWindow

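    Unlike GlobalWindow, TimeWindow has an explicit start and end (start inclusive, end exclusive), so it spans a well-defined time range.

    TimeWindow also defines operations on windows, such as checking whether two windows intersect and computing the smallest window that covers both (loosely, their union). The code is as follows: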

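    // Excerpt from TimeWindow, whose fields are start and end.
    // Checks whether two time windows intersect. Both bounds are compared inclusively,
    // so windows that merely touch (this.end == other.start) also count as intersecting.
    public boolean intersects(TimeWindow other) {
        return this.start <= other.end && this.end >= other.start;
    }
    
    /**
     * Returns the minimal window covers both this window and the given window.
     */
    // Returns the smallest window covering both windows; for disjoint windows it also covers the gap between them
    public TimeWindow cover(TimeWindow other) {
        return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
    }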
    

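    Example of the intersects method:

    Figure: window intersection

    The cover method is illustrated below:

    Figure: the cover method

    A comparison of GlobalWindow and TimeWindow:

    Figure: GlobalWindow vs. TimeWindow (simplified)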
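    To make the semantics of intersects and cover concrete, here is a small standalone Java sketch. SimpleTimeWindow and SimpleTimeWindowDemo are hypothetical names, not Flink classes; the sketch copies the two comparisons from the TimeWindow excerpt and assumes, as in Flink's TimeWindow, that start is inclusive and end is exclusive, so maxTimestamp() is end - 1.

```java
import java.util.Objects;

// Standalone sketch, NOT Flink's TimeWindow: a [start, end) interval
// reusing the intersects/cover logic from the excerpt.
final class SimpleTimeWindow {
    final long start; // inclusive
    final long end;   // exclusive

    SimpleTimeWindow(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Largest timestamp still inside the window (end is exclusive).
    long maxTimestamp() {
        return end - 1;
    }

    // Same comparison as TimeWindow.intersects: both bounds inclusive,
    // so touching windows such as [0, 10) and [10, 20) count as intersecting.
    boolean intersects(SimpleTimeWindow other) {
        return this.start <= other.end && this.end >= other.start;
    }

    // Smallest window covering both; for disjoint windows it also covers the gap.
    SimpleTimeWindow cover(SimpleTimeWindow other) {
        return new SimpleTimeWindow(Math.min(start, other.start), Math.max(end, other.end));
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SimpleTimeWindow)) {
            return false;
        }
        SimpleTimeWindow w = (SimpleTimeWindow) o;
        return start == w.start && end == w.end;
    }

    @Override
    public int hashCode() {
        return Objects.hash(start, end);
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

class SimpleTimeWindowDemo {
    public static void main(String[] args) {
        SimpleTimeWindow a = new SimpleTimeWindow(0, 10);
        SimpleTimeWindow b = new SimpleTimeWindow(5, 15);
        SimpleTimeWindow c = new SimpleTimeWindow(20, 30);
        System.out.println(a.intersects(b));  // true
        System.out.println(a.cover(b));       // [0, 15)
        System.out.println(a.intersects(c));  // false
        System.out.println(a.cover(c));       // [0, 30)
        System.out.println(a.maxTimestamp()); // 9
    }
}
```

    Note the inclusive comparison: [0, 10) and [10, 20) are treated as intersecting, which matters when session windows are merged.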

    WindowAssigner

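    When a window operation is applied to a stream, elements are grouped by their key (specified with keyBy) and by the window they belong to. A group of elements with the same key and the same window is called a pane.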

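    In Flink, windows and their data are stored as key-value mappings (the windowState defined in WindowOperator; with the heap state backend it is, for example, a HeapListState). Each time Flink receives an element, it obtains the collection of windows that contain the element (the assignWindows method) and adds the element to the state table under each of those windows.
    For example, suppose the current state table is: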

    (the key is the window, the value is the list of elements in that window)
    {w1: [v1, v2]}
    {w2: [v3]}
    {w3: [v4]}
    

    Now element v5 arrives. The windows that contain v5 are w1 and w2, so the updated state table is:

    {w1: [v1, v2, v5]}
    {w2: [v3, v5]}
    {w3: [v4]}
    

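    The job of a WindowAssigner is to determine, for a given element, the collection of windows that contain it (the assignWindows method). In addition, a WindowAssigner provides the default trigger (which decides when the elements of a window are evaluated), the serializer for its windows, and the time type it is based on (event time or processing time).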
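    The state-table update described above can be modeled in a few lines of plain Java. This is a hypothetical simplification for illustration only: PaneStateSketch and PaneStateDemo are not Flink classes, the state is an in-memory map for a single key (Flink scopes state per key and keeps it in a state backend), and the assigner is just a function from an element to window ids, standing in for assignWindows.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of windowState for one key: window id -> elements in that window.
final class PaneStateSketch {
    private final Map<String, List<String>> windowState = new LinkedHashMap<>();
    // Stand-in for WindowAssigner.assignWindows: element -> ids of the windows containing it.
    private final Function<String, List<String>> assigner;

    PaneStateSketch(Function<String, List<String>> assigner) {
        this.assigner = assigner;
    }

    // Adds the element to every window returned by the assigner.
    void processElement(String element) {
        for (String window : assigner.apply(element)) {
            windowState.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
        }
    }

    Map<String, List<String>> state() {
        return windowState;
    }
}

class PaneStateDemo {
    // Replays the example: v1..v4 build the first table, then v5 belongs to w1 and w2.
    static Map<String, List<String>> runExample() {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        assignment.put("v1", Arrays.asList("w1"));
        assignment.put("v2", Arrays.asList("w1"));
        assignment.put("v3", Arrays.asList("w2"));
        assignment.put("v4", Arrays.asList("w3"));
        assignment.put("v5", Arrays.asList("w1", "w2"));

        PaneStateSketch sketch =
                new PaneStateSketch(e -> assignment.getOrDefault(e, Collections.emptyList()));
        for (String element : assignment.keySet()) {
            sketch.processElement(element);
        }
        return sketch.state();
    }

    public static void main(String[] args) {
        System.out.println(runExample()); // {w1=[v1, v2, v5], w2=[v3, v5], w3=[v4]}
    }
}
```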

    The WindowAssigner source code is as follows:

    // T is the element type, W is the window type
    @PublicEvolving
    public abstract class WindowAssigner<T, W extends Window> implements Serializable {
        private static final long serialVersionUID = 1L;
    
        /**
         * Returns a {@code Collection} of windows that should be assigned to the element.
         *
         * @param element The element to which windows should be assigned.
         * @param timestamp The timestamp of the element.
         * @param context The {@link WindowAssignerContext} in which the assigner operates.
         */
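        // Returns the collection of windows the element belongs to
        // Parameters: element: the element entering the window; timestamp: the element's timestamp, i.e. its event time; context: the WindowAssigner context, used to provide the processing time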
        public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
    
        /**
         * Returns the default trigger associated with this {@code WindowAssigner}.
         */
        // Returns the default trigger
        public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
    
        /**
         * Returns a {@link TypeSerializer} for serializing windows that are assigned by
         * this {@code WindowAssigner}.
         */
        // Returns the serializer for windows
        public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
    
        /**
         * Returns {@code true} if elements are assigned to windows based on event time,
         * {@code false} otherwise.
         */
        // Returns true if windows are based on event time, false otherwise
        public abstract boolean isEventTime();
    
        /**
         * A context provided to the {@link WindowAssigner} that allows it to query the
         * current processing time.
         *
         * <p>This is provided to the assigner by its containing
         * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
         * which, in turn, gets it from the containing
         * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
         */
        public abstract static class WindowAssignerContext {
    
            /**
             * Returns the current processing time.
             */
            // Returns the current processing time; used by processing-time windows
            public abstract long getCurrentProcessingTime();
    
        }
    }
    

    GlobalWindows

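    GlobalWindows is actually an assigner for GlobalWindow: it assigns elements to the single GlobalWindow instance.
    One application of GlobalWindows is the count window, i.e. a window that fires each time n elements have accumulated. The source (KeyedStream.countWindow) is as follows: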

    public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
        // CountTrigger fires every `size` elements; wrapping it in PurgingTrigger clears the window contents after each firing
        return window(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
    }
    

    For CountTrigger and PurgingTrigger, see the article Flink 源码之Trigger (Flink Source Code: Trigger).
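    The behavior of countWindow(size) can be simulated without Flink. The sketch below is a hypothetical standalone model (CountWindowSketch and CountWindowDemo are made-up names): every element goes into one buffer that plays the role of the single GlobalWindow, the buffer is emitted once it holds `size` elements (the CountTrigger firing), and it is cleared right after (the PurgingTrigger part).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Standalone simulation, NOT Flink code, of countWindow(size) semantics.
final class CountWindowSketch<T> {
    private final long size;
    private final List<T> globalWindow = new ArrayList<>(); // contents of the single GlobalWindow
    private final List<List<T>> fired = new ArrayList<>();  // what each firing emitted

    CountWindowSketch(long size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be > 0");
        }
        this.size = size;
    }

    void add(T element) {
        globalWindow.add(element);
        if (globalWindow.size() >= size) {
            fired.add(new ArrayList<>(globalWindow)); // FIRE: evaluate the window contents
            globalWindow.clear();                     // PURGE: drop them afterwards
        }
    }

    List<List<T>> firedWindows() {
        return fired;
    }

    List<T> pending() {
        return globalWindow;
    }
}

class CountWindowDemo {
    public static void main(String[] args) {
        CountWindowSketch<String> sketch = new CountWindowSketch<>(2);
        for (String e : Arrays.asList("a", "b", "c", "d", "e")) {
            sketch.add(e);
        }
        System.out.println(sketch.firedWindows()); // [[a, b], [c, d]]
        System.out.println(sketch.pending());      // [e]
    }
}
```

    Without the purge step the buffer would never be cleared, so each later firing would also include all earlier elements.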

    The source of GlobalWindows is as follows:

    @PublicEvolving
    public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
        private static final long serialVersionUID = 1L;
    
        private GlobalWindows() {}
    
        @Override
        public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            // GlobalWindow is a singleton, so assigning a window simply returns the GlobalWindow instance
            return Collections.singletonList(GlobalWindow.get());
        }
    
        @Override
        public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            // By default no computation is ever triggered
            return new NeverTrigger();
        }
    
        @Override
        public String toString() {
            return "GlobalWindows()";
        }
    
        /**
         * Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns
         * all elements to the same {@link GlobalWindow}.
         *
         * @return The global window policy.
         */
        public static GlobalWindows create() {
            return new GlobalWindows();
        }
    
        /**
         * A trigger that never fires, as default Trigger for GlobalWindows.
         */
        @Internal
        // NeverTrigger returns TriggerResult.CONTINUE in every case, so it never fires
        public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
            private static final long serialVersionUID = 1L;
    
            @Override
            public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
                return TriggerResult.CONTINUE;
            }
    
            @Override
            public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
                return TriggerResult.CONTINUE;
            }
    
            @Override
            public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
                return TriggerResult.CONTINUE;
            }
    
            @Override
            public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
    
            @Override
            public void onMerge(GlobalWindow window, OnMergeContext ctx) {
            }
        }
    
        @Override
        public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new GlobalWindow.Serializer();
        }
    
        @Override
        public boolean isEventTime() {
            return false;
        }
    }
    

    EventTime Windows

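    An event-time window is a window based on event time. Event time is the time at which an element was produced; this information is usually carried in the element itself, for example: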

    {
      "message": "Server memory is running low",
      "timestamp": 1280977330000
    }
    

    The official documentation on event time: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_time.html

    TumblingEventTimeWindows

    In a tumbling window, adjacent windows do not overlap, so each element belongs to exactly one window.


    Figure: Tumbling Window

    TumblingEventTimeWindows is implemented as follows:

    @PublicEvolving
    public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        // Window size
        private final long size;
    
        // Window offset
        private final long offset;
    
        protected TumblingEventTimeWindows(long size, long offset) {
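            // offset must satisfy 0 <= offset < size.
            // Note: this check rejects negative offsets such as Time.hours(-8), which the Javadoc of
            // of(Time, Time) below suggests; later Flink versions check Math.abs(offset) >= size instead.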
            if (offset < 0 || offset >= size) {
                throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
            }
    
            this.size = size;
            this.offset = offset;
        }
    
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            if (timestamp > Long.MIN_VALUE) {
                // Long.MIN_VALUE is currently assigned when no timestamp is present
                // start is the largest value <= timestamp of the form k * size + offset (aligned to size, shifted by offset)
                long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
                return Collections.singletonList(new TimeWindow(start, start + size));
            } else {
                throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                        "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                        "'DataStream.assignTimestampsAndWatermarks(...)'?");
            }
        }
    
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
    
        @Override
        public String toString() {
            return "TumblingEventTimeWindows(" + size + ")";
        }
    
        /**
         * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
         * elements to time windows based on the element timestamp.
         *
         * @param size The size of the generated windows.
         * @return The time policy.
         */
        public static TumblingEventTimeWindows of(Time size) {
            return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
        }
    
        /**
         * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
         * elements to time windows based on the element timestamp and offset.
         *
         * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes
         * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
         * time windows start at 0:15:00,1:15:00,2:15:00,etc.
         *
         * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
         * such as China which is using UTC+08:00,and you want a time window with size of one day,
         * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
         * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
         *
         * @param size The size of the generated windows.
         * @param offset The offset which window start would be shifted by.
         * @return The time policy.
         */
        public static TumblingEventTimeWindows of(Time size, Time offset) {
            return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
        }
    
        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }
    
        @Override
        public boolean isEventTime() {
            return true;
        }
    }
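    The start computation that assignWindows delegates to TimeWindow.getWindowStartWithOffset can be sketched in plain Java. TumblingSketch is a hypothetical name, and the method is written with Math.floorMod rather than copied from Flink: it returns the largest value not greater than timestamp of the form k * size + offset, and floorMod keeps that true even when timestamp - offset is negative. Validation of size and offset is omitted. The example values reuse the timestamp from the JSON sample and the offsets discussed in the Javadoc.

```java
// Hypothetical standalone sketch of tumbling-window assignment (not Flink's code).
final class TumblingSketch {

    // Largest value <= timestamp of the form k * size + offset (k may be negative).
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // A tumbling assigner returns exactly one window: [start, start + size).
    static long[] assign(long timestamp, long offset, long size) {
        long start = windowStart(timestamp, offset, size);
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long hour = 60 * minute;
        long day = 24 * hour;

        // Timestamp taken from the JSON example earlier in the article.
        long ts = 1280977330000L;
        System.out.println(windowStart(ts, 0, hour));           // 1280977200000
        // Hourly windows starting at minute 15 of each hour.
        System.out.println(windowStart(ts, 15 * minute, hour)); // 1280974500000
        // Daily windows aligned to local midnight in UTC+08:00 (offset -8h).
        System.out.println(windowStart(0L, -8 * hour, day));    // -28800000
    }
}
```

    The last case shows why the Javadoc recommends Time.hours(-8) for China: epoch 0 is 08:00 in UTC+08:00, and the window starts 8 hours earlier, at local midnight.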
    

    SlidingEventTimeWindows

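    A sliding window has three properties: the window length (size), the slide distance (slide), and the offset. Sliding windows can overlap, which means an element can belong to several windows at the same time, as shown in the figure below.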


    Figure: Sliding Window

    The source of SlidingEventTimeWindows is as follows:

    @PublicEvolving
    public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        // Window size (length)
        private final long size;
    
        // Slide distance
        private final long slide;
    
        // Offset
        private final long offset;
    
        protected SlidingEventTimeWindows(long size, long slide, long offset) {
            // abs(offset) must be smaller than slide, and size must be greater than 0
            if (Math.abs(offset) >= slide || size <= 0) {
                throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy " +
                    "abs(offset) < slide and size > 0");
            }
    
            this.size = size;
            this.slide = slide;
            this.offset = offset;
        }
    
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            if (timestamp > Long.MIN_VALUE) {
                // Sliding windows overlap: when size is a multiple of slide, exactly size / slide windows contain any given timestamp
                List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
                // lastStart is the largest value <= timestamp of the form k * slide + offset (aligned to slide, not to size)
                long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
                for (long start = lastStart;
                    start > timestamp - size;
                    start -= slide) {
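                    // Starting from lastStart and stepping back by slide, every window visited contains the element and is returned.
                    // The loop requires start > timestamp - size: if start <= timestamp - size, then end = start + size <= timestamp, and since end is exclusive the window cannot contain the element.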
                    windows.add(new TimeWindow(start, start + size));
                }
                return windows;
            } else {
                // timestamp is Long.MIN_VALUE if the environment is not set to event time or no timestamps were assigned to the records
                throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                        "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                        "'DataStream.assignTimestampsAndWatermarks(...)'?");
            }
        }
    
        public long getSize() {
            return size;
        }
    
        public long getSlide() {
            return slide;
        }
    
        // Uses the event-time trigger by default
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
    
        @Override
        public String toString() {
            return "SlidingEventTimeWindows(" + size + ", " + slide + ")";
        }
    
        /**
         * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns
         * elements to sliding time windows based on the element timestamp.
         *
         * @param size The size of the generated windows.
         * @param slide The slide interval of the generated windows.
         * @return The time policy.
         */
        public static SlidingEventTimeWindows of(Time size, Time slide) {
            return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
        }
    
        /**
         * Creates a new {@code SlidingEventTimeWindows} {@link WindowAssigner} that assigns
         * elements to time windows based on the element timestamp and offset.
         *
         * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes
         * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
         * time windows start at 0:15:00,1:15:00,2:15:00,etc.
         *
         * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
         * such as China which is using UTC+08:00,and you want a time window with size of one day,
         * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
         * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
         *
         * @param size The size of the generated windows.
         * @param slide  The slide interval of the generated windows.
         * @param offset The offset which window start would be shifted by.
         * @return The time policy.
         */
        public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
            return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
        }
    
        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }
    
        @Override
        public boolean isEventTime() {
            return true;
        }
    }
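    The assignment loop of SlidingEventTimeWindows can be reproduced in a standalone sketch. SlidingSketch is a hypothetical name; lastStart uses Math.floorMod instead of calling Flink's TimeWindow.getWindowStartWithOffset, and windows are returned as {start, end} pairs (end exclusive), newest first, exactly in the order of the loop above. The second example shows that when size is not a multiple of slide, the number of windows per element depends on the timestamp.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical standalone sketch of sliding-window assignment (not Flink's code).
final class SlidingSketch {

    // Largest value <= timestamp of the form k * slide + offset.
    static long lastStart(long timestamp, long offset, long slide) {
        return timestamp - Math.floorMod(timestamp - offset, slide);
    }

    // Same loop as assignWindows: each entry is {start, end}, newest window first.
    static List<long[]> assign(long timestamp, long size, long slide, long offset) {
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        for (long start = lastStart(timestamp, offset, slide);
                start > timestamp - size;
                start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size = 10, slide = 5: every timestamp is covered by 10 / 5 = 2 windows.
        for (long[] w : assign(7, 10, 5, 0)) {
            System.out.println(Arrays.toString(w)); // [5, 15] then [0, 10]
        }
        // size = 10, slide = 3 (not a multiple): the count depends on the timestamp.
        System.out.println(assign(7, 10, 3, 0).size()); // 3
        System.out.println(assign(6, 10, 3, 0).size()); // 4
    }
}
```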
    

    MergingWindowAssigner

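    MergingWindowAssigner extends WindowAssigner. Unlike a plain WindowAssigner, it supports merging windows. In Flink, the session window assigners are implemented as MergingWindowAssigners. By the definition of a session window, two consecutive elements belong to the same window if the time between them does not exceed the window's gap. The length of a session window therefore keeps changing, unlike the two window types above, whose size is fixed. As time passes, session windows have to be merged again and again to hold more elements.

    The merging logic lives in the mergeWindows method. It receives a MergeCallback object, which is notified when windows are merged so that additional logic can be executed.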

    The source of MergingWindowAssigner is as follows:

    public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
        private static final long serialVersionUID = 1L;
    
        /**
         * Determines which windows (if any) should be merged.
         *
         * @param windows The window candidates.
         * @param callback A callback that can be invoked to signal which windows should be merged.
         */
        // Logic for merging windows
        // Parameters: windows: the candidate windows to merge; callback: invoked to signal which windows are to be merged
        public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);
    
        /**
         * Callback to be used in {@link #mergeWindows(Collection, MergeCallback)} for specifying which
         * windows should be merged.
         */
        public interface MergeCallback<W> {
    
            /**
             * Specifies that the given windows should be merged into the result window.
             *
             * @param toBeMerged The list of windows that should be merged into one window.
             * @param mergeResult The resulting merged window.
             */
            // toBeMerged: the windows to be merged
            // mergeResult: the resulting merged window
            void merge(Collection<W> toBeMerged, W mergeResult);
        }
    }
    

    EventTimeSessionWindows

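    A session window has one property: the session timeout (or gap).
    Its defining feature is that, within one session window, the time difference between any two consecutive elements does not exceed the gap. If a newly arriving element is within the gap of the previous element, it joins the previous element's window; otherwise it starts a new window. See the figure below (the image on the official site is small; the original is here: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html):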

    Figure: Session Window

    The source of EventTimeSessionWindows is as follows:

    public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        // Session timeout. By the definition of a session window, if two consecutive elements are further apart than the session timeout, the later element goes into a new window
        protected long sessionTimeout;
    
        protected EventTimeSessionWindows(long sessionTimeout) {
            if (sessionTimeout <= 0) {
                throw new IllegalArgumentException("EventTimeSessionWindows parameters must satisfy 0 < size");
            }
    
            this.sessionTimeout = sessionTimeout;
        }
    
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            // Simply returns the window [timestamp, timestamp + sessionTimeout). The actual merging is done in WindowOperator, which produces windows spanning all elements of the same session; this will be analyzed in a later post
            return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
        }
    
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
    
        @Override
        public String toString() {
            return "EventTimeSessionWindows(" + sessionTimeout + ")";
        }
    
        /**
         * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
         * elements to sessions based on the element timestamp.
         *
         * @param size The session timeout, i.e. the time gap between sessions
         * @return The policy.
         */
        public static EventTimeSessionWindows withGap(Time size) {
            return new EventTimeSessionWindows(size.toMilliseconds());
        }
    
        /**
         * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
         * elements to sessions based on the element timestamp.
         *
         * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
         * @return The policy.
         */
        // Dynamic EventTimeSessionWindows: the gap can vary from element to element
        @PublicEvolving
        public static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
            return new DynamicEventTimeSessionWindows<>(sessionWindowTimeGapExtractor);
        }
    
        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }
    
        @Override
        public boolean isEventTime() {
            return true;
        }
    
        /**
         * Merge overlapping {@link TimeWindow}s.
         */
        // Merges overlapping windows; the method is analyzed below
        @Override
        public void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
            TimeWindow.mergeWindows(windows, c);
        }
    
    }
    

    TimeWindow.mergeWindows defines how time windows are merged:

    public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
    
        // sort the windows by the start time and then merge overlapping windows
    
        List<TimeWindow> sortedWindows = new ArrayList<>(windows);
    
        // Sort the windows by start time
        Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
            @Override
            public int compare(TimeWindow o1, TimeWindow o2) {
                return Long.compare(o1.getStart(), o2.getStart());
            }
        });
    
        // Collection of entries of the same type as currentMerge (2), holding all merge groups
        List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
        
        // (2) The merge group currently being built: f0 is the merged window, f1 is the set of windows merged into it
        Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;
    
        // Merge the windows one by one
        for (TimeWindow candidate: sortedWindows) {
            // On the first iteration currentMerge is null
            if (currentMerge == null) {
                currentMerge = new Tuple2<>();
                currentMerge.f0 = candidate;
                currentMerge.f1 = new HashSet<>();
                currentMerge.f1.add(candidate);
            // If the merged window so far (currentMerge) intersects the candidate, extend f0 to cover both and add the candidate to the set in f1
            } else if (currentMerge.f0.intersects(candidate)) {
                currentMerge.f0 = currentMerge.f0.cover(candidate);
                currentMerge.f1.add(candidate);
            // The candidate does not intersect the current merged window: finish the current group and start a new one with the candidate
            } else {
                merged.add(currentMerge);
                currentMerge = new Tuple2<>();
                currentMerge.f0 = candidate;
                currentMerge.f1 = new HashSet<>();
                currentMerge.f1.add(candidate);
            }
        }
    
        // After the loop, add the last currentMerge to merged
        if (currentMerge != null) {
            merged.add(currentMerge);
        }
    
        for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
            // Only groups containing more than one window actually need merging
            if (m.f1.size() > 1) {
                // Notify the callback which windows are merged
                // and what the resulting merged window is
                c.merge(m.f1, m.f0);
            }
        }
    }
    

    The contents of the merged variable are illustrated below:


    Figure: the merged variable
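    The session assignment of EventTimeSessionWindows and the sweep in TimeWindow.mergeWindows can be reproduced together in a standalone sketch. SessionMergeSketch, Win and the local MergeCallback interface are hypothetical names, not Flink classes; Win uses the same intersects and cover rules as TimeWindow, and instead of Tuple2 the sketch keeps two parallel lists for f0 (the merged window) and f1 (the windows merged into it).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical standalone sketch of session assignment and merging (not Flink's code).
final class SessionMergeSketch {

    // Minimal [start, end) window with the same intersects/cover rules as TimeWindow.
    static final class Win {
        final long start;
        final long end;

        Win(long start, long end) {
            this.start = start;
            this.end = end;
        }

        boolean intersects(Win o) {
            return start <= o.end && end >= o.start;
        }

        Win cover(Win o) {
            return new Win(Math.min(start, o.start), Math.max(end, o.end));
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Win && ((Win) o).start == start && ((Win) o).end == end;
        }

        @Override
        public int hashCode() {
            return 31 * Long.hashCode(start) + Long.hashCode(end);
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    // Same shape as MergingWindowAssigner.MergeCallback.
    interface MergeCallback {
        void merge(Collection<Win> toBeMerged, Win mergeResult);
    }

    // Like EventTimeSessionWindows.assignWindows: one window [timestamp, timestamp + gap).
    static Win assign(long timestamp, long gap) {
        return new Win(timestamp, timestamp + gap);
    }

    // Sort by start, sweep once, group intersecting windows (as in TimeWindow.mergeWindows).
    static void mergeWindows(Collection<Win> windows, MergeCallback callback) {
        List<Win> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((Win w) -> w.start));

        List<Win> mergedWindows = new ArrayList<>();     // f0 of each group
        List<Set<Win>> mergedGroups = new ArrayList<>(); // f1 of each group
        Win current = null;
        Set<Win> currentGroup = null;

        for (Win candidate : sorted) {
            if (current != null && current.intersects(candidate)) {
                current = current.cover(candidate); // extend the current group
                currentGroup.add(candidate);
            } else {
                if (current != null) {              // close the previous group
                    mergedWindows.add(current);
                    mergedGroups.add(currentGroup);
                }
                current = candidate;                // start a new group
                currentGroup = new HashSet<>();
                currentGroup.add(candidate);
            }
        }
        if (current != null) {
            mergedWindows.add(current);
            mergedGroups.add(currentGroup);
        }

        // Only groups with more than one window are reported.
        for (int i = 0; i < mergedWindows.size(); i++) {
            if (mergedGroups.get(i).size() > 1) {
                callback.merge(mergedGroups.get(i), mergedWindows.get(i));
            }
        }
    }

    public static void main(String[] args) {
        long gap = 3;
        List<Win> windows = Arrays.asList(assign(1, gap), assign(10, gap), assign(2, gap));
        mergeWindows(windows, (toBeMerged, result) ->
                System.out.println(toBeMerged.size() + " windows -> " + result)); // 2 windows -> [1, 5)
    }
}
```

    Because intersects compares bounds inclusively, two elements exactly one gap apart (timestamps 1 and 4 with gap 3) also end up in one session, [1, 7).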

    Dynamic session windows

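    The session timeout of the (non-dynamic) session window above is fixed and never changes once the window has been created. A dynamic session window differs in that its session timeout can vary. It holds a sessionWindowTimeGapExtractor object, which defines the session timeout as a function of the element entering the window. The source is as follows: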

    public interface SessionWindowTimeGapExtractor<T> extends Serializable {
        /**
         * Extracts the session time gap.
         * @param element The input element.
         * @return The session time gap in milliseconds.
         */
        // Defines the session timeout as a function of the element entering the window
        long extract(T element);
    }
    

    The assignWindows method of DynamicEventTimeSessionWindows is shown below. Each time windows are assigned, it calls sessionWindowTimeGapExtractor.extract to ask for the session timeout.

    public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
        // Ask the extractor for the session timeout; the rest is the same as in the regular session window
        long sessionTimeout = sessionWindowTimeGapExtractor.extract(element);
        if (sessionTimeout <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
    }
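    Because SessionWindowTimeGapExtractor has a single abstract method, the gap function is naturally written as a lambda (in Flink it would be passed to EventTimeSessionWindows.withDynamicGap). The sketch below mimics that shape without Flink: GapExtractor, Click and DynamicSessionSketch are hypothetical names, and the rule "premium users get a 30-minute gap, others 5 minutes" is an invented example.

```java
// Hypothetical stand-in with the same single-method shape as SessionWindowTimeGapExtractor.
@FunctionalInterface
interface GapExtractor<T> {
    long extract(T element);
}

// Hypothetical event type: premium users get a longer session gap.
final class Click {
    final String user;
    final boolean premium;

    Click(String user, boolean premium) {
        this.user = user;
        this.premium = premium;
    }
}

// Mirrors the assignWindows logic shown above; returns {start, end}.
final class DynamicSessionSketch<T> {
    private final GapExtractor<T> extractor;

    DynamicSessionSketch(GapExtractor<T> extractor) {
        this.extractor = extractor;
    }

    long[] assign(T element, long timestamp) {
        long gap = extractor.extract(element); // gap is computed per element
        if (gap <= 0) {
            throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
        }
        return new long[] {timestamp, timestamp + gap};
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        DynamicSessionSketch<Click> sessions =
                new DynamicSessionSketch<>(c -> c.premium ? 30 * minute : 5 * minute);
        long[] w1 = sessions.assign(new Click("alice", true), 0L);
        long[] w2 = sessions.assign(new Click("bob", false), 0L);
        System.out.println(w1[1] + " " + w2[1]); // 1800000 300000
    }
}
```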
    

    Processing-time windows

    Processing-time windows work much like their event-time counterparts, except that their assignWindows methods call

    long currentProcessingTime = context.getCurrentProcessingTime();
    

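    to obtain the current processing time instead of relying on the element's timestamp. Apart from that, the logic is essentially the same and is not repeated here.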
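    A processing-time tumbling assigner can be sketched the same way. ProcessingTimeContext is a hypothetical stand-in for WindowAssigner.WindowAssignerContext, ProcessingTimeTumblingSketch is not Flink's TumblingProcessingTimeWindows, and the size/offset check is an assumption of this sketch. The point it illustrates is that the element and its timestamp are ignored: even a record carrying Long.MIN_VALUE (which the event-time assigners reject) gets a window, computed from the context clock.

```java
// Hypothetical stand-in for WindowAssigner.WindowAssignerContext.
interface ProcessingTimeContext {
    long getCurrentProcessingTime();
}

// Hypothetical sketch of a processing-time tumbling assigner (not Flink's code).
final class ProcessingTimeTumblingSketch {
    private final long size;
    private final long offset;

    ProcessingTimeTumblingSketch(long size, long offset) {
        if (size <= 0 || Math.abs(offset) >= size) {
            throw new IllegalArgumentException("requires size > 0 and abs(offset) < size");
        }
        this.size = size;
        this.offset = offset;
    }

    // The element and its timestamp are ignored; the window comes from the context clock.
    long[] assignWindows(Object element, long timestamp, ProcessingTimeContext context) {
        long now = context.getCurrentProcessingTime();
        long start = now - Math.floorMod(now - offset, size);
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        ProcessingTimeTumblingSketch assigner = new ProcessingTimeTumblingSketch(10_000L, 0L);
        // A fixed clock makes the result reproducible.
        long[] w = assigner.assignWindows("event", Long.MIN_VALUE, () -> 12_345L);
        System.out.println(w[0] + " " + w[1]); // 10000 20000
        // A wall-clock context, for illustration.
        ProcessingTimeContext wallClock = System::currentTimeMillis;
        long[] live = assigner.assignWindows("event", Long.MIN_VALUE, wallClock);
        System.out.println(live[1] - live[0]); // 10000
    }
}
```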

    References

    https://flink.apache.org/news/2015/12/04/Introducing-windows.html
