美文网首页
Flink 的window机制(二) 窗口函数

Flink 的window机制(二) 窗口函数

作者: 万事万物 | 来源:发表于2021-09-03 10:26 被阅读0次

    Window Function

    前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.

    window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.

    ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.

    ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素

    ReduceFunction(增量聚合函数)

    需求,统计一个班级的学生成绩
    为了统计方便,定义一个javabean

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @ToString
    public class Student {
        /**
         * 班级id
         */
        private Integer classId;
        /**
         * 学习id
         */
        private Integer stuId;
        /**
         * 成绩
         */
        private Double score;
    
    
        public Student(String[] args){
            this.classId=Integer.valueOf(args[0]);
            this.stuId=Integer.valueOf(args[1]);
            this.score=Double.valueOf(args[2]);
    
        }
    
    }
    
    • 程序
        @Test
        public void test1() throws Exception {
    
            Configuration config=new Configuration();
            config.setInteger("rest.port",8081); // 配置固定端口
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
    
    
            DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);
    
            // 转换成对象
            SingleOutputStreamOperator<Student> map = source.map((MapFunction<String, Student>) value -> new Student(value.split(",")));
    
            //设置窗口
            KeyedStream<Student, Integer> keyBy = map.keyBy(Student::getClassId);
    
            //使用 reduce进行聚合
            keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .reduce((ReduceFunction<Student>) (value1, value2) -> {
                        //聚合
                        value1.setScore(value1.getScore()+value2.getScore());
                        return value1;
                    }).print();
    
            env.execute();
        }
    
    • 输入
    1,1,90
    1,2,60
    
    • 输出
    11> Student(classId=1, stuId=1, score=150.0)
    
    • 使用sum的方式
      当然上面的方式,使用sum也可以进行计算
        @Test
        public void test2() throws Exception {
    
            Configuration config=new Configuration();
            config.setInteger("rest.port",8081); // 配置固定端口
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
    
    
            DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);
    
            // 转换成对象
            SingleOutputStreamOperator<Student> map = source.map((MapFunction<String, Student>) value -> new Student(value.split(",")));
    
            //设置窗口
            KeyedStream<Student, Integer> keyBy = map.keyBy(Student::getClassId);
    
            //使用 reduce进行聚合
            keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    /*.reduce((ReduceFunction<Student>) (value1, value2) -> {
                        //聚合
                        value1.setScore(value1.getScore()+value2.getScore());
                        return value1;
                    }).print();*/
                .sum("score").print();
    
            env.execute();
        }
    

    结果

    11> Student(classId=1, stuId=1, score=150.0)
    

    使用sum也可以达到同样的目的,但是对比于reduce来说,它更加灵活。

    • 获取窗口信息
      reduce还有一些重载方法,可以用于获取窗口信息。
        @Test
        public void test3() throws Exception {
    
            Configuration config=new Configuration();
            config.setInteger("rest.port",8081); // 配置固定端口
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
    
    
            DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);
    
            // 转换成对象
            SingleOutputStreamOperator<Student> map = source.map((MapFunction<String, Student>) value -> new Student(value.split(",")));
    
            //设置窗口
            KeyedStream<Student, Integer> keyBy = map.keyBy(Student::getClassId);
    
            //使用 reduce进行聚合
            keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                    .reduce((ReduceFunction<Student>) (value1, value2) -> {
                        //聚合
                        value1.setScore(value1.getScore() + value2.getScore());
                        return value1;
                    }, new WindowFunction<Student, String, Integer, TimeWindow>() {
    
                        /**
                         *
                         * @param key class id
                         * @param window 窗口
                         * @param input 上一个窗口聚合的结果
                         * @param out 收集器
                         * @throws Exception
                         */
                        @Override
                        public void apply(Integer key, TimeWindow window, Iterable<Student> input, Collector<String> out) throws Exception {
    
                            Student next = input.iterator().next();
    
                            String msg=String.format("key=%s,window[%s,%s),input=%s",key, window.getStart(),window.getEnd(),next.toString());
                            out.collect(msg);
    
                        }
                    }).print();
    
            env.execute();
        }
    

    输出

    11> key=1,window[1628997085000,1628997090000),input=Student(classId=1, stuId=1, score=150.0)
    

    这样不仅可以获取聚合信息,也可以获取窗口信息。

    AggregateFunction(增量聚合函数)

     * @param <IN> 输入元素类型
     * @param <ACC> 累加器类型
     * @param <OUT> 输出类型
     */
    @PublicEvolving
    public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
    

    统计学生平均分

      @Test
        public void test3() throws Exception {
    
            Configuration config=new Configuration();
            config.setInteger("rest.port",8081); // 配置固定端口
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
    
    
            DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);
    
            // 转换成对象
            SingleOutputStreamOperator<Student> map = source.map((MapFunction<String, Student>) value -> new Student(value.split(",")));
    
            //设置窗口
            KeyedStream<Student, Integer> keyBy = map.keyBy(Student::getClassId);
    
            //使用 reduce进行聚合
            keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    
                    .aggregate(new org.apache.flink.api.common.functions.AggregateFunction<Student, Tuple2<Integer,Double>, Double>() {
    
                        /**
                         * 初始化累加器,只会调用一次。
                         * 这里计算学生个数,所以从零开始。
                         * @return
                         */
                        @Override
                        public Tuple2<Integer,Double> createAccumulator() {
                            // 初始化 学生个数,成绩
                            return Tuple2.of(0,0.0);
                        }
    
                        /**
                         * 元素个数累加
                         * (来一个学生就累计次数+1)
                         * @param value
                         * @param accumulator
                         * @return
                         */
                        @Override
                        public Tuple2<Integer,Double> add(Student value, Tuple2<Integer,Double> accumulator) {
                            // 学生个数+1
                            return Tuple2.of(accumulator.f0+1,value.getScore()+accumulator.f1);
                        }
    
                        /**
                         * 结果返回,计算平均分,所以是double类型
                         * @param accumulator
                         * @return
                         */
                        @Override
                        public Double getResult(Tuple2<Integer,Double> accumulator) {
                            return accumulator.f1/accumulator.f0;
                        }
    
                        /**
                         * 两个累加器合并,
                         * 当窗口是会话窗口才会生效
                         * @param a
                         * @param b
                         * @return
                         */
                        @Override
                        public Tuple2<Integer,Double> merge(Tuple2<Integer,Double> a, Tuple2<Integer,Double> b) {
                            return null;
                        }
                    }).print();
    
    
            env.execute();
        }
    
    • 输入
    1,1,90
    1,2,60
    
    • 输出
    11> 75.0
    
    • 获取窗口信息
      同样也有其他的重载方法,可以用于获取窗口信息。
    @PublicEvolving
    public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
          AggregateFunction<T, ACC, V> aggFunction, 
          WindowFunction<V, R, K, W> windowFunction
    ) {
    

    ProcessWindowFunction(全窗口函数)

    会将窗口中所有的元素都攒下来,等到窗口关闭的时候,一起来进行处理。这样就会造成内存占用过高,但是有些时候又不得不用,比如全局排序

    .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
        // 参数1: key 参数2: 上下文对象 参数3: 这个窗口内所有的元素 参数4: 收集器, 用于向下游传递数据
        @Override
        public void process(String key,
                            Context context,
                            Iterable<Tuple2<String, Long>> elements,
                            Collector<Tuple2<String, Long>> out) throws Exception {
            System.out.println(context.window().getStart());
            long sum = 0L;
            for (Tuple2<String, Long> t : elements) {
                sum += t.f1;
            }
            out.collect(Tuple2.of(key, sum));
        }
    })
    

    Iterable<Tuple2<String, Long>> elements, 存储着所有的元素。

    相关文章

      网友评论

          本文标题:Flink 的window机制(二) 窗口函数

          本文链接:https://www.haomeiwen.com/subject/sffubltx.html