Talking about Flink's Sliding Window

Author: go4it | Published 2019-01-03 10:35

    This article takes a look at Flink's Sliding Window.

    SlidingEventTimeWindows

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingEventTimeWindows.java

    @PublicEvolving
    public class SlidingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        private final long size;
    
        private final long slide;
    
        private final long offset;
    
        protected SlidingEventTimeWindows(long size, long slide, long offset) {
            if (offset < 0 || offset >= slide || size <= 0) {
                throw new IllegalArgumentException("SlidingEventTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
            }
    
            this.size = size;
            this.slide = slide;
            this.offset = offset;
        }
    
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            if (timestamp > Long.MIN_VALUE) {
                List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
                long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
                for (long start = lastStart;
                    start > timestamp - size;
                    start -= slide) {
                    windows.add(new TimeWindow(start, start + size));
                }
                return windows;
            } else {
                throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
                        "Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
                        "'DataStream.assignTimestampsAndWatermarks(...)'?");
            }
        }
    
        public long getSize() {
            return size;
        }
    
        public long getSlide() {
            return slide;
        }
    
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return EventTimeTrigger.create();
        }
    
        @Override
        public String toString() {
            return "SlidingEventTimeWindows(" + size + ", " + slide + ")";
        }
    
        public static SlidingEventTimeWindows of(Time size, Time slide) {
            return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
        }
    
        public static SlidingEventTimeWindows of(Time size, Time slide, Time offset) {
            return new SlidingEventTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
                offset.toMilliseconds() % slide.toMilliseconds());
        }
    
        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }
    
        @Override
        public boolean isEventTime() {
            return true;
        }
    }
    
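    • SlidingEventTimeWindows extends WindowAssigner, with Object as the element type and TimeWindow as the window type. It has three parameters, size, slide and offset, which must satisfy 0 <= offset < slide and size > 0.
    • The assignWindows method first computes lastStart with TimeWindow.getWindowStartWithOffset(timestamp, offset, slide), i.e. it aligns the timestamp using slide (not size) as the window size. It then loops while start + size > timestamp (written as start > timestamp - size in the code), subtracting slide from start on each iteration and creating a TimeWindow(start, start + size) at each step. As a result, an element is assigned to size / slide windows when size is a multiple of slide. If the timestamp is Long.MIN_VALUE (no timestamp assigned), it throws a RuntimeException instead. getDefaultTrigger returns EventTimeTrigger, getWindowSerializer returns TimeWindow.Serializer(), and isEventTime returns true.
    • SlidingEventTimeWindows provides static factory methods named of that take size and slide, plus an optional offset. The two-argument version uses offset 0. The three-argument version converts the offset to milliseconds and takes it modulo slide.toMilliseconds() to obtain the final offset value.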
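    To make the arithmetic above concrete, here is a minimal, Flink-free Java sketch that mirrors the quoted source. It covers the parameter check from the constructor, the offset normalization done by of, the alignment formula of TimeWindow.getWindowStartWithOffset, and the loop in assignWindows. The class SlidingAssignSketch and its methods are hypothetical names for illustration only. Windows are modeled as long[]{start, end} pairs instead of Flink's TimeWindow. The alignment formula is assumed to match TimeWindow in Flink 1.7; its source is not quoted in this article.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of SlidingEventTimeWindows' window assignment.
// Windows are modeled as long[]{start, end}, start inclusive and end exclusive.
public class SlidingAssignSketch {

    // Mirrors the constructor check: 0 <= offset < slide and size > 0.
    static void validate(long size, long slide, long offset) {
        if (offset < 0 || offset >= slide || size <= 0) {
            throw new IllegalArgumentException(
                "parameters must satisfy 0 <= offset < slide and size > 0");
        }
    }

    // Mirrors the three-argument of(...): the offset is reduced modulo slide.
    static long normalizeOffset(long offset, long slide) {
        return offset % slide;
    }

    // Assumed to match TimeWindow.getWindowStartWithOffset in Flink 1.7.
    static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    // Mirrors assignWindows: begin at the latest slide-aligned start and step back
    // by slide while [start, start + size) still contains the timestamp.
    static List<long[]> assignWindows(long timestamp, long size, long slide, long offset) {
        if (timestamp == Long.MIN_VALUE) {
            throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker).");
        }
        // Flink checks this in the constructor; re-checked here because the sketch has none.
        validate(size, slide, offset);
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        long lastStart = getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // size = 10s, slide = 5s, offset = 0, event timestamp = 12s (all in milliseconds)
        for (long[] w : assignWindows(12_000L, 10_000L, 5_000L, 0L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

    Running main prints [10000, 20000) and [5000, 15000). With size = 10s and slide = 5s, a record at 12s lands in size / slide = 2 overlapping windows, and both of them contain it.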

    SlidingProcessingTimeWindows

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java

    public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
        private static final long serialVersionUID = 1L;
    
        private final long size;
    
        private final long offset;
    
        private final long slide;
    
        private SlidingProcessingTimeWindows(long size, long slide, long offset) {
            if (offset < 0 || offset >= slide || size <= 0) {
                throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy 0 <= offset < slide and size > 0");
            }
    
            this.size = size;
            this.slide = slide;
            this.offset = offset;
        }
    
        @Override
        public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
            timestamp = context.getCurrentProcessingTime();
            List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
            long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
            for (long start = lastStart;
                start > timestamp - size;
                start -= slide) {
                windows.add(new TimeWindow(start, start + size));
            }
            return windows;
        }
    
        public long getSize() {
            return size;
        }
    
        public long getSlide() {
            return slide;
        }
    
        @Override
        public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
            return ProcessingTimeTrigger.create();
        }
    
        @Override
        public String toString() {
            return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
        }
    
        public static SlidingProcessingTimeWindows of(Time size, Time slide) {
            return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
        }
    
        public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) {
            return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(),
                offset.toMilliseconds() % slide.toMilliseconds());
        }
    
        @Override
        public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
            return new TimeWindow.Serializer();
        }
    
        @Override
        public boolean isEventTime() {
            return false;
        }
    }
    
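    • SlidingProcessingTimeWindows extends WindowAssigner, with Object as the element type and TimeWindow as the window type. It has three parameters, size, slide and offset, which must satisfy 0 <= offset < slide and size > 0.
    • The assignWindows method computes lastStart with TimeWindow.getWindowStartWithOffset(timestamp, offset, slide), using slide as the window size. Unlike SlidingEventTimeWindows, this method first overwrites timestamp with context.getCurrentProcessingTime(), so the element's own timestamp is ignored and there is no Long.MIN_VALUE check. It then loops while start + size > timestamp, subtracting slide from start on each iteration and creating a TimeWindow(start, start + size) at each step. getDefaultTrigger returns ProcessingTimeTrigger, getWindowSerializer returns TimeWindow.Serializer(), and isEventTime returns false.
    • SlidingProcessingTimeWindows provides static factory methods named of that take size and slide, plus an optional offset. The three-argument version converts the offset to milliseconds and takes it modulo slide.toMilliseconds() to obtain the final offset value.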
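    The one behavioral difference in assignWindows can be sketched the same way. In this hypothetical sketch, a java.util.function.LongSupplier stands in for WindowAssignerContext.getCurrentProcessingTime(). That substitution is an assumption made so the example runs without Flink. The timestamp passed in is simply ignored, just as SlidingProcessingTimeWindows overwrites it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch of the processing-time variant: the element's own timestamp is
// ignored and replaced by the current processing time, modeled here as a LongSupplier
// standing in for WindowAssignerContext.getCurrentProcessingTime().
public class SlidingProcessingTimeSketch {

    static List<long[]> assignWindows(long elementTimestamp, LongSupplier processingTimeClock,
                                      long size, long slide, long offset) {
        // Like SlidingProcessingTimeWindows.assignWindows, overwrite the incoming timestamp.
        long timestamp = processingTimeClock.getAsLong();
        List<long[]> windows = new ArrayList<>((int) (size / slide));
        // Same alignment formula as TimeWindow.getWindowStartWithOffset (assumed, Flink 1.7).
        long lastStart = timestamp - (timestamp - offset + slide) % slide;
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new long[] {start, start + size});
        }
        return windows;
    }

    public static void main(String[] args) {
        // The element claims timestamp 0, but the fake clock reports 12_000 ms.
        LongSupplier fakeClock = () -> 12_000L;
        for (long[] w : assignWindows(0L, fakeClock, 10_000L, 5_000L, 0L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

    The element carries timestamp 0, but the windows are computed from the clock value 12_000, so main prints [10000, 20000) and [5000, 15000). Injecting the clock also makes this behavior easy to test deterministically.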

    Summary

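    • Flink's Sliding Window comes in two flavors, SlidingEventTimeWindows and SlidingProcessingTimeWindows. Both extend WindowAssigner, with Object as the element type and TimeWindow as the window type. Both take three parameters, size, slide and offset, which must satisfy 0 <= offset < slide and size > 0.
    • WindowAssigner declares the abstract methods assignWindows, getDefaultTrigger, getWindowSerializer and isEventTime, and also defines the abstract static class WindowAssignerContext. It has two type parameters: T for the element type and W for the window type. SlidingEventTimeWindows and SlidingProcessingTimeWindows both use TimeWindow as their window type. TimeWindow has start and end fields; start is inclusive and end is exclusive, so maxTimestamp returns end - 1. TimeWindow also provides the static methods mergeWindows and getWindowStartWithOffset. The former merges overlapping time windows, and the latter computes the window start for a given timestamp, offset and windowSize.
    • SlidingEventTimeWindows and SlidingProcessingTimeWindows differ in assignWindows, getDefaultTrigger and isEventTime:
      - assignWindows: the former uses the timestamp argument, while the latter uses context.getCurrentProcessingTime().
      - getDefaultTrigger: the former returns EventTimeTrigger, while the latter returns ProcessingTimeTrigger.
      - isEventTime: the former returns true, while the latter returns false.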
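    The TimeWindow semantics mentioned in the summary can be illustrated with a small stand-in class: inclusive start, exclusive end, and maxTimestamp() == end - 1. TimeWindowSketch below is hypothetical and only models these boundary rules. Flink's real TimeWindow also carries serialization and merging logic that is not reproduced here.

```java
// Hypothetical stand-in for Flink's TimeWindow, illustrating the boundary rules from the
// summary: start is inclusive, end is exclusive, and maxTimestamp() is end - 1.
public final class TimeWindowSketch {
    private final long start; // inclusive
    private final long end;   // exclusive

    TimeWindowSketch(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Largest timestamp that still belongs to this window.
    long maxTimestamp() {
        return end - 1;
    }

    // A timestamp t belongs to [start, end) iff start <= t < end.
    boolean contains(long t) {
        return start <= t && t < end;
    }

    public static void main(String[] args) {
        TimeWindowSketch w = new TimeWindowSketch(10_000L, 20_000L);
        System.out.println(w.maxTimestamp());    // 19999
        System.out.println(w.contains(10_000L)); // true: start is inclusive
        System.out.println(w.contains(20_000L)); // false: end is exclusive
    }
}
```

    Because end is exclusive, adjacent sliding windows such as [5000, 15000) and [15000, 25000) never both claim the boundary timestamp 15000. Using maxTimestamp() as the "last moment" of a window keeps that rule consistent.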
