Window Function
前面指定了窗口的分配器, 接着我们需要来指定如何计算, 这事由window function来负责. 一旦窗口关闭, window function 去计算处理窗口中的每个元素.
window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一种.
ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以对到来的元素进行增量聚合 . ProcessWindowFunction 可以得到一个包含这个窗口中所有元素的迭代器, 以及这些元素所属窗口的一些元数据信息.
ProcessWindowFunction不能被高效执行的原因是Flink在执行这个函数之前, 需要在内部缓存这个窗口上所有的元素。
ReduceFunction(增量聚合函数)
需求,统计一个班级的学生成绩
为了统计方便,定义一个javabean
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class Student {
/**
* 班级id
*/
private Integer classId;
/**
* 学习id
*/
private Integer stuId;
/**
* 成绩
*/
private Double score;
public Student(String[] args){
this.classId=Integer.valueOf(args[0]);
this.stuId=Integer.valueOf(args[1]);
this.score=Double.valueOf(args[2]);
}
}
- 程序
@Test
public void test1() throws Exception {
Configuration config=new Configuration();
config.setInteger("rest.port",8081); // 配置固定端口
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);
// 转换成对象
SingleOutputStreamOperator<Student> map = source.map((MapFunction<String, Student>) value -> new Student(value.split(",")));
//设置窗口
KeyedStream<Student, Integer> keyBy = map.keyBy(Student::getClassId);
//使用 reduce进行聚合
keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce((ReduceFunction<Student>) (value1, value2) -> {
//聚合
value1.setScore(value1.getScore()+value2.getScore());
return value1;
}).print();
env.execute();
}
- 输入
1,1,90
1,2,60
- 输出
11> Student(classId=1, stuId=1, score=150.0)
- 使用sum的方式
当然上面的方式,使用sum也可以进行计算
@Test
public void test2() throws Exception {
Configuration config=new Configuration();
config.setInteger("rest.port",8081); // 配置固定端口
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);
// 转换成对象
SingleOutputStreamOperator<Student> map = source.map((MapFunction<String, Student>) value -> new Student(value.split(",")));
//设置窗口
KeyedStream<Student, Integer> keyBy = map.keyBy(Student::getClassId);
//使用 reduce进行聚合
keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
/*.reduce((ReduceFunction<Student>) (value1, value2) -> {
//聚合
value1.setScore(value1.getScore()+value2.getScore());
return value1;
}).print();*/
.sum("score").print();
env.execute();
}
结果
11> Student(classId=1, stuId=1, score=150.0)
使用sum
也可以达到同样的目的,但是对比于reduce
来说,它更加灵活。
- 获取窗口信息
reduce还有一些重载方法,可以用于获取窗口信息。
@Test
public void test3() throws Exception {
Configuration config=new Configuration();
config.setInteger("rest.port",8081); // 配置固定端口
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);
// 转换成对象
SingleOutputStreamOperator<Student> map = source.map((MapFunction<String, Student>) value -> new Student(value.split(",")));
//设置窗口
KeyedStream<Student, Integer> keyBy = map.keyBy(Student::getClassId);
//使用 reduce进行聚合
keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce((ReduceFunction<Student>) (value1, value2) -> {
//聚合
value1.setScore(value1.getScore() + value2.getScore());
return value1;
}, new WindowFunction<Student, String, Integer, TimeWindow>() {
/**
*
* @param key class id
* @param window 窗口
* @param input 上一个窗口聚合的结果
* @param out 收集器
* @throws Exception
*/
@Override
public void apply(Integer key, TimeWindow window, Iterable<Student> input, Collector<String> out) throws Exception {
Student next = input.iterator().next();
String msg=String.format("key=%s,window[%s,%s),input=%s",key, window.getStart(),window.getEnd(),next.toString());
out.collect(msg);
}
}).print();
env.execute();
}
输出
11> key=1,window[1628997085000,1628997090000),input=Student(classId=1, stuId=1, score=150.0)
这样不仅可以获取聚合信息,也可以获取窗口信息。
AggregateFunction(增量聚合函数)
* @param <IN> 输入元素类型
* @param <ACC> 累加器类型
* @param <OUT> 输出类型
*/
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
统计学生平均分
@Test
public void test3() throws Exception {
Configuration config=new Configuration();
config.setInteger("rest.port",8081); // 配置固定端口
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
DataStreamSource<String> source = env.socketTextStream("mydocker", 9999);
// 转换成对象
SingleOutputStreamOperator<Student> map = source.map((MapFunction<String, Student>) value -> new Student(value.split(",")));
//设置窗口
KeyedStream<Student, Integer> keyBy = map.keyBy(Student::getClassId);
//使用 reduce进行聚合
keyBy.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new org.apache.flink.api.common.functions.AggregateFunction<Student, Tuple2<Integer,Double>, Double>() {
/**
* 初始化累加器,只会调用一次。
* 这里计算学生个数,所以从零开始。
* @return
*/
@Override
public Tuple2<Integer,Double> createAccumulator() {
// 初始化 学生个数,成绩
return Tuple2.of(0,0.0);
}
/**
* 元素个数累加
* (来一个学生就累计次数+1)
* @param value
* @param accumulator
* @return
*/
@Override
public Tuple2<Integer,Double> add(Student value, Tuple2<Integer,Double> accumulator) {
// 学生个数+1
return Tuple2.of(accumulator.f0+1,value.getScore()+accumulator.f1);
}
/**
* 结果返回,计算平均分,所以是double类型
* @param accumulator
* @return
*/
@Override
public Double getResult(Tuple2<Integer,Double> accumulator) {
return accumulator.f1/accumulator.f0;
}
/**
* 两个累加器合并,
* 当窗口是会话窗口才会生效
* @param a
* @param b
* @return
*/
@Override
public Tuple2<Integer,Double> merge(Tuple2<Integer,Double> a, Tuple2<Integer,Double> b) {
return null;
}
}).print();
env.execute();
}
- 输入
1,1,90
1,2,60
- 输出
11> 75.0
- 获取窗口信息
同样也有其他的重载方法,可以用于获取窗口信息。
@PublicEvolving
public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(
AggregateFunction<T, ACC, V> aggFunction,
WindowFunction<V, R, K, W> windowFunction
) {
ProcessWindowFunction(全窗口函数)
会将窗口中所有的元素都攒下来,等到窗口关闭的时候,一起来进行处理。这样就会造成内存占用过高,但是有些时候又不得不用,比如全局排序
.process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {
// 参数1: key 参数2: 上下文对象 参数3: 这个窗口内所有的元素 参数4: 收集器, 用于向下游传递数据
@Override
public void process(String key,
Context context,
Iterable<Tuple2<String, Long>> elements,
Collector<Tuple2<String, Long>> out) throws Exception {
System.out.println(context.window().getStart());
long sum = 0L;
for (Tuple2<String, Long> t : elements) {
sum += t.f1;
}
out.collect(Tuple2.of(key, sum));
}
})
Iterable<Tuple2<String, Long>> elements,
存储着所有的元素。
网友评论