Flink's Window Mechanism

Author: 羋学僧 | Published: 2020-11-15 19:48

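1.1 Window overview

Aggregating events (for example counting or summing) works differently on streams than in batch processing. Counting every element of a stream, for instance, is impossible, because streams are usually infinite (unbounded). Aggregations on streams therefore have to be scoped by a window, such as "count over the last 5 minutes" or "sum of the last 100 elements". A window is a way of cutting an unbounded stream into finite chunks.

Windows can be time-driven [Time Window] (for example, every 30 seconds) or data-driven [Count Window] (for example, every 100 elements).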


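1.2 Window types

Windows are usually divided into the following types:

  • tumbling windows: no overlap (time, count)
  • sliding windows: overlapping (time, count)
  • session windows: separated by gaps of inactivity (time)
  • global windows: no windowing; all elements with the same key fall into a single window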

1.2.1 tumbling windows: no overlap
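
A minimal sketch, in plain Java without Flink, of how a timestamp could be mapped to the tumbling window that contains it. The class and method names are hypothetical, and the arithmetic assumes windows aligned to timestamp 0; it illustrates the idea and is not taken from Flink's source.

```java
final class TumblingWindowSketch {
    /** Returns the inclusive start of the tumbling window containing timestampMs. */
    static long windowStart(long timestampMs, long sizeMs) {
        // floorMod keeps the result correct for negative timestamps as well
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Returns the exclusive end of that window. */
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 3_000L; // 3-second windows
        for (long ts : new long[] {0L, 2_999L, 3_000L, 7_500L}) {
            System.out.println(ts + " -> [" + windowStart(ts, size) + ", " + windowEnd(ts, size) + ")");
        }
    }
}
```

Because every timestamp maps to exactly one start value, each element lands in exactly one window, which is what "no overlap" means.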

1.2.2 sliding windows: overlapping
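
The overlap can be sketched the same way: with a window size larger than the slide, one timestamp belongs to several windows. The following plain-Java illustration uses hypothetical names and again assumes windows aligned to timestamp 0; it is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {
    /** Returns the start of every sliding window that contains timestampMs, latest first. */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that has already begun at timestampMs
        long lastStart = timestampMs - Math.floorMod(timestampMs, slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // 1-minute windows sliding every 30 seconds: prints [60000, 30000]
        System.out.println(windowStarts(75_000L, 60_000L, 30_000L));
    }
}
```

When the slide equals the size, the loop yields a single start, so a tumbling window is the special case of a sliding window without overlap.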

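1.2.3 session windows

Requirement: count the occurrences of each word in real time; once a word has not appeared for 5 seconds, emit that word.

Example: see SessionWindowTest below.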
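
To make the gap rule concrete, here is a small plain-Java sketch that splits one key's timestamps into sessions. All names are hypothetical. As an assumption of this sketch, a pause exactly equal to the gap still counts as the same session; only a strictly longer pause starts a new one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SessionWindowSketch {
    /** Splits ascending timestamps into sessions; a pause longer than gapMs starts a new session. */
    static List<List<Long>> sessions(List<Long> sortedTimestampsMs, long gapMs) {
        List<List<Long>> result = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (long ts : sortedTimestampsMs) {
            // Compare with the last element of the open session
            if (!current.isEmpty() && ts - current.get(current.size() - 1) > gapMs) {
                result.add(current);          // close the session
                current = new ArrayList<>();  // and open a new one
            }
            current.add(ts);
        }
        if (!current.isEmpty()) {
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [[0, 1000, 4000], [12000, 13000]] for a 5-second gap
        System.out.println(sessions(Arrays.asList(0L, 1_000L, 4_000L, 12_000L, 13_000L), 5_000L));
    }
}
```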


1.2.4 global windows

Example: see GlobalWindowTest below.


1.2.5 Window type summary

Keyed Window and Non-Keyed Window

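import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Non-keyed window and keyed window.
 * Note: the positional keyBy(int) and the timeWindow(...) shortcut used below
 * are deprecated in newer Flink releases.
 */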
public class WindowType {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        //Non keyed Stream
//        AllWindowedStream<Tuple2<String, Integer>, TimeWindow> nonkeyedStream = stream.timeWindowAll(Time.seconds(3));
//        nonkeyedStream.sum(1)
//                .print();

        //Keyed Stream
        stream.keyBy(0)
                .timeWindow(Time.seconds(3))
                .sum(1)
                .print();

        env.execute("word count");

    }
}
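
The difference between the two variants can be sketched in plain Java for the contents of a single window. The names are hypothetical and no Flink classes are involved: a keyed window keeps one independent result per word, while a non-keyed window produces a single result over all elements (and in Flink a non-keyed window operator runs with parallelism 1).

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class KeyedVsNonKeyedSketch {
    /** Keyed window: one independent sum per word. */
    static Map<String, Integer> keyedSum(List<String> wordsInWindow) {
        Map<String, Integer> sums = new TreeMap<>();
        for (String word : wordsInWindow) {
            sums.merge(word, 1, Integer::sum); // each word contributes a count of 1
        }
        return sums;
    }

    /** Non-keyed window: a single sum over every element, regardless of the word. */
    static int nonKeyedSum(List<String> wordsInWindow) {
        return wordsInWindow.size();
    }

    public static void main(String[] args) {
        List<String> window = Arrays.asList("flink", "spark", "flink");
        System.out.println(keyedSum(window));    // prints {flink=2, spark=1}
        System.out.println(nonKeyedSum(window)); // prints 3
    }
}
```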

TimeWindow

// Stream of (sensorId, carCnt )
val vehicleCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts
  // key stream by sensorId
  .keyBy(0)
  // tumbling time window of 1 minute length
  .timeWindow(Time.minutes(1))
  // compute sum over carCnt
  .sum(1)

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0)
  // sliding time window of 1 minute length and 30 secs trigger interval
  .timeWindow(Time.minutes(1), Time.seconds(30))
  .sum(1)

CountWindow

// Stream of (sensorId, carCnt)
val vehicleCnts: DataStream[(Int, Int)] = ...

val tumblingCnts: DataStream[(Int, Int)] = vehicleCnts 
  // key stream by sensorId
  .keyBy(0)
  // tumbling count window of 100 elements size
  .countWindow(100)
  // compute the carCnt sum
  .sum(1)

val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
  .keyBy(0)
  // sliding count window of 100 elements size and 10 elements trigger interval 
  .countWindow(100, 10)
  .sum(1)

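Example code

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * Prints a word's count once that word has not appeared for 5 seconds
 */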
public class SessionWindowTest {
    public static void main(String[] args) throws  Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);

        SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        stream.keyBy(0)
                // session window with a 5-second gap
                .window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
                // tumbling window, evaluated every 3 seconds
                // .window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
                // .timeWindow(Time.seconds(3))
                // .countWindow(100)
                // sliding window: every 1 second, word counts over the last 3 seconds
                // .window(SlidingProcessingTimeWindows.of(Time.seconds(3), Time.seconds(1)))
                .sum(1)
                .print();
        env.execute("SessionWindowTest");
    }
}

1.3 Window operations

Keyed Windows

Operators in [] are optional.

stream
  .keyBy(...)                     <--- keyed versus non-keyed windows
  .window(...)                    <--- required: "assigner"
  [.trigger(...)]                 <--- optional: "trigger" (else default trigger)
  [.evictor(...)]                 <--- optional: "evictor" (else no evictor)
  [.allowedLateness(...)]         <--- optional: "lateness" (else zero)
  [.sideOutputLateData(...)]      <--- optional: "output tag" (else no side output for late data)
  .reduce/aggregate/fold/apply()  <--- required: "function"
  [.getSideOutput(...)]           <--- optional: "output tag"

Non-Keyed Windows

stream
  .windowAll(...)                 <--- required: "assigner"
  [.trigger(...)]                 <--- optional: "trigger" (else default trigger)
  [.evictor(...)]                 <--- optional: "evictor" (else no evictor)
  [.allowedLateness(...)]         <--- optional: "lateness" (else zero)
  [.sideOutputLateData(...)]      <--- optional: "output tag" (else no side output for late data)
  .reduce/aggregate/fold/apply()  <--- required: "function"
  [.getSideOutput(...)]           <--- optional: "output tag"

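1.3.1 window function

global window

A global window is only usable in combination with a trigger.
Requirement: emit a count every time a word has appeared three times.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.util.Collector;

/**
 * Emits a result every time a word has appeared three times.
 * CountTrigger fires without purging, so sum(1) keeps accumulating (3, 6, 9, ...)
 * instead of covering only the latest three elements.
 */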
public class GlobalWindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);

        SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        stream.keyBy(0)
              // .countWindow(3)
                .window(GlobalWindows.create())
               .trigger(CountTrigger.of(3))
                .sum(1)
                .print();

        env.execute("GlobalWindowTest");
    }
}


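Compare with countWindow

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Emits a result every time a word has appeared three times.
 * countWindow(3) purges the window after each firing, so every result
 * covers only the latest three elements (3, 3, 3, ...).
 */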
public class GlobalWindowTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);

        SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        stream.keyBy(0)
              .countWindow(3)
                //  .window(GlobalWindows.create())
               // .trigger(CountTrigger.of(3))
                .sum(1)
                .print();

        env.execute("GlobalWindowTest");
    }
}
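
The two programs above differ in only one respect: whether the window contents are purged when the trigger fires. The following plain-Java sketch (hypothetical names, no Flink classes) simulates one key receiving a stream of ones and records the sum emitted at each firing.

```java
import java.util.ArrayList;
import java.util.List;

final class FireVersusPurgeSketch {
    /** Feeds `elements` ones into one key's window and records the sum emitted at every firing. */
    static List<Integer> emittedSums(int elements, int triggerCount, boolean purgeOnFire) {
        List<Integer> emitted = new ArrayList<>();
        int windowSum = 0; // aggregated window contents
        int sinceFire = 0; // the trigger's own counter, reset on every firing
        for (int i = 0; i < elements; i++) {
            windowSum += 1;
            sinceFire++;
            if (sinceFire == triggerCount) {
                emitted.add(windowSum);
                sinceFire = 0;
                if (purgeOnFire) {
                    windowSum = 0; // FIRE_AND_PURGE: the window starts empty again
                }
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(emittedSums(9, 3, false)); // fire only: prints [3, 6, 9]
        System.out.println(emittedSums(9, 3, true));  // fire and purge: prints [3, 3, 3]
    }
}
```

The first call mirrors GlobalWindows with CountTrigger.of(3), the second mirrors countWindow(3).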

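1.3.2 Trigger

Requirement: implement a custom CountWindow

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

/**
 * Uses a Trigger to implement behavior similar to CountWindow
 */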
public class CountWindowWordCountByTrigger {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);

        SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });
        
        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> keyedWindow = stream.keyBy(0)
                .window(GlobalWindows.create())
                .trigger(new MyCountTrigger(3));

        // Compare with the built-in CountTrigger; its source code is very similar to MyCountTrigger
        // WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> keyedWindow = stream.keyBy(0)
        //         .window(GlobalWindows.create())
        //         .trigger(CountTrigger.of(3));


        DataStream<Tuple2<String, Integer>> wordCounts = keyedWindow.sum(1);

        wordCounts.print().setParallelism(1);

        env.execute("Streaming WordCount");
    }



    private static class MyCountTrigger
            extends Trigger<Tuple2<String, Integer>, GlobalWindow> {
        // Number of elements after which the window fires
        private long maxCount;

        // Stores the count for each key
        private ReducingStateDescriptor<Long> stateDescriptor
                = new ReducingStateDescriptor<Long>("count", new ReduceFunction<Long>() {
            @Override
            public Long reduce(Long aLong, Long t1) throws Exception {
                return aLong + t1;
            }
        }, Long.class);


        public MyCountTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        /**
         * Called for every element that is added to a window.
         * @param element   the element
         * @param timestamp the timestamp at which the element arrived
         * @param window    the window the element belongs to
         * @param ctx       the trigger context
         * @return TriggerResult
         *      1. TriggerResult.CONTINUE: do nothing with the window
         *      2. TriggerResult.FIRE: trigger the window computation
         *      3. TriggerResult.PURGE: clear all data in the window
         *      4. TriggerResult.FIRE_AND_PURGE: trigger the window computation, then clear the window's data
         * @throws Exception
         */
        @Override
        public TriggerResult onElement(Tuple2<String, Integer> element,
                                       long timestamp,
                                       GlobalWindow window,
                                       TriggerContext ctx) throws Exception {
            // Get the count state for the current key
            ReducingState<Long> count = ctx.getPartitionedState(stateDescriptor);
            // Increment the count by 1
            count.add(1L);
            // When the count for the current key reaches maxCount
            if (count.get() == maxCount) {
                count.clear();
                // Trigger the window computation, then clear all data in the window
                return TriggerResult.FIRE_AND_PURGE;
            }
            // Otherwise, do nothing with the window
            return TriggerResult.CONTINUE;
        }

        /**
         * Timers are not used here, so the two timer callbacks below contain no logic.
         */
        @Override
        public TriggerResult onProcessingTime(long time,
                                              GlobalWindow window,
                                              TriggerContext ctx) throws Exception {
            // Logic for processing-time timers would go here
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time,
                                         GlobalWindow window,
                                         TriggerContext ctx) throws Exception {
            // Logic for event-time timers would go here
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
            // Clear the count state
            ctx.getPartitionedState(stateDescriptor).clear();
        }
    }

}

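1.3.3 Evictor

Requirement: every 2 words, compute over the most recent 3 words

import java.util.Iterator;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
import org.apache.flink.util.Collector;

/**
 * Uses an Evictor to implement behavior similar to countWindow(3, 2):
 * every 2 words, compute over the most recent 3 words
 */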
public class CountWindowWordCountByEvictor {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);

        SingleOutputStreamOperator<Tuple2<String, Integer>> stream = dataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] fields = line.split(",");
                for (String word : fields) {
                    collector.collect(Tuple2.of(word, 1));
                }
            }
        });

        WindowedStream<Tuple2<String, Integer>, Tuple, GlobalWindow> keyedWindow = stream.keyBy(0)
                .window(GlobalWindows.create())
                .trigger(new MyCountTrigger(2))
                .evictor(new MyCountEvictor(3));

        DataStream<Tuple2<String, Integer>> wordCounts = keyedWindow.sum(1);

        wordCounts.print().setParallelism(1);

        env.execute("Streaming WordCount");
    }

    private static class MyCountTrigger
            extends Trigger<Tuple2<String, Integer>, GlobalWindow> {
        // Number of elements after which the window fires
        private long maxCount;

        // Stores the count for each key
        private ReducingStateDescriptor<Long> stateDescriptor
                = new ReducingStateDescriptor<Long>("count", new ReduceFunction<Long>() {
            @Override
            public Long reduce(Long aLong, Long t1) throws Exception {
                return aLong + t1;
            }
        }, Long.class);

        public MyCountTrigger(long maxCount) {
            this.maxCount = maxCount;
        }

        /**
         * Called for every element that is added to a window.
         * @param element   the element
         * @param timestamp the timestamp at which the element arrived
         * @param window    the window the element belongs to
         * @param ctx       the trigger context
         * @return TriggerResult
         *      1. TriggerResult.CONTINUE: do nothing with the window
         *      2. TriggerResult.FIRE: trigger the window computation
         *      3. TriggerResult.PURGE: clear all data in the window
         *      4. TriggerResult.FIRE_AND_PURGE: trigger the window computation, then clear the window's data
         * @throws Exception
         */
        @Override
        public TriggerResult onElement(Tuple2<String, Integer> element,
                                       long timestamp,
                                       GlobalWindow window,
                                       TriggerContext ctx) throws Exception {
            // Get the count state for the current key
            ReducingState<Long> count = ctx.getPartitionedState(stateDescriptor);
            // Increment the count by 1
            count.add(1L);
            // When the count for the current key reaches maxCount
            if (count.get() == maxCount) {
                count.clear();
                // Trigger the window computation; removing old data is left to the evictor
                return TriggerResult.FIRE;
            }
            // Otherwise, do nothing with the window
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onProcessingTime(long time,
                                              GlobalWindow window,
                                              TriggerContext ctx) throws Exception {
            // Logic for processing-time timers would go here
            return TriggerResult.CONTINUE;
        }

        @Override
        public TriggerResult onEventTime(long time,
                                         GlobalWindow window,
                                         TriggerContext ctx) throws Exception {
            // Logic for event-time timers would go here
            return TriggerResult.CONTINUE;
        }

        @Override
        public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
            // Clear the count state
            ctx.getPartitionedState(stateDescriptor).clear();
        }
    }

    private static class MyCountEvictor
            implements Evictor<Tuple2<String, Integer>, GlobalWindow> {
        // Maximum number of elements kept in the window
        private long windowCount;

        public MyCountEvictor(long windowCount) {
            this.windowCount = windowCount;
        }

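        /**
         * Removes specific elements before the window computation.
         * @param elements       all elements in the window
         * @param size           the number of elements currently in the window
         * @param window         the window
         * @param evictorContext the evictor context
         */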
        @Override
        public void evictBefore(Iterable<TimestampedValue<Tuple2<String, Integer>>> elements,
                                int size,
                                GlobalWindow window,
                                EvictorContext evictorContext) {
            if (size <= windowCount) {
                return;
            }
            // Remove the oldest (size - windowCount) elements so that only the latest windowCount remain
            int evictedCount = 0;
            Iterator<TimestampedValue<Tuple2<String, Integer>>> iterator = elements.iterator();
            while (iterator.hasNext() && evictedCount < size - windowCount) {
                iterator.next();
                iterator.remove();
                evictedCount++;
            }
        }

        /**
         * Removes specific elements after the window computation.
         * @param elements       all elements in the window
         * @param size           the number of elements currently in the window
         * @param window         the window
         * @param evictorContext the evictor context
         */
        @Override
        public void evictAfter(Iterable<TimestampedValue<Tuple2<String, Integer>>> elements,
                               int size, GlobalWindow window, EvictorContext evictorContext) {
            // Nothing is evicted after the computation
        }
    }

}
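
How the trigger and the evictor interact can be traced without a cluster. The plain-Java sketch below uses hypothetical names and assumes, as in the program above, that the trigger only fires (no purge) and that eviction happens before the computation: every `slide` elements it drops the oldest buffered values until at most `size` remain, then sums what is left.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

final class TriggerWithEvictorSketch {
    /** Emits, every `slide` elements, the sum of at most the last `size` buffered values. */
    static List<Integer> emittedSums(List<Integer> values, int size, int slide) {
        List<Integer> emitted = new ArrayList<>();
        Deque<Integer> buffer = new ArrayDeque<>(); // window contents, oldest first
        int sinceFire = 0;                          // the trigger's counter
        for (int value : values) {
            buffer.addLast(value);
            sinceFire++;
            if (sinceFire == slide) {
                sinceFire = 0;
                // evictBefore: drop the oldest elements until only `size` remain
                while (buffer.size() > size) {
                    buffer.removeFirst();
                }
                int sum = 0;
                for (int v : buffer) {
                    sum += v;
                }
                emitted.add(sum);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Fires after 2, 4 and 6 elements over [1,2], [2,3,4], [4,5,6]: prints [3, 9, 15]
        System.out.println(emittedSums(Arrays.asList(1, 2, 3, 4, 5, 6), 3, 2));
    }
}
```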


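1.3.4 window incremental aggregation

Every record that enters the window is aggregated immediately; when the window's time is up, the final result is emitted.
Common aggregation operators:

reduce(reduceFunction)
aggregate(aggregateFunction)
sum()
min()
max()

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * Demonstrates incremental aggregation
 */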
public class SocketDemoIncrAgg {
    public static void main(String[] args) throws  Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
        SingleOutputStreamOperator<Integer> intDStream =
                dataStream.map(number -> Integer.valueOf(number));

        AllWindowedStream<Integer, TimeWindow> windowResult = intDStream.timeWindowAll(Time.seconds(5));

        windowResult.reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer last, Integer current) throws Exception {
                // Invoked as elements arrive, not when the window closes
                System.out.println("reduce called: " + last + "  " + current);
                return last + current;
            }
        }).print();

        env.execute(SocketDemoIncrAgg.class.getSimpleName());
    }
}
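
The key property of incremental aggregation is that the window stores one accumulator rather than all elements. A minimal plain-Java sketch of that idea follows; the class and its methods are hypothetical and only mimic how a reduce function is applied, they are not Flink API.

```java
import java.util.function.BinaryOperator;

final class IncrementalAggregateSketch {
    private final BinaryOperator<Integer> reducer;
    private Integer accumulator; // the only state kept for the window
    private int reduceCalls;     // how often the reducer has been invoked

    IncrementalAggregateSketch(BinaryOperator<Integer> reducer) {
        this.reducer = reducer;
    }

    /** Called once per arriving element; folds it into the accumulator immediately. */
    void add(int value) {
        if (accumulator == null) {
            accumulator = value; // first element: nothing to combine with yet
        } else {
            accumulator = reducer.apply(accumulator, value);
            reduceCalls++;
        }
    }

    /** Called when the window fires; returns the result and resets the state. */
    Integer fire() {
        Integer result = accumulator;
        accumulator = null;
        return result;
    }

    int reduceCalls() {
        return reduceCalls;
    }

    public static void main(String[] args) {
        IncrementalAggregateSketch window = new IncrementalAggregateSketch(Integer::sum);
        window.add(1);
        window.add(2);
        window.add(3);
        System.out.println(window.reduceCalls()); // prints 2
        System.out.println(window.fire());        // prints 6
    }
}
```

Three elements cause only two reducer calls, because the first element simply becomes the accumulator.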


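1.3.5 window full aggregation

Aggregation starts only once all the data belonging to the window has arrived [this makes requirements such as sorting the data within a window possible].

apply(windowFunction)
process(processWindowFunction)

processWindowFunction provides more context information than windowFunction, similar to the relationship between map and RichMap.

import java.util.Iterator;

import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * Full-window computation
 */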
public class SocketDemoFullAgg {
    public static void main(String[] args) throws  Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


        // kafka -> topic 10 -> 10
        DataStreamSource<String> dataStream = env.socketTextStream("bigdata02", 1234);
        SingleOutputStreamOperator<Integer> intDStream = dataStream.map(number -> Integer.valueOf(number));
        AllWindowedStream<Integer, TimeWindow> windowResult = intDStream.timeWindowAll(Time.seconds(5));
        windowResult.process(new ProcessAllWindowFunction<Integer, Integer, TimeWindow>() {
            @Override
            public void process(Context context, Iterable<Integer> iterable,
                                Collector<Integer> collector) throws Exception {
                System.out.println("running window computation");
                int count=0;
                Iterator<Integer> numberiterator = iterable.iterator();
                while (numberiterator.hasNext()){
                    Integer number = numberiterator.next();
                    count+=number;
                }
                collector.collect(count);
            }
        }).print();

        env.execute("socketDemoFullAgg");
    }
}
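
Some results cannot be folded element by element and need the complete window contents, which is where full aggregation is required. A median is a typical case. The plain-Java sketch below uses hypothetical names and simply receives the buffered window contents as an Iterable, much like the process function above does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class FullWindowSketch {
    /** Computes the median only once the whole window is available. */
    static int medianOnFire(Iterable<Integer> windowContents) {
        List<Integer> sorted = new ArrayList<>();
        for (Integer value : windowContents) {
            sorted.add(value);
        }
        if (sorted.isEmpty()) {
            throw new IllegalArgumentException("window is empty");
        }
        // Sorting needs every element, so nothing can be computed before the window fires
        Collections.sort(sorted);
        return sorted.get(sorted.size() / 2); // upper median for even sizes
    }

    public static void main(String[] args) {
        System.out.println(medianOnFire(Arrays.asList(5, 1, 9)));    // prints 5
        System.out.println(medianOnFire(Arrays.asList(4, 1, 3, 2))); // prints 3
    }
}
```

The cost of this flexibility is memory: every element of the window has to be buffered until the window fires.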

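1.3.6 window join

Two windowed streams can be joined. The join operation supports only three window types: tumbling windows, sliding windows, and session windows.
Usage:

stream
  .join(otherStream)         // join the two streams
  .where(<KeySelector>)      // key of the first stream used as the join field
  .equalTo(<KeySelector>)    // key of the second stream used as the join field
  .window(<WindowAssigner>)  // window type
  .apply(<JoinFunction>)     // process each joined pair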
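
What such a join computes can be sketched in plain Java for the tumbling-window case: two events are paired when they share both the key and the window. The Event type and all method names are hypothetical, and the nested loop is chosen for readability, not efficiency.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class WindowJoinSketch {
    /** A keyed, timestamped event (hypothetical type for this sketch). */
    static final class Event {
        final String key;
        final long timestampMs;
        final String payload;

        Event(String key, long timestampMs, String payload) {
            this.key = key;
            this.timestampMs = timestampMs;
            this.payload = payload;
        }
    }

    /** Start of the tumbling window containing the timestamp, aligned to 0. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Inner join: pairs every left and right event that share a key and a tumbling window. */
    static List<String> join(List<Event> left, List<Event> right, long sizeMs) {
        List<String> joined = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameKey = l.key.equals(r.key);
                boolean sameWindow =
                        windowStart(l.timestampMs, sizeMs) == windowStart(r.timestampMs, sizeMs);
                if (sameKey && sameWindow) {
                    joined.add(l.payload + "|" + r.payload);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Event> left = Arrays.asList(new Event("a", 1_000L, "L1"), new Event("a", 6_000L, "L2"));
        List<Event> right = Arrays.asList(new Event("a", 2_000L, "R1"), new Event("b", 1_500L, "R2"));
        // Only L1 and R1 share key "a" and the window [0, 5000): prints [L1|R1]
        System.out.println(join(left, right, 5_000L));
    }
}
```

Events without a partner in the same window produce no output, which is the inner-join behavior.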


Original link: https://www.haomeiwen.com/subject/ahfobktx.html