A Look at Flink's Session Window

Author: go4it | Published: 2019-01-04 19:19

    This post takes a look at Flink's session windows.

    MergingWindowAssigner

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/MergingWindowAssigner.java

    @PublicEvolving
    public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
        private static final long serialVersionUID = 1L;
    
        /**
         * Determines which windows (if any) should be merged.
         *
         * @param windows The window candidates.
         * @param callback A callback that can be invoked to signal which windows should be merged.
         */
        public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);
    
        /**
         * Callback to be used in {@link #mergeWindows(Collection, MergeCallback)} for specifying which
         * windows should be merged.
         */
        public interface MergeCallback<W> {
    
            /**
             * Specifies that the given windows should be merged into the result window.
             *
             * @param toBeMerged The list of windows that should be merged into one window.
             * @param mergeResult The resulting merged window.
             */
            void merge(Collection<W> toBeMerged, W mergeResult);
        }
    }
    
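    • MergingWindowAssigner extends WindowAssigner and declares the abstract method mergeWindows. The method takes a MergeCallback parameter; the MergeCallback interface defines a merge method that receives the windows to be merged together with the resulting merged window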
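
To make the callback contract concrete, here is a minimal standalone sketch of a mergeWindows implementation that groups overlapping windows and reports each group through the callback. It does not use Flink: `Interval` and the local `MergeCallback` are stand-ins introduced for illustration, and the sort-then-sweep strategy is an assumption about how such a merge can be done, not a copy of the library code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;

/** Standalone sketch; Interval and the local MergeCallback are stand-ins, not Flink classes. */
final class MergeCallbackSketch {

    /** Interval [start, end) standing in for a window. */
    static final class Interval {
        final long start;
        final long end;

        Interval(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** True if the two intervals overlap or touch. */
        boolean overlaps(Interval other) {
            return start <= other.end && end >= other.start;
        }

        /** Smallest interval containing both this and other. */
        Interval cover(Interval other) {
            return new Interval(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /** Same shape as the MergeCallback interface in the listing above. */
    interface MergeCallback<W> {
        void merge(Collection<W> toBeMerged, W mergeResult);
    }

    /** Sorts by start, sweeps once, and reports every group of two or more overlapping windows. */
    static void mergeWindows(Collection<Interval> windows, MergeCallback<Interval> callback) {
        List<Interval> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((Interval w) -> w.start));

        List<Interval> group = new ArrayList<>();
        Interval cover = null;
        for (Interval w : sorted) {
            if (cover != null && cover.overlaps(w)) {
                cover = cover.cover(w); // grow the current group
                group.add(w);
            } else {
                if (group.size() > 1) {
                    callback.merge(group, cover); // a lone window needs no merge
                }
                group = new ArrayList<>();
                group.add(w);
                cover = w;
            }
        }
        if (group.size() > 1) {
            callback.merge(group, cover);
        }
    }

    public static void main(String[] args) {
        List<Interval> windows = Arrays.asList(
                new Interval(0, 10), new Interval(5, 15), new Interval(30, 40));
        mergeWindows(windows, (toBeMerged, result) ->
                System.out.println(toBeMerged + " -> " + result));
    }
}
```

Running main reports a single merge: the first two intervals collapse into [0, 15), while [30, 40) overlaps nothing and triggers no callback.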

    EventTimeSessionWindows

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java

    public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        protected long sessionTimeout;
    
        protected EventTimeSessionWindows(long sessionTimeout) {
            if (sessionTimeout <= 0) {
                throw new IllegalArgumentException("EventTimeSessionWindows parameters must satisfy 0 < size");
            }
    
            this.sessionTimeout = sessionTimeout;
        }
    
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
        }
    
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
    
        @Override
        public String toString() {
            return "EventTimeSessionWindows(" + sessionTimeout + ")";
        }
    
        /**
         * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
         * elements to sessions based on the element timestamp.
         *
         * @param size The session timeout, i.e. the time gap between sessions
         * @return The policy.
         */
        public static EventTimeSessionWindows withGap(Time size) {
            return new EventTimeSessionWindows(size.toMilliseconds());
        }
    
        /**
         * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
         * elements to sessions based on the element timestamp.
         *
         * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
         * @return The policy.
         */
        @PublicEvolving
        public static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
            return new DynamicEventTimeSessionWindows<>(sessionWindowTimeGapExtractor);
        }
    
        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }
    
        @Override
        public boolean isEventTime() {
            return true;
        }
    
        /**
         * Merge overlapping {@link TimeWindow}s.
         */
        public void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
            TimeWindow.mergeWindows(windows, c);
        }
    
    }
    
    • EventTimeSessionWindows extends MergingWindowAssigner and takes sessionTimeout as its constructor argument (a value <= 0 raises IllegalArgumentException); assignWindows returns a single TimeWindow whose start is timestamp and whose end is timestamp + sessionTimeout
    • getDefaultTrigger returns EventTimeTrigger; getWindowSerializer returns TimeWindow.Serializer(); isEventTime returns true; mergeWindows delegates to TimeWindow.mergeWindows
    • EventTimeSessionWindows defines two static factory methods, withGap and withDynamicGap: withGap creates an EventTimeSessionWindows, while withDynamicGap creates a DynamicEventTimeSessionWindows
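
The combination of "one window per element" and "merge overlapping windows" is what produces sessions. The following standalone sketch simulates that for a fixed gap. It is not Flink code: the class and method names are made up, windows are plain `long[]{start, end}` pairs, and watermarks, triggers and late data are deliberately ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Standalone sketch of gap-based sessionization; not Flink code. */
final class EventTimeSessionSketch {

    /**
     * Turns each timestamp into a window [ts, ts + gapMillis) and merges
     * overlapping or touching windows. Returns {start, end} pairs sorted by start.
     */
    static List<long[]> sessions(long[] timestamps, long gapMillis) {
        if (gapMillis <= 0) {
            throw new IllegalArgumentException("gap must satisfy 0 < gap");
        }
        long[] sorted = timestamps.clone();
        Arrays.sort(sorted); // arrival order does not matter once windows are merged

        List<long[]> result = new ArrayList<>();
        for (long ts : sorted) {
            long[] last = result.isEmpty() ? null : result.get(result.size() - 1);
            if (last != null && ts <= last[1]) {
                last[1] = Math.max(last[1], ts + gapMillis); // extend the open session
            } else {
                result.add(new long[] {ts, ts + gapMillis}); // start a new session
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Events at 1s, 3s, 4s and 20s with a 5s gap, supplied out of order.
        for (long[] s : sessions(new long[] {4_000, 1_000, 20_000, 3_000}, 5_000)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

With these inputs the first three events form one session, [1000, 9000), and the event at 20000 starts a second one, [20000, 25000).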

    ProcessingTimeSessionWindows

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java

    public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        protected long sessionTimeout;
    
        protected ProcessingTimeSessionWindows(long sessionTimeout) {
            if (sessionTimeout <= 0) {
                throw new IllegalArgumentException("ProcessingTimeSessionWindows parameters must satisfy 0 < size");
            }
    
            this.sessionTimeout = sessionTimeout;
        }
    
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            long currentProcessingTime = context.getCurrentProcessingTime();
            return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
        }
    
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return ProcessingTimeTrigger.create();
        }
    
        @Override
        public String toString() {
            return "ProcessingTimeSessionWindows(" + sessionTimeout + ")";
        }
    
        /**
         * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
         * elements to sessions based on the element timestamp.
         *
         * @param size The session timeout, i.e. the time gap between sessions
         * @return The policy.
         */
        public static ProcessingTimeSessionWindows withGap(Time size) {
            return new ProcessingTimeSessionWindows(size.toMilliseconds());
        }
    
        /**
         * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
         * elements to sessions based on the element timestamp.
         *
         * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
         * @return The policy.
         */
        @PublicEvolving
        public static <T> DynamicProcessingTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
            return new DynamicProcessingTimeSessionWindows<>(sessionWindowTimeGapExtractor);
        }
    
        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }
    
        @Override
        public boolean isEventTime() {
            return false;
        }
    
        /**
         * Merge overlapping {@link TimeWindow}s.
         */
        public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
            TimeWindow.mergeWindows(windows, c);
        }
    
    }
    
    • ProcessingTimeSessionWindows extends MergingWindowAssigner and takes sessionTimeout as its constructor argument; assignWindows returns a TimeWindow whose start is currentProcessingTime (obtained from context.getCurrentProcessingTime()) and whose end is currentProcessingTime + sessionTimeout
    • getDefaultTrigger returns ProcessingTimeTrigger; getWindowSerializer returns TimeWindow.Serializer(); isEventTime returns false; mergeWindows delegates to TimeWindow.mergeWindows
    • ProcessingTimeSessionWindows defines two static factory methods, withGap and withDynamicGap: withGap creates a ProcessingTimeSessionWindows, while withDynamicGap creates a DynamicProcessingTimeSessionWindows
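
The only difference in window assignment between the two flavours is where the start time comes from. A small standalone sketch of that difference follows; the `clock` parameter is a hypothetical stand-in for context.getCurrentProcessingTime(), and windows are again plain `long[]{start, end}` pairs rather than Flink's TimeWindow.

```java
import java.util.function.LongSupplier;

/** Standalone sketch; the clock is a hypothetical stand-in for the assigner context. */
final class TimeSourceSketch {

    /** Event-time flavour: the window starts at the element's own timestamp. */
    static long[] assignEventTime(long elementTimestamp, long gapMillis) {
        return new long[] {elementTimestamp, elementTimestamp + gapMillis};
    }

    /** Processing-time flavour: the element timestamp is ignored, the clock decides. */
    static long[] assignProcessingTime(long elementTimestamp, long gapMillis, LongSupplier clock) {
        long now = clock.getAsLong();
        return new long[] {now, now + gapMillis};
    }

    public static void main(String[] args) {
        LongSupplier fixedClock = () -> 50_000L; // fixed clock so the run is reproducible
        long[] byEvent = assignEventTime(1_000L, 5_000L);
        long[] byProcessing = assignProcessingTime(1_000L, 5_000L, fixedClock);
        System.out.println("event time:      [" + byEvent[0] + ", " + byEvent[1] + ")");
        System.out.println("processing time: [" + byProcessing[0] + ", " + byProcessing[1] + ")");
    }
}
```

For the same element, the event-time window is [1000, 6000) while the processing-time window is [50000, 55000), because only the latter depends on when the element is processed.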

    SessionWindowTimeGapExtractor

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java

    @PublicEvolving
    public interface SessionWindowTimeGapExtractor<T> extends Serializable {
        /**
         * Extracts the session time gap.
         * @param element The input element.
         * @return The session time gap in milliseconds.
         */
        long extract(T element);
    }
    
    • The SessionWindowTimeGapExtractor interface defines an extract method that derives the session gap (sessionTimeout, in milliseconds) from an element
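
Because the interface has a single method, an extractor can be written as a lambda. The sketch below illustrates the idea without depending on Flink: `GapExtractor` is a local mirror of the interface above, and the `Click` type and the 30-minute and 10-minute gaps are invented for the example.

```java
import java.io.Serializable;

/** Standalone sketch; GapExtractor mirrors the interface above, Click is hypothetical. */
final class GapExtractorSketch {

    /** Same single-method shape as SessionWindowTimeGapExtractor. */
    interface GapExtractor<T> extends Serializable {
        long extract(T element);
    }

    /** Hypothetical event type used only for this illustration. */
    static final class Click {
        final String userId;
        final boolean mobile;

        Click(String userId, boolean mobile) {
            this.userId = userId;
            this.mobile = mobile;
        }
    }

    /** Mobile clicks get a 30-minute gap, all others 10 minutes (made-up values, in ms). */
    static final GapExtractor<Click> BY_DEVICE =
            click -> click.mobile ? 30 * 60 * 1000L : 10 * 60 * 1000L;

    public static void main(String[] args) {
        System.out.println(BY_DEVICE.extract(new Click("u1", true)));
        System.out.println(BY_DEVICE.extract(new Click("u2", false)));
    }
}
```

The extractor returns 1800000 for the mobile click and 600000 for the other one; in a dynamic session window, values like these would play the role of sessionTimeout for each element.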

    DynamicEventTimeSessionWindows

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java

    @PublicEvolving
    public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
    
        protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
            this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
        }
    
        @Override
        public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
            long sessionTimeout = sessionWindowTimeGapExtractor.extract(element);
            if (sessionTimeout <= 0) {
                throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
            }
            return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
        }
    
        @SuppressWarnings("unchecked")
        @Override
        public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return (Trigger<T, TimeWindow>) EventTimeTrigger.create();
        }
    
        @Override
        public String toString() {
            return "DynamicEventTimeSessionWindows()";
        }
    
        /**
         * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
         * elements to sessions based on the element timestamp.
         *
         * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
         * @return The policy.
         */
        public static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
            return new DynamicEventTimeSessionWindows<>(sessionWindowTimeGapExtractor);
        }
    
        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }
    
        @Override
        public boolean isEventTime() {
            return true;
        }
    
        /**
         * Merge overlapping {@link TimeWindow}s.
         */
        public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
            TimeWindow.mergeWindows(windows, c);
        }
    
    }
    
    • DynamicEventTimeSessionWindows also extends MergingWindowAssigner; unlike EventTimeSessionWindows, its constructor takes a SessionWindowTimeGapExtractor
    • assignWindows first uses sessionWindowTimeGapExtractor to extract sessionTimeout from the element (a value <= 0 raises IllegalArgumentException), then returns TimeWindow(timestamp, timestamp + sessionTimeout); getDefaultTrigger returns EventTimeTrigger; isEventTime returns true; mergeWindows delegates to TimeWindow.mergeWindows
    • DynamicEventTimeSessionWindows defines the static factory method withDynamicGap, which creates a DynamicEventTimeSessionWindows
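
With a per-element gap, one element with a long gap can bridge elements that a short fixed gap would have split. The standalone sketch below simulates this. It is not Flink code: `Event` is a hypothetical element type, `ToLongFunction` stands in for the extractor, and windows are plain `long[]{start, end}` pairs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.ToLongFunction;

/** Standalone sketch of per-element gaps; Event is hypothetical, nothing here is Flink code. */
final class DynamicGapSketch {

    /** Hypothetical element carrying its own timestamp and desired gap. */
    static final class Event {
        final long timestamp;
        final long gapMillis;

        Event(long timestamp, long gapMillis) {
            this.timestamp = timestamp;
            this.gapMillis = gapMillis;
        }
    }

    /** Assigns [ts, ts + gap) per element, then merges overlapping or touching windows. */
    static List<long[]> sessions(List<Event> events, ToLongFunction<Event> gapExtractor) {
        List<long[]> windows = new ArrayList<>();
        for (Event e : events) {
            long gap = gapExtractor.applyAsLong(e);
            if (gap <= 0) {
                throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
            }
            windows.add(new long[] {e.timestamp, e.timestamp + gap});
        }
        windows.sort(Comparator.comparingLong((long[] w) -> w[0]));

        List<long[]> merged = new ArrayList<>();
        for (long[] w : windows) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && w[0] <= last[1]) {
                last[1] = Math.max(last[1], w[1]); // extend the open session
            } else {
                merged.add(w); // start a new session
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event(0, 10_000),    // long gap: keeps the session open until 10s
                new Event(8_000, 1_000),
                new Event(9_500, 1_000),
                new Event(30_000, 2_000));
        for (long[] s : sessions(events, e -> e.gapMillis)) {
            System.out.println("[" + s[0] + ", " + s[1] + ")");
        }
    }
}
```

Here the first element's 10-second gap pulls the events at 8000 and 9500 into one session, [0, 10500); the last event forms its own session, [30000, 32000).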

    DynamicProcessingTimeSessionWindows

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java

    @PublicEvolving
    public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
    
        protected DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
            this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
        }
    
        @Override
        public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
            long currentProcessingTime = context.getCurrentProcessingTime();
            long sessionTimeout = sessionWindowTimeGapExtractor.extract(element);
            if (sessionTimeout <= 0) {
                throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
            }
            return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
        }
    
        @SuppressWarnings("unchecked")
        @Override
        public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return (Trigger<T, TimeWindow>) ProcessingTimeTrigger.create();
        }
    
        @Override
        public String toString() {
            return "DynamicProcessingTimeSessionWindows()";
        }
    
        /**
         * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
         * elements to sessions based on the element timestamp.
         *
         * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
         * @return The policy.
         */
        public static <T> DynamicProcessingTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
            return new DynamicProcessingTimeSessionWindows<>(sessionWindowTimeGapExtractor);
        }
    
        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }
    
        @Override
        public boolean isEventTime() {
            return false;
        }
    
        /**
         * Merge overlapping {@link TimeWindow}s.
         */
        public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
            TimeWindow.mergeWindows(windows, c);
        }
    
    }
    
    • DynamicProcessingTimeSessionWindows also extends MergingWindowAssigner; unlike ProcessingTimeSessionWindows, its constructor takes a SessionWindowTimeGapExtractor
    • assignWindows first uses sessionWindowTimeGapExtractor to extract sessionTimeout from the element, then returns TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout), where currentProcessingTime comes from context.getCurrentProcessingTime(); getDefaultTrigger returns ProcessingTimeTrigger; isEventTime returns false; mergeWindows delegates to TimeWindow.mergeWindows
    • DynamicProcessingTimeSessionWindows defines the static factory method withDynamicGap, which creates a DynamicProcessingTimeSessionWindows

    Summary

    • Flink's session window assigners are EventTimeSessionWindows, DynamicEventTimeSessionWindows, ProcessingTimeSessionWindows and DynamicProcessingTimeSessionWindows. All of them extend MergingWindowAssigner, which declares the abstract method mergeWindows
    • EventTimeSessionWindows and ProcessingTimeSessionWindows both take sessionTimeout as their constructor argument. They differ in assignWindows, where ProcessingTimeSessionWindows uses context.getCurrentProcessingTime() instead of the method's timestamp parameter to compute the TimeWindow; in getDefaultTrigger, where the former returns EventTimeTrigger and the latter ProcessingTimeTrigger; and in isEventTime, where the former returns true and the latter false
    • DynamicEventTimeSessionWindows and DynamicProcessingTimeSessionWindows differ from their non-dynamic counterparts in that their constructor takes a SessionWindowTimeGapExtractor. That interface defines an extract method that derives sessionTimeout from each element, whereas for a non-dynamic session window sessionTimeout is fixed once it has been passed to the constructor
