美文网首页
使用AggregatingState进行多维指标计算

使用AggregatingState进行多维指标计算

作者: apophisdeity | 来源:发表于2022-04-14 00:02 被阅读0次

    AggregatingState

    在进行多维指标计算时,灵活运用AggregatingState,可以把每个指标的聚合逻辑封装到各自的AggregateFunction中,从而有效隔离代码,降低各指标逻辑之间的耦合度,避免大量指标计算代码堆叠在同一个方法里,让代码更加清晰、美观。
    定义

    // The first Double is the input type; the second Double is the output type.
    private AggregatingState<Double, Double> firVState;
    

    open初始化

    // Type parameters of the aggregate function: the first Double is the input type, the second Double is the accumulator type, the third Double is the output type.
    // The type passed to AggregatingStateDescriptor (Double here) is the accumulator type.
    // For a Tuple2<Integer, Double> accumulator, option 1: Types.TUPLE(Types.INT, Types.DOUBLE)
    // For a Tuple2<Integer, Double> accumulator, option 2: TypeInformation.of(new TypeHint<Tuple2<Integer, Double>>() {})
    // For a Double accumulator, option 1: Types.DOUBLE
    // For a Double accumulator, option 2: Double.class
    firVState=getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<>("first-value",new FirVAggFunction(),Double.class));
    

    FirVAggFunction
    假设输入数据不存在小于0的数,但存在null值数据。因此累加器用-1(任意负数)作为"尚未收到有效数据"的标记:小于0的输入会被忽略,累加器仍为负数时getResult()返回null

    /**
     * Aggregate function that sums the non-negative values added to it.
     *
     * <p>A negative accumulator (initially {@code -1.0}) is a sentinel meaning "no valid value
     * has been added yet". This relies on the assumption that real input values are never
     * negative; negative inputs are treated as "missing" and skipped.
     */
    public class FirVAggFunction implements AggregateFunction<Double, Double, Double> {
        /**
         * Creates the initial accumulator.
         *
         * @return {@code -1.0}, the sentinel for "no valid value yet"
         */
        @Override
        public Double createAccumulator() {
            return -1.0;
        }

        /**
         * Adds one input value to the accumulator.
         *
         * @param value the input value; {@code null} and negative values are ignored
         * @param accumulator the current accumulator (negative means empty)
         * @return the unchanged accumulator when the value is ignored, the value itself when the
         *     accumulator was empty, otherwise the sum of both
         */
        @Override
        public Double add(Double value, Double accumulator) {
            // Guard against null before the comparison below auto-unboxes the value.
            // NOTE(review): whether a null can reach this method depends on the state backend.
            if (value == null || value < 0) {
                return accumulator;
            }
            return accumulator < 0 ? value : (accumulator + value);
        }

        /**
         * Converts the accumulator into the final result.
         *
         * @param accumulator the current accumulator
         * @return {@code null} if no valid value has been added, otherwise the accumulated sum
         */
        @Override
        public Double getResult(Double accumulator) {
            return accumulator < 0 ? null : accumulator;
        }

        /**
         * Merges two accumulators. According to the original author's note this is only needed
         * when session windows are used.
         *
         * @param a the first accumulator (negative means empty)
         * @param b the second accumulator (negative means empty)
         * @return the non-empty accumulator if only one holds data, the sum if both do, or an
         *     empty (negative) accumulator if neither does
         */
        @Override
        public Double merge(Double a, Double b) {
            if (a < 0) {
                return b;
            }
            if (b < 0) {
                return a;
            }
            return a + b;
        }
    }
    

    注意点

    1.在使用Tuple作为累加器时,内部的值不可以存在null,否则在做快照时会报错,因为Tuple的null值无法序列化
    2.向AggregatingState添加数据前需要先对null做过滤(或像下面的完整代码一样替换成-1这样的哨兵值):从下方结果可以看到,add(null)之后该状态之前的累加结果会丢失,get()的结果为null,之后再有新值时只能从新值重新开始累加

    完整代码

    
    import lombok.*;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.state.AggregatingState;
    import org.apache.flink.api.common.state.AggregatingStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.Optional;
    
    /**
     * Demo job comparing two keyed {@link AggregatingState}s: one that receives null-safe values
     * (nulls replaced by {@code -1}) and one that receives the raw, possibly null, values.
     */
    public class TestAggState {
        /** Number of non-numeric ("--") records appended after the numeric ones. */
        private static final int INVALID_RECORD_COUNT = 12;

        /**
         * Builds and runs the local demo pipeline.
         *
         * @param args unused
         * @throws Exception if the Flink job fails
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

            ArrayList<String> objects = new ArrayList<>();
            objects.add("key1 10");
            objects.add("key1 20");
            objects.add("key1 30");
            // Records whose value part cannot be parsed; they become rows with a null value.
            for (int i = 0; i < INVALID_RECORD_COUNT; i++) {
                objects.add("key1 --");
            }

            env.fromCollection(objects)
                    .map(new MapFunction<String, Row>() {
                        @Override
                        public Row map(String value) throws Exception {
                            String[] els = value.split(" ");
                            Double v = null;
                            try {
                                v = Double.parseDouble(els[1]);
                            } catch (NumberFormatException | ArrayIndexOutOfBoundsException ignored) {
                                // Deliberately ignored: a missing or non-numeric value is represented as null.
                            }
                            return new Row(els[0], v);
                        }
                    })
                    .keyBy(Row::getId)
                    .process(new KeyedProcessFunction<String, Row, Double>() {
                        // Receives values with null replaced by -1.
                        private AggregatingState<Double, Double> firVSuccessState;
                        // Receives the raw values, including null, to show the effect of unfiltered nulls.
                        private AggregatingState<Double, Double> firVErrState;

                        @Override
                        public void open(Configuration parameters) throws Exception {
                            firVSuccessState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<>("first-ok-value", new FirVAggFunction(), Types.DOUBLE));
                            firVErrState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<>("first-err-value", new FirVAggFunction(), Types.DOUBLE));
                        }

                        @Override
                        public void processElement(Row value, KeyedProcessFunction<String, Row, Double>.Context ctx, Collector<Double> out) throws Exception {
                            Double oriData = value.getV();
                            // Replace null with the negative sentinel that FirVAggFunction ignores.
                            Double checkedData = Optional.ofNullable(oriData).orElse(-1D);
                            out.collect(checkedData);
                            System.out.println("value=" + value);
                            // If null is not handled before add(), the state's get() ends up returning null.
                            firVSuccessState.add(checkedData);
                            // Intentionally adds the raw (possibly null) value for comparison.
                            firVErrState.add(oriData);
                            System.out.println("firVSuccessState=" + firVSuccessState.get());
                            System.out.println("firVErrState=" + firVErrState.get());
                        }

                    })
                    .print("rs");

            env.execute();
        }

        /**
         * Aggregate function that sums the non-negative values added to it. A negative
         * accumulator (initially {@code -1}) is a sentinel meaning "no valid value yet".
         */
        public static class FirVAggFunction implements AggregateFunction<Double, Double, Double> {

            /**
             * Creates the initial accumulator.
             *
             * @return {@code -1}, the sentinel for "no valid value yet"
             */
            @Override
            public Double createAccumulator() {
                return -1D;
            }

            /**
             * Adds one input value to the accumulator.
             *
             * @param value the input value; {@code null} and negative values are ignored
             * @param accumulator the current accumulator (negative means empty)
             * @return the unchanged accumulator when the value is ignored, the value itself when
             *     the accumulator was empty, otherwise the sum of both
             */
            @Override
            public Double add(Double value, Double accumulator) {
                // Guard against null before the comparison below auto-unboxes the value.
                // NOTE(review): whether a null can reach this method depends on the state backend.
                if (value == null || value < 0) {
                    return accumulator;
                }
                return accumulator < 0 ? value : (accumulator + value);
            }

            /**
             * Converts the accumulator into the final result.
             *
             * @param accumulator the current accumulator
             * @return {@code null} if no valid value has been added, otherwise the accumulated sum
             */
            @Override
            public Double getResult(Double accumulator) {
                return accumulator < 0 ? null : accumulator;
            }

            /**
             * Merges two accumulators, treating a negative accumulator as empty.
             *
             * @param a the first accumulator
             * @param b the second accumulator
             * @return the non-empty accumulator if only one holds data, the sum if both do, or an
             *     empty (negative) accumulator if neither does
             */
            @Override
            public Double merge(Double a, Double b) {
                if (a < 0) {
                    return b;
                }
                if (b < 0) {
                    return a;
                }
                return a + b;
            }
        }

        /** Input record: a key and an optional (nullable) numeric value. */
        @Setter
        @Getter
        @ToString
        @NoArgsConstructor
        @AllArgsConstructor
        private static class Row {
            private String id;
            private Double v;
        }
    }
    

    结果

    value=TestAggState.Row(id=key1, v=30.0)
    firVSuccessState=30.0
    firVErrState=30.0
    
    value=TestAggState.Row(id=key1, v=null)
    firVSuccessState=30.0
    firVErrState=null
    
    value=TestAggState.Row(id=key1, v=null)
    firVSuccessState=30.0
    firVErrState=null
    
    value=TestAggState.Row(id=key1, v=null)
    firVSuccessState=30.0
    firVErrState=null
    
    value=TestAggState.Row(id=key1, v=null)
    firVSuccessState=30.0
    firVErrState=null
    
    value=TestAggState.Row(id=key1, v=null)
    firVSuccessState=30.0
    firVErrState=null
    
    value=TestAggState.Row(id=key1, v=null)
    firVSuccessState=30.0
    firVErrState=null
    
    value=TestAggState.Row(id=key1, v=null)
    firVSuccessState=30.0
    firVErrState=null
    
    value=TestAggState.Row(id=key1, v=null)
    firVSuccessState=30.0
    firVErrState=null
    
    value=TestAggState.Row(id=key1, v=20.0)
    firVSuccessState=50.0
    firVErrState=20.0
    
    value=TestAggState.Row(id=key1, v=null)
    firVSuccessState=50.0
    firVErrState=null
    
    value=TestAggState.Row(id=key1, v=null)
    firVSuccessState=50.0
    firVErrState=null
    
    value=TestAggState.Row(id=key1, v=null)
    firVSuccessState=50.0
    firVErrState=null
    
    value=TestAggState.Row(id=key1, v=10.0)
    firVSuccessState=60.0
    firVErrState=10.0
    
    value=TestAggState.Row(id=key1, v=null)
    firVSuccessState=60.0
    firVErrState=null
    

    相关文章

      网友评论

          本文标题:使用AggregatingState进行多维指标计算

          本文链接:https://www.haomeiwen.com/subject/uwxasrtx.html