Flink's Window Mechanism


Author: 羋学僧 | Published: 2020-11-15 19:48

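    1.1 Window overview

    Aggregating events (e.g., counting, summing) works differently on streams than in batch processing. For example, counting all elements of a stream is impossible, because streams are usually infinite (unbounded). Aggregations on streams therefore have to be scoped by a window, such as "the count over the last 5 minutes" or "the sum of the last 100 elements". A window is a way to cut unbounded data into finite chunks.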

    A window can be time-driven [Time Window] (e.g., every 30 seconds) or data-driven [Count Window] (e.g., every 100 elements).


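    1.2 Window types

    Windows are usually divided into the following types:

    • tumbling windows: fixed-size windows without overlap (time, count)
    • sliding windows: fixed-size windows with overlap (time, count)
    • session windows: windows closed by a period of inactivity (time)
    • global windows: no window boundaries; all elements with the same key fall into one window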
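The difference between tumbling and sliding windows comes down to how an element's timestamp is mapped to window start times. The following is a minimal plain-Java sketch of that mapping; it does not use Flink's API, and the class and method names (`WindowAssignSketch`, `tumblingStart`, `slidingStarts`) are hypothetical names chosen for illustration. Timestamps and sizes are assumed to be in milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowAssignSketch {
    // Start of the single tumbling window of length `size` that contains `ts`.
    static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    // Starts of all sliding windows (length `size`, step `slide`) that contain `ts`,
    // newest window first. With slide < size an element belongs to several windows.
    static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // One window for the tumbling case, two overlapping windows for the sliding case.
        System.out.println(tumblingStart(7_000L, 5_000L));
        System.out.println(slidingStarts(70_000L, 60_000L, 30_000L));
    }
}
```

With a 1-minute window sliding every 30 seconds, an element at 70 s falls into the windows starting at 60 s and at 30 s, which is the overlap the list above refers to.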

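    1.2.1 Tumbling windows (no overlap)

    1.2.2 Sliding windows (with overlap)

    1.2.3 Session windows

    Requirement: count the occurrences of each word in real time; once a word has not appeared for 5 seconds, emit that word.

    Demo: see the sample code further down.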
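To make the session idea concrete, here is a small plain-Java sketch that splits the timestamps of one key into sessions based on an inactivity gap. It is not Flink code: `SessionGapSketch` and `sessions` are hypothetical names, the input is assumed to be sorted, and the handling of a pause exactly equal to the gap is an assumption of this sketch rather than a statement about Flink's behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SessionGapSketch {
    // Splits the ascending timestamps of one key into sessions.
    // A new session starts whenever the pause since the previous element exceeds `gap`.
    static List<List<Long>> sessions(List<Long> sortedTs, long gap) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTs) {
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gap) {
                result.add(current);          // previous session is closed
                current = new ArrayList<>();  // start a new one
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Gap of 5 s: the 6 s pause between 3000 and 9000 closes the first session.
        System.out.println(sessions(Arrays.asList(0L, 1000L, 3000L, 9000L, 10000L), 5000L));
    }
}
```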


    1.2.4 global windows

    See the example further down.


    1.2.5 Window type summary

    Keyed Window and Non-Keyed Window

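    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    /**
     * Non-keyed window vs. keyed window.
     * Note: keyBy(int) and timeWindow(...) are deprecated in newer Flink releases.
     */
    public class WindowType {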
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> dataStream = env.socketTextStream("localhost", 8888);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    String[] fields = line.split(",");
                    for (String word : fields) {
                        collector.collect(Tuple2.of(word, 1));
                    }
                }
            });
    
            //Non keyed Stream
    //        AllWindowedStream<Tuple2<String, Integer>, TimeWindow> nonkeyedStream = stream.timeWindowAll(Time.seconds(3));
    //        nonkeyedStream.sum(1)
    //                .print();
    
            //Keyed Stream
            stream.keyBy(0)
                    .timeWindow(Time.seconds(3))
                    .sum(1)
                    .print();
    
            env.execute("word count");
    
        }
    }
    

    TimeWindow

    // Stream of (sensorId, carCnt )
    val vehicleCnts: DataStream[(Int, Int)] = ...
    
    val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
      // key stream by sensorId
      .keyBy(0)
      // tumbling time window of 1 minute length
      .timeWindow(Time.minutes(1))
      // compute sum over carCnt
      .sum(1)
    
    val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
      .keyBy(0)
      // sliding time window of 1 minute length and 30 secs trigger interval
      .timeWindow(Time.minutes(1), Time.seconds(30))
      .sum(1)
    

    CountWindow

    // Stream of (sensorId, carCnt)
    val vehicleCnts: DataStream[(Int, Int)] = ...
    
    val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts 
      // key stream by sensorId
      .keyBy(0)
      // tumbling count window of 100 elements size
      .countWindow(100)
      // compute the carCnt sum
      .sum(1)
    
    val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
      .keyBy(0)
      // sliding count window of 100 elements size and 10 elements trigger interval 
      .countWindow(100, 10)
      .sum(1)
    

    Sample code

    /**
     * Once 5 seconds pass without a word appearing again, print that word
     */
    public class SessionWindowTest {
        public static void main(String[] args) throws  Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    String[] fields = line.split(",");
                    for (String word : fields) {
                        collector.collect(Tuple2.of(word, 1));
                    }
                }
            });
    
            stream.keyBy(0)
                    // session window
                    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
                    // tumbling window, fires every 3 seconds
                    //.window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
                    //.timeWindow(Time.seconds(3))
                    //.countWindow(100)
                    // sliding window: every 1 s, count the words of the last 3 s
                    //.window(SlidingProcessingTimeWindows.of(Time.seconds(3),Time.seconds(1)))
                    .sum(1)
                    .print();
            env.execute("SessionWindowTest");
        }
    }
    
    

    1.3 Window operations

    Keyed Windows

    Operators in [] are optional.

    stream
      .keyBy(...)                      <--- keyed versus non-keyed windows
      .window(...)                     <--- required: "assigner"
      [.trigger(...)]                  <--- optional: "trigger" (else default trigger)
      [.evictor(...)]                  <--- optional: "evictor" (else no evictor)
      [.allowedLateness(...)]          <--- optional: "lateness" (else zero)
      [.sideOutputLateData(...)]       <--- optional: "output tag" (else no side output for late data)
      .reduce/aggregate/fold/apply()   <--- required: "function"
      [.getSideOutput(...)]            <--- optional: "output tag"
    

    Non-Keyed Windows

    stream
      .windowAll(...)                  <--- required: "assigner"
      [.trigger(...)]                  <--- optional: "trigger" (else default trigger)
      [.evictor(...)]                  <--- optional: "evictor" (else no evictor)
      [.allowedLateness(...)]          <--- optional: "lateness" (else zero)
      [.sideOutputLateData(...)]       <--- optional: "output tag" (else no side output for late data)
      .reduce/aggregate/fold/apply()   <--- required: "function"
      [.getSideOutput(...)]            <--- optional: "output tag"
    

    1.3.1 window function

    global window

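    A global window is only useful in combination with a trigger.
    Requirement: emit a count every time a word has appeared three times.

    /**
     * Emit a result every three occurrences of a word.
     * CountTrigger fires without purging, so the window keeps its earlier
     * elements and the sum keeps growing (3, 6, 9, ...).
     */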
    public class GlobalWindowTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    String[] fields = line.split(",");
                    for (String word : fields) {
                        collector.collect(Tuple2.of(word, 1));
                    }
                }
            });
    
            stream.keyBy(0)
                  // .countWindow(3)
                    .window(GlobalWindows.create())
                   .trigger(CountTrigger.of(3))
                    .sum(1)
                    .print();
    
            env.execute("SessionWindowTest");
        }
    }
    


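    Comparison with countWindow
    /**
     * Emit a result every three occurrences of a word.
     * countWindow(3) wraps the CountTrigger in a PurgingTrigger, so each
     * result covers only the most recent three occurrences.
     */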
    public class GlobalWindowTest {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    String[] fields = line.split(",");
                    for (String word : fields) {
                        collector.collect(Tuple2.of(word, 1));
                    }
                }
            });
    
            stream.keyBy(0)
                  .countWindow(3)
                    //  .window(GlobalWindows.create())
                   // .trigger(CountTrigger.of(3))
                    .sum(1)
                    .print();
    
            env.execute("SessionWindowTest");
        }
    }
    

    1.3.2 Trigger

    Requirement: implement a custom CountWindow.

    /**
     * Use a Trigger to build something that behaves like CountWindow
     */
    public class CountWindowWordCountByTrigger {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    String[] fields = line.split(",");
                    for (String word : fields) {
                        collector.collect(Tuple2.of(word, 1));
                    }
                }
            });
            
            WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> keyedWindow = stream.keyBy(0)
                    .window(GlobalWindows.create())
                    .trigger(new MyCountTrigger(3));
            
    
            // Have a look at the CountTrigger source code; it is very similar to what we wrote
    //        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> keyedWindow = stream.keyBy(0)
    //                .window(GlobalWindows.create())
     //               .trigger(CountTrigger.of(3));
    
    
            DataStream<Tuple2<String, Integer>> wordCounts = keyedWindow.sum(1);
    
            wordCounts.print().setParallelism(1);
    
            env.execute("Streaming WordCount");
        }
    
    
    
        private static class MyCountTrigger
                extends Trigger<Tuple2<String, Integer>, GlobalWindow> {
            // maximum number of elements before the trigger fires
            private long maxCount;
            
            // stores the count for each key
            private ReducingStateDescriptor<Long> stateDescriptor
                    = new ReducingStateDescriptor<Long>("count", new ReduceFunction<Long>() {
                @Override
                public Long reduce(Long aLong, Long t1) throws Exception {
                    return aLong + t1;
                }
            }, Long.class);
    
    
            public MyCountTrigger(long maxCount) {
                this.maxCount = maxCount;
            }
    
            /**
             * Called whenever an element is added to a window.
             * @param element   the element
             * @param timestamp timestamp of the arriving element
             * @param window    the window the element belongs to
             * @param ctx       trigger context
             * @return TriggerResult
             *      1. TriggerResult.CONTINUE: do nothing with the window
             *      2. TriggerResult.FIRE: trigger the window computation
             *      3. TriggerResult.PURGE: clear all data in the window
             *      4. TriggerResult.FIRE_AND_PURGE: trigger the window computation, then clear the window data
             * @throws Exception
             */
            @Override
            public TriggerResult onElement(Tuple2<String, Integer> element,
                                           long timestamp,
                                           GlobalWindow window,
                                           TriggerContext ctx) throws Exception {
                // get the count state for the current key
                ReducingState<Long> count = ctx.getPartitionedState(stateDescriptor);
                // increment the count by 1
                count.add(1L);
                // if the count for the current key has reached maxCount
                if (count.get() == maxCount) {
                    count.clear();
                    // fire the window computation and
                    // purge all data held by the window
                    return TriggerResult.FIRE_AND_PURGE;
                }
                // otherwise leave the window untouched
                return TriggerResult.CONTINUE;
            }
    
            /**
             * We normally do not use timers here, so the two timer callbacks contain no logic.
             * @param time
             * @param window
             * @param ctx
             * @return
             * @throws Exception
             */
            @Override
            public TriggerResult onProcessingTime(long time,
                                                  GlobalWindow window,
                                                  TriggerContext ctx) throws Exception {
                // processing-time timer logic would go here
                return TriggerResult.CONTINUE;
            }
    
            @Override
            public TriggerResult onEventTime(long time,
                                             GlobalWindow window,
                                             TriggerContext ctx) throws Exception {
                // event-time timer logic would go here
                return TriggerResult.CONTINUE;
            }
    
            @Override
            public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
                // clear the count state
                ctx.getPartitionedState(stateDescriptor).clear();
            }
        }
    
    }
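The practical difference between FIRE and FIRE_AND_PURGE is whether the window keeps its elements after a result is emitted. The plain-Java sketch below simulates a count-based trigger over a single key to show this; it is only an illustration under that assumption and does not use Flink's Trigger API. `CountTriggerSketch` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class CountTriggerSketch {
    // Feeds `values` into one window and emits the window sum every `maxCount` elements.
    // purge = true mimics FIRE_AND_PURGE (window contents dropped after each firing);
    // purge = false mimics FIRE (contents kept, so later sums include earlier elements).
    static List<Integer> run(int[] values, int maxCount, boolean purge) {
        List<Integer> window = new ArrayList<>();
        List<Integer> emitted = new ArrayList<>();
        int count = 0;
        for (int v : values) {
            window.add(v);
            count++;
            if (count == maxCount) {
                count = 0;                 // like count.clear() in the trigger
                int sum = 0;
                for (int x : window) {
                    sum += x;
                }
                emitted.add(sum);          // the "window computation"
                if (purge) {
                    window.clear();
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        int[] ones = {1, 1, 1, 1, 1, 1, 1};
        System.out.println(run(ones, 3, true));   // each firing sees only 3 elements
        System.out.println(run(ones, 3, false));  // the sum accumulates across firings
    }
}
```

In this sketch seven occurrences of a word with a threshold of 3 produce two firings; the seventh element stays in the window waiting for the next firing.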
    

    1.3.3 Evictor

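    Requirement: every 2 words, compute over the most recent 3 words.

    /**
     * Use an Evictor to build something that behaves like CountWindow(3,2):
     * every 2 words, compute over the most recent 3 words
     */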
    public class CountWindowWordCountByEvictor {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
    
            SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    String[] fields = line.split(",");
                    for (String word : fields) {
                        collector.collect(Tuple2.of(word, 1));
                    }
                }
            });
    
            WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> keyedWindow = stream.keyBy(0)
    
                  .window(GlobalWindows.create())
                    .trigger(new MyCountTrigger(2))
                    .evictor(new MyCountEvictor(3));
    
            DataStream<Tuple2<String, Integer>> wordCounts = keyedWindow.sum(1);
    
            wordCounts.print().setParallelism(1);
    
            env.execute("Streaming WordCount");
        }
    
        private static class MyCountTrigger
                extends Trigger<Tuple2<String, Integer>, GlobalWindow> {
            // maximum number of elements before the trigger fires
            private long maxCount;
    
            // stores the count for each key
            private ReducingStateDescriptor<Long> stateDescriptor
                    = new ReducingStateDescriptor<Long>("count", new ReduceFunction<Long>() {
                @Override
                public Long reduce(Long aLong, Long t1) throws Exception {
                    return aLong + t1;
                }
            }, Long.class);
    
            public MyCountTrigger(long maxCount) {
                this.maxCount = maxCount;
            }
    
            /**
             * Called whenever an element is added to a window.
             * @param element   the element
             * @param timestamp timestamp of the arriving element
             * @param window    the window the element belongs to
             * @param ctx       trigger context
             * @return TriggerResult
             *      1. TriggerResult.CONTINUE: do nothing with the window
             *      2. TriggerResult.FIRE: trigger the window computation
             *      3. TriggerResult.PURGE: clear all data in the window
             *      4. TriggerResult.FIRE_AND_PURGE: trigger the window computation, then clear the window data
             * @throws Exception
             */
            @Override
            public TriggerResult onElement(Tuple2<String, Integer> element,
                                           long timestamp,
                                           GlobalWindow window,
                                           TriggerContext ctx) throws Exception {
                // get the count state for the current key
                ReducingState<Long> count = ctx.getPartitionedState(stateDescriptor);
                // increment the count by 1
                count.add(1L);
                // if the count for the current key has reached maxCount
                if (count.get() == maxCount) {
                    count.clear();
                    // fire the window computation (elements are kept for the evictor)
                    return TriggerResult.FIRE;
                }
                // otherwise leave the window untouched
                return TriggerResult.CONTINUE;
            }
    
            @Override
            public TriggerResult onProcessingTime(long time,
                                                  GlobalWindow window,
                                                  TriggerContext ctx) throws Exception {
                // processing-time timer logic would go here
                return TriggerResult.CONTINUE;
            }
    
            @Override
            public TriggerResult onEventTime(long time,
                                             GlobalWindow window,
                                             TriggerContext ctx) throws Exception {
                // event-time timer logic would go here
                return TriggerResult.CONTINUE;
            }
    
            @Override
            public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
                // clear the count state
                ctx.getPartitionedState(stateDescriptor).clear();
            }
        }
        private static class MyCountEvictor
                implements Evictor<Tuple2<String, Integer>, GlobalWindow> {
            // configured window size (number of elements to keep)
            private long windowCount;
    
            public MyCountEvictor(long windowCount) {
                this.windowCount = windowCount;
            }
    
            /**
             * Removes selected elements before the window computation runs.
             * @param elements  all elements in the window
             * @param size  number of elements in the window
             * @param window    the window
             * @param evictorContext    evictor context
             */
            @Override
            public void evictBefore(Iterable<TimestampedValue<Tuple2<String, Integer>>> elements,
                                    int size,
                                    GlobalWindow window,
                                    EvictorContext evictorContext) {
                if (size <= windowCount) {
                    return;
                } else {
                    int evictorCount = 0;
                    Iterator<TimestampedValue<Tuple2<String, Integer>>> iterator = elements.iterator();
                    while (iterator.hasNext()) {
                        iterator.next();
                        evictorCount++;
                        // keep removing the oldest elements until (size - windowCount) of them are gone
                        if (evictorCount > size - windowCount) {
                            break;
                        } else {
                            iterator.remove();
                        }
                    }
                }
            }
    
            /**
             * Removes selected elements after the window computation has run.
             * @param elements  all elements in the window
             * @param size  number of elements in the window
             * @param window    the window
             * @param evictorContext    evictor context
             */
            @Override
            public void evictAfter(Iterable<TimestampedValue<Tuple2<String, Integer>>> elements,
                                   int size, GlobalWindow window, EvictorContext evictorContext) {
    
            }
        }
    
        }
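The interplay of trigger and evictor above can be followed step by step in a small simulation. This is a plain-Java sketch for a single key that fires every `slide` elements and, before each computation, evicts the oldest elements so that at most `size` remain. It is not Flink code, and `CountEvictorSketch` and `run` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

final class CountEvictorSketch {
    // Emits a sum every `slide` elements, computed over at most the latest `size` elements.
    static List<Integer> run(int[] values, int size, int slide) {
        List<Integer> window = new ArrayList<>();
        List<Integer> emitted = new ArrayList<>();
        int sinceLastFire = 0;
        for (int v : values) {
            window.add(v);
            sinceLastFire++;
            if (sinceLastFire == slide) {
                sinceLastFire = 0;
                // evict-before step: drop the oldest elements until only `size` remain
                while (window.size() > size) {
                    window.remove(0);
                }
                int sum = 0;
                for (int x : window) {
                    sum += x;
                }
                emitted.add(sum);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // size 3, slide 2: firings see [1,2], then [2,3,4], then [4,5,6]
        System.out.println(run(new int[] {1, 2, 3, 4, 5, 6}, 3, 2));
    }
}
```

Because the trigger only fires and never purges, the window keeps growing between firings; the evictor is what bounds each computation to the latest three elements.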
    
    

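    1.3.4 Incremental window aggregation

    Every element that enters the window triggers one computation step; when the window's time is up, the final result is emitted.
    Common aggregation operators: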

    reduce(reduceFunction)
    aggregate(aggregateFunction)
    sum()
    min()
    max()
    
    /**
     * Demonstrates incremental aggregation
     */
    public class SocketDemoIncrAgg {
        public static void main(String[] args) throws  Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
            SingleOutputStreamOperator<Integer> intDStream =
                    dataStream.map(number -> Integer.valueOf(number));
    
            AllWindowedStream<Integer, TimeWindow> windowResult = intDStream.timeWindowAll(Time.seconds(5));
    
            windowResult.reduce(new ReduceFunction<Integer>() {
               @Override
               public Integer reduce(Integer last, Integer current) throws Exception {
                   System.out.println("reduce called: " + last + "  " + current);
                   return last+current;
               }
           }).print();
    
    
            env.execute(SocketDemoIncrAgg.class.getSimpleName());
        }
    }
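The key property of incremental aggregation is that the window only has to keep one accumulated value instead of all its elements. A minimal plain-Java sketch of that idea follows; it is not Flink's implementation, and `IncrementalAggSketch`, `add`, and `fire` are hypothetical names.

```java
import java.util.function.BinaryOperator;

final class IncrementalAggSketch {
    private final BinaryOperator<Integer> reducer;
    private Integer acc; // the only state kept for the window

    IncrementalAggSketch(BinaryOperator<Integer> reducer) {
        this.reducer = reducer;
    }

    // Called for every arriving element: the reducer is applied immediately.
    void add(int value) {
        acc = (acc == null) ? value : reducer.apply(acc, value);
    }

    // Called when the window ends: returns the result and resets the state.
    Integer fire() {
        Integer result = acc;
        acc = null;
        return result;
    }

    public static void main(String[] args) {
        IncrementalAggSketch window = new IncrementalAggSketch(Integer::sum);
        window.add(1);
        window.add(2);
        window.add(3);
        System.out.println(window.fire());
    }
}
```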
    
    

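    1.3.5 Full window aggregation

    The aggregation starts only after all data belonging to the window has arrived (this makes requirements such as sorting the data within a window possible).

    apply(windowFunction)
    process(processWindowFunction)
    processWindowFunction provides more context information than windowFunction, similar to the relationship between map and RichMap
    
    /**
     * Full (non-incremental) computation
     */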
    public class SocketDemoFullAgg {
        public static void main(String[] args) throws  Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    
            // kafka -> topic 10 -> 10
            DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
            SingleOutputStreamOperator<Integer> intDStream = dataStream.map(number -> Integer.valueOf(number));
            AllWindowedStream<Integer, TimeWindow> windowResult = intDStream.timeWindowAll(Time.seconds(5));
            windowResult.process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
                @Override
                public void process(Context context, Iterable<Integer> iterable,
                                    Collector<Integer> collector) throws Exception {
                    System.out.println("running the window computation");
                    int count=0;
                    Iterator<Integer> numberiterator = iterable.iterator();
                    while (numberiterator.hasNext()){
                        Integer number = numberiterator.next();
                        count+=number;
                    }
                    collector.collect(count);
                }
            }).print();
    
            env.execute("socketDemoFullAgg");
        }
    }
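Full aggregation trades memory for flexibility: the window buffers every element, and the function sees all of them at once, which is what makes sorting possible. The plain-Java sketch below illustrates this under the assumption of a single window; it is not Flink code, and `FullAggSketch`, `add`, and `fireSorted` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class FullAggSketch {
    // Unlike incremental aggregation, every element of the window is kept.
    private final List<Integer> buffer = new ArrayList<>();

    // Called for every arriving element: nothing is computed yet.
    void add(int value) {
        buffer.add(value);
    }

    // Called once when the window ends: sees all elements, so it can sort them.
    List<Integer> fireSorted() {
        List<Integer> result = new ArrayList<>(buffer);
        Collections.sort(result);
        buffer.clear();
        return result;
    }

    public static void main(String[] args) {
        FullAggSketch window = new FullAggSketch();
        window.add(3);
        window.add(1);
        window.add(2);
        System.out.println(window.fireSorted());
    }
}
```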
    

    1.3.6 window join

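    Two streams can be joined window by window. The join operation supports only three window types: tumbling windows, sliding windows, and session windows.
    Usage:

    stream
      .join(otherStream)           // join the two streams
      .where(<KeySelector>)        // key of the first stream used as the join field
      .equalTo(<KeySelector>)      // key of the second stream used as the join field
      .window(<WindowAssigner>)    // window type
      .apply(<JoinFunction>)       // process each joined pair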
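What a window join produces can be sketched in plain Java for the tumbling case: two elements are paired only if they share a key and fall into the same window. This is a simplified illustration, not Flink's implementation; `WindowJoinSketch`, `Event`, and `join` are hypothetical names, and timestamps are assumed to be in milliseconds.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class WindowJoinSketch {
    static final class Event {
        final String key;
        final long ts;
        final String value;

        Event(String key, long ts, String value) {
            this.key = key;
            this.ts = ts;
            this.value = value;
        }
    }

    // Inner join: pairs every left/right event that share a key
    // and fall into the same tumbling window of length `windowSize`.
    static List<String> join(List<Event> left, List<Event> right, long windowSize) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameWindow =
                        Math.floorDiv(l.ts, windowSize) == Math.floorDiv(r.ts, windowSize);
                if (sameWindow && l.key.equals(r.key)) {
                    out.add(l.value + "," + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> left = Arrays.asList(new Event("a", 1000L, "L1"), new Event("a", 6000L, "L2"));
        List<Event> right = Arrays.asList(new Event("a", 2000L, "R1"), new Event("b", 3000L, "R2"));
        // With 5 s windows only L1 and R1 share both key and window.
        System.out.println(join(left, right, 5000L));
    }
}
```

Elements without a partner in the same window (here L2 and R2) produce no output, which matches inner-join semantics.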
    


          Article link: https://www.haomeiwen.com/subject/ahfobktx.html