AggregatingState
在进行多维指标计算时,灵活运用AggregatingState,可以把每个指标的聚合逻辑封装到各自的AggregateFunction中,有效隔离代码,降低各指标逻辑之间的耦合度,避免所有计算逻辑堆砌在同一个方法里,让代码更加清晰易读
定义
// AggregatingState<IN, OUT>: the first Double is the input type, the second Double is the output type.
private AggregatingState<Double, Double> firVState;
open初始化
// FirVAggFunction implements AggregateFunction<IN, ACC, OUT>: the first Double is the input type,
// the second Double is the accumulator type, and the third Double is the output type.
// The Double passed to AggregatingStateDescriptor is the accumulator (ACC) type.
// Tuple2<Integer, Double>, option 1: Types.TUPLE(Types.INT, Types.DOUBLE)
// Tuple2<Integer, Double>, option 2: TypeInformation.of(new TypeHint<Tuple2<Integer, Double>>() {})
// Double, option 1: Types.DOUBLE
// Double, option 2: Double.class
firVState = getRuntimeContext().getAggregatingState(
        new AggregatingStateDescriptor<>("first-value", new FirVAggFunction(), Double.class));
FirVAggFunction
假设输入数据不存在小于0的数(因此累加器可以用-1表示"尚未累加任何值"),但存在null值数据。该函数对所有非负输入求和,在尚未累加任何值时getResult返回null
/**
 * Sums non-negative {@code Double} inputs.
 *
 * <p>The accumulator uses {@code -1.0} as a sentinel meaning "nothing accumulated yet"; this is
 * only sound because inputs are assumed never to be negative. Negative and {@code null} inputs are
 * skipped. {@link #getResult} returns {@code null} while nothing has been accumulated.
 */
public class FirVAggFunction implements AggregateFunction<Double, Double, Double> {

    /** Creates an empty accumulator, represented by the sentinel {@code -1.0}. */
    @Override
    public Double createAccumulator() {
        return -1.0;
    }

    /**
     * Adds {@code value} to the running sum.
     *
     * @param value the input; {@code null} or negative values leave the accumulator unchanged
     * @param accumulator the current sum, or {@code -1.0} if empty
     * @return the updated accumulator
     */
    @Override
    public Double add(Double value, Double accumulator) {
        // Skip missing values (and the negative sentinel) instead of failing on unboxing.
        if (value == null || value < 0) {
            return accumulator;
        }
        return accumulator < 0 ? value : accumulator + value;
    }

    /**
     * Returns the sum, or {@code null} if no value has been accumulated.
     *
     * @param accumulator the current sum, or {@code -1.0} if empty
     * @return the sum, or {@code null}
     */
    @Override
    public Double getResult(Double accumulator) {
        return accumulator < 0 ? null : accumulator;
    }

    /**
     * Combines two partial accumulators; only needed when windows are merged (e.g. session windows).
     * An empty side (sentinel {@code -1.0}) contributes nothing.
     *
     * @param a the first accumulator
     * @param b the second accumulator
     * @return the merged accumulator
     */
    @Override
    public Double merge(Double a, Double b) {
        if (a < 0) {
            return b;
        }
        if (b < 0) {
            return a;
        }
        return a + b;
    }
}
注意点
1.在使用Tuple作为累加器时,内部的值不可以存在null,否则在做快照时会报错,因为Tuple的null值无法序列化
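2.使用AggregatingState时需要先对null做过滤或替换:向AggregatingState中add(null)会清空当前状态,get()的结果变为null,之前累加的结果也会丢失(见下方结果中的firVErrState:遇到null后变为null,之后输入20.0时只得到20.0,而firVSuccessState为50.0)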
完整代码
import lombok.*;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import java.util.ArrayList;
import java.util.Optional;
/**
 * Local demo of how {@link AggregatingState} reacts to {@code null} inputs.
 *
 * <p>Two states aggregate the same keyed stream with {@link FirVAggFunction}:
 * {@code firVSuccessState} receives values with {@code null} replaced by {@code -1}, while
 * {@code firVErrState} receives the raw (possibly {@code null}) values.
 */
public class TestAggState {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // Three numeric samples followed by twelve non-numeric "--" samples (parsed as null below).
        ArrayList<String> objects = new ArrayList<>();
        objects.add("key1 10");
        objects.add("key1 20");
        objects.add("key1 30");
        for (int i = 0; i < 12; i++) {
            objects.add("key1 --");
        }

        env.fromCollection(objects)
                .map(new MapFunction<String, Row>() {
                    /** Parses "id value"; a missing or non-numeric value becomes {@code null}. */
                    @Override
                    public Row map(String value) throws Exception {
                        String[] els = value.split(" ");
                        Double v = null;
                        if (els.length > 1) {
                            try {
                                v = Double.parseDouble(els[1]);
                            } catch (NumberFormatException ignored) {
                                // Deliberate: non-numeric values such as "--" are modelled as missing (null).
                            }
                        }
                        return new Row(els[0], v);
                    }
                })
                .keyBy(Row::getId)
                .process(new KeyedProcessFunction<String, Row, Double>() {
                    // Receives nulls replaced by -1, which FirVAggFunction.add ignores.
                    private AggregatingState<Double, Double> firVSuccessState;
                    // Receives raw values, including null, to demonstrate the failure mode.
                    private AggregatingState<Double, Double> firVErrState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        firVSuccessState = getRuntimeContext().getAggregatingState(
                                new AggregatingStateDescriptor<>("first-ok-value", new FirVAggFunction(), Types.DOUBLE));
                        firVErrState = getRuntimeContext().getAggregatingState(
                                new AggregatingStateDescriptor<>("first-err-value", new FirVAggFunction(), Types.DOUBLE));
                    }

                    @Override
                    public void processElement(Row value, KeyedProcessFunction<String, Row, Double>.Context ctx,
                            Collector<Double> out) throws Exception {
                        Double oriData = value.getV();
                        // -1 is the "no value" marker that FirVAggFunction.add skips.
                        Double checkedData = Optional.ofNullable(oriData).orElse(-1D);
                        out.collect(checkedData);
                        System.out.println("value=" + value);
                        // Without replacing nulls, add(null) resets the state: in the demo output
                        // firVErrState becomes null after each null input and restarts from the next value.
                        firVSuccessState.add(checkedData);
                        firVErrState.add(oriData);
                        System.out.println("firVSuccessState=" + firVSuccessState.get());
                        System.out.println("firVErrState=" + firVErrState.get());
                    }
                })
                .print("rs");
        env.execute();
    }

    /**
     * Sums non-negative {@code Double} inputs, using {@code -1} as the "nothing accumulated yet"
     * sentinel (valid because inputs are assumed never to be negative). Negative and {@code null}
     * inputs are skipped; {@link #getResult} returns {@code null} while the accumulator is empty.
     */
    public static class FirVAggFunction implements AggregateFunction<Double, Double, Double> {

        /** Creates an empty accumulator, represented by the sentinel {@code -1}. */
        @Override
        public Double createAccumulator() {
            return -1D;
        }

        /** Adds a non-negative, non-null {@code value} to the sum; other inputs are ignored. */
        @Override
        public Double add(Double value, Double accumulator) {
            if (value == null || value < 0) {
                return accumulator;
            }
            return accumulator < 0 ? value : accumulator + value;
        }

        /** Returns the sum, or {@code null} if nothing has been accumulated. */
        @Override
        public Double getResult(Double accumulator) {
            return accumulator < 0 ? null : accumulator;
        }

        /** Combines two partial sums (needed for merging windows); an empty side contributes nothing. */
        @Override
        public Double merge(Double a, Double b) {
            if (a < 0) {
                return b;
            }
            if (b < 0) {
                return a;
            }
            return a + b;
        }
    }

    /**
     * Input record: a key and an optional numeric value ({@code null} when missing).
     * Public so that Flink can treat it as a POJO instead of falling back to a generic type.
     */
    @Setter
    @Getter
    @ToString
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Row {
        private String id;
        private Double v;
    }
}
结果
value=TestAggState.Row(id=key1, v=30.0)
firVSuccessState=30.0
firVErrState=30.0
value=TestAggState.Row(id=key1, v=null)
firVSuccessState=30.0
firVErrState=null
value=TestAggState.Row(id=key1, v=null)
firVSuccessState=30.0
firVErrState=null
value=TestAggState.Row(id=key1, v=null)
firVSuccessState=30.0
firVErrState=null
value=TestAggState.Row(id=key1, v=null)
firVSuccessState=30.0
firVErrState=null
value=TestAggState.Row(id=key1, v=null)
firVSuccessState=30.0
firVErrState=null
value=TestAggState.Row(id=key1, v=null)
firVSuccessState=30.0
firVErrState=null
value=TestAggState.Row(id=key1, v=null)
firVSuccessState=30.0
firVErrState=null
value=TestAggState.Row(id=key1, v=null)
firVSuccessState=30.0
firVErrState=null
value=TestAggState.Row(id=key1, v=20.0)
firVSuccessState=50.0
firVErrState=20.0
value=TestAggState.Row(id=key1, v=null)
firVSuccessState=50.0
firVErrState=null
value=TestAggState.Row(id=key1, v=null)
firVSuccessState=50.0
firVErrState=null
value=TestAggState.Row(id=key1, v=null)
firVSuccessState=50.0
firVErrState=null
value=TestAggState.Row(id=key1, v=10.0)
firVSuccessState=60.0
firVErrState=10.0
value=TestAggState.Row(id=key1, v=null)
firVSuccessState=60.0
firVErrState=null
网友评论