A Look at Flink's Tumbling Window

Author: go4it | Published: 2019-01-02 10:54

    This article walks through the source code behind Flink's Tumbling Window, using the Flink 1.7.0 sources.

    WindowAssigner

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java

    @PublicEvolving
    public abstract class WindowAssigner<T, W extends Window> implements Serializable {
        private static final long serialVersionUID = 1L;
    
        /**
         * Returns a {@code Collection} of windows that should be assigned to the element.
         *
         * @param element The element to which windows should be assigned.
         * @param timestamp The timestamp of the element.
         * @param context The {@link WindowAssignerContext} in which the assigner operates.
         */
        public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
    
        /**
         * Returns the default trigger associated with this {@code WindowAssigner}.
         */
        public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
    
        /**
         * Returns a {@link TypeSerializer} for serializing windows that are assigned by
         * this {@code WindowAssigner}.
         */
        public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
    
        /**
         * Returns {@code true} if elements are assigned to windows based on event time,
         * {@code false} otherwise.
         */
        public abstract boolean isEventTime();
    
        /**
         * A context provided to the {@link WindowAssigner} that allows it to query the
         * current processing time.
         *
         * <p>This is provided to the assigner by its containing
         * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator},
         * which, in turn, gets it from the containing
         * {@link org.apache.flink.streaming.runtime.tasks.StreamTask}.
         */
        public abstract static class WindowAssignerContext {
    
            /**
             * Returns the current processing time.
             */
            public abstract long getCurrentProcessingTime();
    
        }
    }
    
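    • WindowAssigner declares four abstract methods: assignWindows, getDefaultTrigger, getWindowSerializer, and isEventTime. It also declares the abstract static nested class WindowAssignerContext. It has two type parameters: T is the element type and W is the window type.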

    Window

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/Window.java

    @PublicEvolving
    public abstract class Window {
    
        /**
         * Gets the largest timestamp that still belongs to this window.
         *
         * @return The largest timestamp that still belongs to this window.
         */
        public abstract long maxTimestamp();
    }
    
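    • A Window represents one finite bucket carved out of an unbounded stream. Its only abstract method, maxTimestamp, returns the largest timestamp that still belongs to the window. It has two subclasses: GlobalWindow and TimeWindow.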

    TimeWindow

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java

    @PublicEvolving
    public class TimeWindow extends Window {
    
        private final long start;
        private final long end;
    
        public TimeWindow(long start, long end) {
            this.start = start;
            this.end = end;
        }
    
        /**
         * Gets the starting timestamp of the window. This is the first timestamp that belongs
         * to this window.
         *
         * @return The starting timestamp of this window.
         */
        public long getStart() {
            return start;
        }
    
        /**
         * Gets the end timestamp of this window. The end timestamp is exclusive, meaning it
         * is the first timestamp that does not belong to this window any more.
         *
         * @return The exclusive end timestamp of this window.
         */
        public long getEnd() {
            return end;
        }
    
        /**
         * Gets the largest timestamp that still belongs to this window.
         *
         * <p>This timestamp is identical to {@code getEnd() - 1}.
         *
         * @return The largest timestamp that still belongs to this window.
         *
         * @see #getEnd()
         */
        @Override
        public long maxTimestamp() {
            return end - 1;
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
    
            TimeWindow window = (TimeWindow) o;
    
            return end == window.end && start == window.start;
        }
    
        @Override
        public int hashCode() {
            return MathUtils.longToIntWithBitMixing(start + end);
        }
    
        @Override
        public String toString() {
            return "TimeWindow{" +
                    "start=" + start +
                    ", end=" + end +
                    '}';
        }
    
        /**
         * Returns {@code true} if this window intersects the given window.
         */
        public boolean intersects(TimeWindow other) {
            return this.start <= other.end && this.end >= other.start;
        }
    
        /**
         * Returns the minimal window covers both this window and the given window.
         */
        public TimeWindow cover(TimeWindow other) {
            return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
        }
    
        // ------------------------------------------------------------------------
        // Serializer
        // ------------------------------------------------------------------------
    
        //......
    
        // ------------------------------------------------------------------------
        //  Utilities
        // ------------------------------------------------------------------------
    
        /**
         * Merge overlapping {@link TimeWindow}s. For use by merging
         * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}.
         */
        public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
    
            // sort the windows by the start time and then merge overlapping windows
    
            List<TimeWindow> sortedWindows = new ArrayList<>(windows);
    
            Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
                @Override
                public int compare(TimeWindow o1, TimeWindow o2) {
                    return Long.compare(o1.getStart(), o2.getStart());
                }
            });
    
            List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
            Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;
    
            for (TimeWindow candidate: sortedWindows) {
                if (currentMerge == null) {
                    currentMerge = new Tuple2<>();
                    currentMerge.f0 = candidate;
                    currentMerge.f1 = new HashSet<>();
                    currentMerge.f1.add(candidate);
                } else if (currentMerge.f0.intersects(candidate)) {
                    currentMerge.f0 = currentMerge.f0.cover(candidate);
                    currentMerge.f1.add(candidate);
                } else {
                    merged.add(currentMerge);
                    currentMerge = new Tuple2<>();
                    currentMerge.f0 = candidate;
                    currentMerge.f1 = new HashSet<>();
                    currentMerge.f1.add(candidate);
                }
            }
    
            if (currentMerge != null) {
                merged.add(currentMerge);
            }
    
            for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
                if (m.f1.size() > 1) {
                    c.merge(m.f1, m.f0);
                }
            }
        }
    
        /**
         * Method to get the window start for a timestamp.
         *
         * @param timestamp epoch millisecond to get the window start.
         * @param offset The offset which window start would be shifted by.
         * @param windowSize The size of the generated windows.
         * @return window start
         */
        public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
            return timestamp - (timestamp - offset + windowSize) % windowSize;
        }
    }
    
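    • TimeWindow has two fields, start and end. start is inclusive and end is exclusive, which is why maxTimestamp returns end - 1. The class also overrides equals and hashCode.
    • TimeWindow provides intersects, which reports whether this window intersects the given window. The comparison is inclusive on both sides, so two windows that merely touch (this.end == other.start) also count as intersecting. The cover method returns the minimal window that covers both this window and the given window, not their overlap.
    • TimeWindow also provides two static methods, mergeWindows and getWindowStartWithOffset. The former merges overlapping time windows; the latter computes the window start for a given timestamp, offset, and windowSize.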
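
The arithmetic in getWindowStartWithOffset and the semantics of intersects and cover are easier to see with concrete numbers. The following is a minimal standalone sketch, not Flink code: the class names WindowMath and Span are hypothetical, and the methods simply repeat the expressions from the TimeWindow listing above so that they can run without a Flink dependency.

```java
// Standalone sketch: WindowMath and Span are hypothetical names, not Flink classes.
final class WindowMath {

    /** Same arithmetic as TimeWindow.getWindowStartWithOffset in the listing above. */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** Half-open interval [start, end), mirroring TimeWindow's two fields. */
    static final class Span {
        final long start;
        final long end;

        Span(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Largest timestamp that still belongs to the span. */
        long maxTimestamp() {
            return end - 1;
        }

        /** Inclusive on both sides, so spans that merely touch also intersect. */
        boolean intersects(Span other) {
            return this.start <= other.end && this.end >= other.start;
        }

        /** Smallest span that contains both this span and the other one. */
        Span cover(Span other) {
            return new Span(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    public static void main(String[] args) {
        long size = 10_000L;

        // No offset: window boundaries sit at multiples of the size.
        long plain = windowStart(12_345L, 0L, size);
        System.out.println(new Span(plain, plain + size));

        // Offset 3000: window boundaries shift to 3000, 13000, and so on.
        long shifted = windowStart(12_345L, 3_000L, size);
        System.out.println(new Span(shifted, shifted + size));

        Span first = new Span(0L, 10L);
        Span second = new Span(10L, 20L);
        System.out.println(first.intersects(second));
        System.out.println(first.cover(second));
    }
}
```

With a size of 10000, timestamp 12345 lands in [10000, 20000) without an offset and in [3000, 13000) with an offset of 3000. The two adjacent spans [0, 10) and [10, 20) are reported as intersecting, and their cover is [0, 20).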

    TumblingEventTimeWindows

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingEventTimeWindows.java

    @PublicEvolving
    public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        private final long size;
    
        private final long offset;
    
        protected TumblingEventTimeWindows(long size, long offset) {
            if (offset < 0 || offset >= size) {
                throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy 0 <= offset < size");
            }
    
            this.size = size;
            this.offset = offset;
        }
    
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            if (timestamp > Long.MIN_VALUE) {
                // Long.MIN_VALUE is currently assigned when no timestamp is present
                long start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size);
                return Collections.singletonList(new TimeWindow(start, start + size));
            } else {
                throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                        "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                        "'DataStream.assignTimestampsAndWatermarks(...)'?");
            }
        }
    
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
    
        @Override
        public String toString() {
            return "TumblingEventTimeWindows(" + size + ")";
        }
    
        public static TumblingEventTimeWindows of(Time size) {
            return new TumblingEventTimeWindows(size.toMilliseconds(), 0);
        }
    
        public static TumblingEventTimeWindows of(Time size, Time offset) {
            return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
        }
    
        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }
    
        @Override
        public boolean isEventTime() {
            return true;
        }
    }
    
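    • TumblingEventTimeWindows extends WindowAssigner, with Object as the element type and TimeWindow as the window type. It takes two parameters, size and offset, which must satisfy 0 <= offset < size.
    • assignWindows returns a single window [start, start + size), where start = TimeWindow.getWindowStartWithOffset(timestamp, offset, size); it throws a RuntimeException if the record carries no timestamp (Long.MIN_VALUE). getDefaultTrigger returns an EventTimeTrigger, getWindowSerializer returns a TimeWindow.Serializer, and isEventTime returns true.
    • TumblingEventTimeWindows provides static of factory methods that accept size and, optionally, offset.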
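
To illustrate how size and offset interact in event-time assignment, here is a minimal standalone sketch of daily windows aligned to midnight in UTC+8. It is not Flink code: EventTimeTumblingSketch and its methods are hypothetical names that repeat the precondition and the arithmetic from the listing above. Because the constructor shown above only accepts 0 <= offset < size, the UTC+8 alignment is expressed here as a 16-hour offset.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Standalone sketch: EventTimeTumblingSketch is a hypothetical name, not a Flink class.
final class EventTimeTumblingSketch {

    private final long size;
    private final long offset;

    EventTimeTumblingSketch(long size, long offset) {
        // Same precondition as the constructor in the listing above.
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < size");
        }
        this.size = size;
        this.offset = offset;
    }

    /** Start of the single tumbling window that contains the event timestamp. */
    long windowStart(long timestamp) {
        if (timestamp == Long.MIN_VALUE) {
            // Long.MIN_VALUE stands for "no timestamp assigned" in the listing above.
            throw new IllegalStateException("record has no timestamp");
        }
        return timestamp - (timestamp - offset + size) % size;
    }

    /** Groups event timestamps by window start, keeping window starts sorted. */
    Map<Long, List<Long>> group(List<Long> timestamps) {
        Map<Long, List<Long>> byWindow = new TreeMap<>();
        for (long ts : timestamps) {
            byWindow.computeIfAbsent(windowStart(ts), k -> new ArrayList<>()).add(ts);
        }
        return byWindow;
    }

    public static void main(String[] args) {
        long hour = 3_600_000L;
        // One-day windows shifted by 16 hours start at 16:00 UTC, i.e. 00:00 in UTC+8.
        EventTimeTumblingSketch daily = new EventTimeTumblingSketch(24 * hour, 16 * hour);

        List<Long> events = Arrays.asList(
                Instant.parse("2019-01-01T15:59:59Z").toEpochMilli(),
                Instant.parse("2019-01-01T16:00:00Z").toEpochMilli(),
                Instant.parse("2019-01-02T02:54:00Z").toEpochMilli());

        daily.group(events).forEach((start, members) ->
                System.out.println(Instant.ofEpochMilli(start) + " -> " + members.size() + " event(s)"));
    }
}
```

The first event falls one second before a window boundary and therefore lands in the window starting at 2018-12-31T16:00:00Z, while the other two share the window starting at 2019-01-01T16:00:00Z.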

    TumblingProcessingTimeWindows

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java

    public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        private final long size;
    
        private final long offset;
    
        private TumblingProcessingTimeWindows(long size, long offset) {
            if (offset < 0 || offset >= size) {
                throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy  0 <= offset < size");
            }
    
            this.size = size;
            this.offset = offset;
        }
    
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            final long now = context.getCurrentProcessingTime();
            long start = TimeWindow.getWindowStartWithOffset(now, offset, size);
            return Collections.singletonList(new TimeWindow(start, start + size));
        }
    
        public long getSize() {
            return size;
        }
    
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return ProcessingTimeTrigger.create();
        }
    
        @Override
        public String toString() {
            return "TumblingProcessingTimeWindows(" + size + ")";
        }
    
        public static TumblingProcessingTimeWindows of(Time size) {
            return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0);
        }
    
        public static TumblingProcessingTimeWindows of(Time size, Time offset) {
            return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
        }
    
        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }
    
        @Override
        public boolean isEventTime() {
            return false;
        }
    }
    
    • TumblingProcessingTimeWindows extends WindowAssigner, with Object as the element type and TimeWindow as the window type. It takes two parameters, size and offset, which must satisfy 0 <= offset < size.
    • assignWindows returns a single window [start, start + size), where start = TimeWindow.getWindowStartWithOffset(now, offset, size) and now comes from context.getCurrentProcessingTime(). This is the key difference from TumblingEventTimeWindows: TumblingProcessingTimeWindows ignores the timestamp parameter and uses now instead. getDefaultTrigger returns a ProcessingTimeTrigger, and isEventTime returns false.
    • TumblingProcessingTimeWindows also provides static of factory methods that accept size and, optionally, offset.
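
The point that processing-time assignment ignores the element's own timestamp can be shown with an injected clock. The sketch below is standalone and hypothetical: ProcessingTimeTumblingSketch is not a Flink class, and the LongSupplier is an assumed stand-in for WindowAssignerContext.getCurrentProcessingTime().

```java
import java.util.function.LongSupplier;

// Standalone sketch: ProcessingTimeTumblingSketch is a hypothetical name, not a Flink class.
final class ProcessingTimeTumblingSketch {

    private final long size;
    private final long offset;
    // Stand-in for WindowAssignerContext.getCurrentProcessingTime().
    private final LongSupplier clock;

    ProcessingTimeTumblingSketch(long size, long offset, LongSupplier clock) {
        if (offset < 0 || offset >= size) {
            throw new IllegalArgumentException("parameters must satisfy 0 <= offset < size");
        }
        this.size = size;
        this.offset = offset;
        this.clock = clock;
    }

    /** Returns {start, end} of the assigned window; the element timestamp is ignored. */
    long[] assign(long elementTimestamp) {
        long now = clock.getAsLong();
        long start = now - (now - offset + size) % size;
        return new long[] {start, start + size};
    }

    public static void main(String[] args) {
        // Fixed clock for a reproducible demonstration.
        ProcessingTimeTumblingSketch fixed =
                new ProcessingTimeTumblingSketch(10_000L, 0L, () -> 12_345L);
        long[] window = fixed.assign(99_999L);
        System.out.println("[" + window[0] + ", " + window[1] + ")");

        // Wall clock, closer to what a running job would observe.
        ProcessingTimeTumblingSketch wall =
                new ProcessingTimeTumblingSketch(10_000L, 0L, System::currentTimeMillis);
        long[] live = wall.assign(99_999L);
        System.out.println("[" + live[0] + ", " + live[1] + ")");
    }
}
```

With the fixed clock at 12345, every element is assigned to [10000, 20000) regardless of the timestamp passed in. Injecting the clock is a choice made for this sketch so that the behavior can be checked deterministically.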

    Summary

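    • Flink's Tumbling Window comes in two flavors, TumblingEventTimeWindows and TumblingProcessingTimeWindows. Both extend WindowAssigner with Object as the element type and TimeWindow as the window type, and both take two parameters, size and offset, which must satisfy 0 <= offset < size.
    • WindowAssigner declares the abstract methods assignWindows, getDefaultTrigger, getWindowSerializer, and isEventTime, plus the abstract static nested class WindowAssignerContext. It has two type parameters: T is the element type and W is the window type. The window type used by both tumbling assigners is TimeWindow, which has start (inclusive) and end (exclusive) fields, so maxTimestamp returns end - 1. TimeWindow also provides the static methods mergeWindows, which merges overlapping time windows, and getWindowStartWithOffset, which computes the window start for a given timestamp, offset, and windowSize.
    • TumblingEventTimeWindows and TumblingProcessingTimeWindows differ in assignWindows, getDefaultTrigger, and isEventTime. The former's assignWindows uses the timestamp parameter, while the latter's uses now, the current processing time. The former's getDefaultTrigger returns an EventTimeTrigger, while the latter's returns a ProcessingTimeTrigger. The former's isEventTime returns true, while the latter's returns false.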
