美文网首页
Flink利用State进行统计

Flink利用State进行统计

作者: Jorvi | 来源:发表于2020-01-09 11:58 被阅读0次

    一. 托管State

    1.1 主程序

    /**
     * Word count that keeps its running totals in Flink-managed keyed state with a TTL.
     *
     * <p>Pipeline: {@link SourceFromFile} (parallelism 1) emits comma-separated lines, an inline
     * tokenizer turns them into {@code (word, 1)} pairs, the stream is keyed by field 0 and
     * {@link WordCountFlatMap} emits the updated total for every incoming pair, which is printed.
     */
    public class StateWordCount {

        public static void main(String[] args) throws Exception {
            final ParameterTool params = ParameterTool.fromArgs(args);
            final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

            // Make the command-line arguments visible to every operator.
            streamEnv.getConfig().setGlobalJobParameters(params);

            // Exactly-once checkpoint every 5000 ms.
            streamEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

            // Checkpointed state is written to the local file system via FsStateBackend.
            final StateBackend fsBackend = new FsStateBackend(new Path("file:///E:\\data\\flink"));
            streamEnv.setStateBackend(fsBackend);

            // Splits one comma-separated line into (word, 1) pairs.
            final FlatMapFunction<String, Tuple2<String, Integer>> tokenizer =
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String word : line.split(",")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                };

            streamEnv
                .addSource(new SourceFromFile())
                .setParallelism(1)
                .name("demo-source")
                .flatMap(tokenizer)
                .name("demo-flatMap")
                .keyBy(0)
                .flatMap(new WordCountFlatMap())
                .print();

            streamEnv.execute("StateWordCount");
        }

    }
    
    1. 构建 StreamExecutionEnvironment;
    2. 启用 Checkpoint(EXACTLY_ONCE 模式),并设置间隔时间为 5000 毫秒;
    3. 设置 StateBackend 为 FsStateBackend;
    4. 从文件读取数据开始统计。

    1.2 Source

    /**
     * Source that emits the non-blank lines of a local text file, sleeping one minute after
     * each emitted line.
     *
     * <p>After the end of the file is reached the source keeps running, periodically re-reading
     * in case lines are appended, until {@link #cancel()} is called. It polls with a short sleep
     * instead of busy-spinning. The reader is closed when {@link #run} returns.
     */
    public class SourceFromFile extends RichSourceFunction<String> {

        /** File read by the no-argument constructor. */
        private static final String DEFAULT_PATH = "E:\\documents\\test.txt";

        /** Pause after each emitted line, in seconds. */
        private static final long EMIT_INTERVAL_SECONDS = 60;

        /** Back-off while waiting at end of file, in milliseconds. */
        private static final long EOF_POLL_MILLIS = 1000;

        /** Path of the text file to read. */
        private final String path;

        // Written by cancel() and read by the run() loop, hence volatile.
        private volatile boolean isRunning = true;

        /** Reads {@value #DEFAULT_PATH}. */
        public SourceFromFile() {
            this(DEFAULT_PATH);
        }

        /**
         * @param path path of the text file whose lines are emitted
         */
        public SourceFromFile(String path) {
            this.path = path;
        }

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            // try-with-resources: the reader is closed on cancellation and on failure.
            try (BufferedReader bufferedReader = new BufferedReader(new FileReader(path))) {
                while (isRunning) {
                    String line = bufferedReader.readLine();
                    if (line == null) {
                        // End of file: readLine() returns null immediately on every call,
                        // so back off instead of spinning a CPU core.
                        TimeUnit.MILLISECONDS.sleep(EOF_POLL_MILLIS);
                        continue;
                    }
                    if (StringUtils.isBlank(line)) {
                        continue;
                    }
                    ctx.collect(line);
                    TimeUnit.SECONDS.sleep(EMIT_INTERVAL_SECONDS);
                }
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
    

    1.3 Process

    /**
     * Keyed word counter backed by Flink-managed {@link ValueState} with a state TTL.
     *
     * <p>Input and output are {@code (word, count)} pairs. For each input, the running total of
     * the current key is increased by the input count, written back to state and emitted.
     * TTL is 3 minutes of processing time and is refreshed on create and on every write
     * ({@code OnCreateAndWrite}). Expired values are never returned, so a word not updated for
     * 3 minutes starts counting again from the incoming value.
     *
     * <p>Uses keyed state, so it must be applied to a keyed stream.
     */
    public class WordCountFlatMap extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        // Running (word, count) of the current key; null when absent or expired.
        private ValueState<Tuple2<String, Integer>> valueState;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            // State time-to-live configuration.
            StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(3))   // lifetime of an entry
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  // never return expired user data
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  // refresh the last-access timestamp on create and on every write
                .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime) // currently only ProcessingTime is supported
                .build();

            // Parameterised descriptor, so getState returns a correctly typed ValueState
            // without an unchecked raw-type conversion.
            ValueStateDescriptor<Tuple2<String, Integer>> descriptor = new ValueStateDescriptor<>(
                "wordCountStateDesc",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

            // Turn on TTL for this state.
            descriptor.enableTimeToLive(ttlConfig);

            // Obtain the keyed ValueState handle from the descriptor.
            valueState = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap(Tuple2<String, Integer> input, Collector<Tuple2<String, Integer>> collector) throws Exception {
            Tuple2<String, Integer> currentState = valueState.value();

            // No (unexpired) value for this key yet: start counting from zero.
            if (currentState == null) {
                currentState = new Tuple2<>(input.f0, 0);
            }

            Tuple2<String, Integer> newState = new Tuple2<>(currentState.f0, currentState.f1 + input.f1);

            // Persist the new total; with OnCreateAndWrite this write also refreshes the TTL.
            valueState.update(newState);

            collector.collect(newState);
        }
    }
    

    继承 RichFlatMapFunction

    1. 初始化阶段执行 open 方法:
      1.1 配置 StateTTL(TimeToLive),具体见注释
      1.2 创建 ValueStateDescriptor,基于 ValueStateDescriptor 创建 ValueState

    2. 处理数据执行 flatMap 方法:
      2.1 初始化 ValueState
      2.2 更新 ValueState
      2.3 返回结果

    注:KeyedStream 中,每个 Key 对应一个 State

    1.4 测试

    输入:

    aaa,bbb
    aaa
    aaa
    aaa
    aaa
    aaa
    aaa,bbb
    aaa,bbb
    aaa,bbb
    

    输出:

    3> (aaa,1)
    2> (bbb,1)
    3> (aaa,2)
    3> (aaa,3)
    3> (aaa,4)
    3> (aaa,5)
    3> (aaa,6)
    3> (aaa,7)
    2> (bbb,1)
    2> (bbb,2)
    3> (aaa,8)
    3> (aaa,9)
    2> (bbb,3)
    

    二. 自主操作State

    2.1 Scala写法

    2.1.1 主程序

    /**
     * Entry point of the timer-based word count (Scala API).
     *
     * Reads comma-separated lines from [[SourceFromFile]] (parallelism 1), splits them into
     * `(word, 1)` pairs, keys the stream by the word and counts per key in
     * [[WordCountProcessFunction]], printing every updated count.
     */
    object StateWordCount {

      def main(args: Array[String]): Unit = {
        val params = ParameterTool.fromArgs(args)
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        // Make the command-line arguments visible to every operator.
        env.getConfig.setGlobalJobParameters(params)

        // Exactly-once checkpoint every 5000 ms.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

        // Checkpointed state is written to the local file system via FsStateBackend.
        val backend: StateBackend = new FsStateBackend(new Path("file:///E:\\data\\flink"))
        env.setStateBackend(backend)

        val lines = env.addSource(new SourceFromFile).setParallelism(1)

        lines
          .flatMap(new WordCountFlatMapFunction)
          .keyBy(_._1)
          .process(new WordCountProcessFunction())
          .print()

        env.execute()
      }
    }
    

    2.1.2 Source

    与 1.2 节中的 SourceFromFile 相同,不再赘述。

    2.1.3 Process

    /**
     * Tokenizer: splits each line on `,` and emits `(word, 1)` for every piece, in order.
     */
    class WordCountFlatMapFunction extends RichFlatMapFunction[String, (String, Integer)] {
      override def flatMap(value: String, out: Collector[(String, Integer)]): Unit =
        value.split(",").foreach { word =>
          out.collect((word, Integer.valueOf(1)))
        }
    }
    

    继承 RichFlatMapFunction
    功能与第一节主程序中的匿名 FlatMapFunction 相同:按逗号切分每一行,并为每个单词输出 (word, 1)。

    /**
     * Keyed word counter whose per-key count is reset by a processing-time timer.
     *
     * Input and output are `(word, count)` pairs keyed by the word. When a key has no count
     * yet, a timer is registered `ttlMillis` ahead on the processing-time clock. When it fires,
     * both the count and the remembered timer timestamp are cleared, so the next element for
     * that key starts counting from scratch. Unlike a state TTL with `OnCreateAndWrite`, later
     * updates do not push the expiry back.
     *
     * @param ttlMillis how long a key's count lives after its first element, in milliseconds
     *                  (defaults to 3 minutes)
     */
    class WordCountProcessFunction(ttlMillis: Long = 3 * 60 * 1000L)
      extends KeyedProcessFunction[String, (String, Integer), (String, Integer)] {

      // Running (word, count) for the current key; null when the key has no count.
      private var valueState: ValueState[(String, Integer)] = _
      // Firing time of the pending reset timer for the current key.
      private var timerState: ValueState[Long] = _

      override def open(parameters: Configuration): Unit = {
        super.open(parameters)

        val valueStateDesc: ValueStateDescriptor[(String, Integer)] = new ValueStateDescriptor[(String, Integer)]("valueStateDesc",
          TypeInformation.of(classOf[(String, Integer)]))
        valueState = getRuntimeContext.getState(valueStateDesc)

        val timerStateDesc: ValueStateDescriptor[Long] = new ValueStateDescriptor[Long]("timerStateDesc",
          TypeInformation.of(classOf[Long]))
        timerState = getRuntimeContext.getState(timerStateDesc)
      }

      override def processElement(value: (String, Integer),
                                  ctx: KeyedProcessFunction[String, (String, Integer), (String, Integer)]#Context,
                                  out: Collector[(String, Integer)]): Unit = {
        val current: (String, Integer) = valueState.value() match {
          case null =>
            // First element for this key (or first since the last reset): schedule the reset on
            // the timer service's own processing-time clock and remember when it fires.
            val ttlTime: Long = ctx.timerService().currentProcessingTime() + ttlMillis
            ctx.timerService().registerProcessingTimeTimer(ttlTime)
            timerState.update(ttlTime)
            (value._1, Integer.valueOf(0))
          case existing => existing
        }

        val updated: (String, Integer) = (current._1, current._2 + value._2)
        valueState.update(updated)

        out.collect(updated)
      }

      override def onTimer(timestamp: Long,
                           ctx: KeyedProcessFunction[String, (String, Integer), (String, Integer)]#OnTimerContext,
                           out: Collector[(String, Integer)]): Unit = {
        super.onTimer(timestamp, ctx, out)

        println("clear...")

        // The timer handled here has already fired, so there is nothing left to delete.
        // Clear both pieces of per-key state so no stale timer timestamp is kept.
        valueState.clear()
        timerState.clear()
      }
    }
    

    继承 KeyedProcessFunction

    1. 初始化阶段执行 open 方法:
      1.1 创建 ValueStateDescriptor
      1.2 基于 ValueStateDescriptor 初始化 valueState

    2. 处理数据执行 processElement 方法:
      2.1 在 valueState 为空时,初始化 currentState,并注册定时器 Timer 触发时间,同时记录定时器 Timer 触发时间到状态 timerState 中
      2.2 计算出新的 State 值并更新到 valueState
      2.3 返回结果

    3. 定时器 Timer 触发,执行 onTimer 方法:
      3.1 清除 valueState 里的值
      3.2 从状态 timerState 中取出保存的时间值,清除该时间值对应的定时器 Timer

    2.1.4 测试

    输入:

    aaa,bbb
    aaa
    aaa
    aaa
    aaa
    aaa
    aaa,bbb
    aaa,bbb
    aaa,bbb
    

    输出:

    3> (aaa,1)
    2> (bbb,1)
    3> (aaa,2)
    3> (aaa,3)
    3> (aaa,4)
    clear...
    clear...
    3> (aaa,1)
    3> (aaa,2)
    3> (aaa,3)
    2> (bbb,1)
    3> (aaa,4)
    2> (bbb,2)
    clear...
    2> (bbb,3)
    3> (aaa,1)
    clear...
    clear...
    

    可以看出:
    对于 Keyed State,每个 Key 都对应一个独立的 ValueState。
    根据代码中的逻辑:当某个 Key 的 ValueState 为空而被初始化时,会为该 Key 注册一个定时器并记录其触发时间;当该定时器被触发时执行 onTimer 方法,清除该 Key 对应的 ValueState 里的值。

    即:
    一个key --> 一个ValueState(MapState类似) --> 一个TTL Timer
    当某个 TTL Timer 被触发而调用 onTimer 方法时,在 onTimer 方法内利用 ctx.getCurrentKey() 可取到触发该函数的那个 key,此时如果调用valueState.clear()则会清除该key对应的ValueState。

    注意:
    这边的逻辑和上面的示例有所区别:

    • 上面示例中配置了 StateTtlConfig.UpdateType.OnCreateAndWrite,则每次创建或更新 ValueState 时都会刷新 TTL 时间;
    • 此处示例中只在 ValueState 为空时才更新 TTL Time,并不是每次更新 ValueState 时都更新 TTL Time。

    2.2 Java写法

    2.2.1 主程序

        /**
         * Entry point of the timer-based word count (Java API).
         *
         * <p>Reads comma-separated lines from SourceFromFile (parallelism 1), splits them into
         * {@code (word, 1)} pairs, keys the stream by field 0 and counts per word in
         * WordCountProcess, printing every updated count.
         */
        public static void main(String[] args) throws Exception {
            final ParameterTool params = ParameterTool.fromArgs(args);
            final StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

            // Make the command-line arguments visible to every operator.
            streamEnv.getConfig().setGlobalJobParameters(params);

            // Exactly-once checkpoint every 5000 ms.
            streamEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

            // Checkpointed state is written to the local file system via FsStateBackend.
            final StateBackend fsBackend = new FsStateBackend(new Path("file:///E:\\data\\flink"));
            streamEnv.setStateBackend(fsBackend);

            // Splits one comma-separated line into (word, 1) pairs.
            final FlatMapFunction<String, Tuple2<String, Integer>> tokenizer =
                new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String word : line.split(",")) {
                            out.collect(new Tuple2<>(word, 1));
                        }
                    }
                };

            streamEnv
                .addSource(new SourceFromFile())
                .setParallelism(1)
                .flatMap(tokenizer)
                .keyBy(0)
                .process(new WordCountProcess())
                .print();

            streamEnv.execute("StateWordCount");
        }
    

    2.2.2 Source

    与 1.2 节中的 SourceFromFile 相同,不再赘述。

    2.2.3 Process

    public class WordCountProcess extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>> {
    
        private ValueState<Tuple2<String, Integer>> valueState;
        private ValueState<Long> timerState;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
    
            ValueStateDescriptor<Tuple2<String, Integer>> valueStateDescriptor = new ValueStateDescriptor<>("valueStateDesc",
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
                }));
            valueState = getRuntimeContext().getState(valueStateDescriptor);
    
            ValueStateDescriptor<Long> timerStateDescriptor = new ValueStateDescriptor<>("timerStateDesc",
                TypeInformation.of(new TypeHint<Long>() {
                }));
            timerState = getRuntimeContext().getState(timerStateDescriptor);
        }
    
    
        @Override
        public void processElement(Tuple2<String, Integer> input, Context ctx, Collector<Tuple2<String, Integer>> collector) throws Exception {
            Tuple2<String, Integer> currentState = valueState.value();
    
            // 初始化ValueState值
            if (null == currentState) {
                currentState = new Tuple2<>(input.f0, 0);
                Long ttlTime = System.currentTimeMillis() + 3 * 60 * 1000;
                ctx.timerService().registerProcessingTimeTimer(ttlTime);
    
                // 保存定时器触发时间到状态中
                timerState.update(ttlTime)
            }
    
            Tuple2<String, Integer> newState = new Tuple2<>(currentState.f0, currentState.f1 + input.f1);
    
            // 更新ValueState值
            valueState.update(newState);
    
            collector.collect(newState);
        }
    
    
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
    
            System.out.println("clear...");
    
            valueState.clear();
    
            // 清除定时器
            val ttlTime = timerState.value()
            ctx.timerService().deleteProcessingTimeTimer(ttlTime);
        }
    }
    

    2.2.4 测试

    输入:

    aaa,bbb
    aaa
    aaa
    aaa
    aaa
    aaa
    aaa,bbb
    aaa,bbb
    aaa,bbb
    

    输出:

    3> (aaa,1)
    2> (bbb,1)
    3> (aaa,2)
    3> (aaa,3)
    3> (aaa,4)
    clear...
    clear...
    3> (aaa,1)
    3> (aaa,2)
    3> (aaa,3)
    2> (bbb,1)
    2> (bbb,2)
    3> (aaa,4)
    clear...
    2> (bbb,3)
    3> (aaa,1)
    clear...
    clear...
    

    逻辑与上面 Scala 写法一样,测试结果也保持一致(不同并行子任务之间的输出交错顺序可能略有差异,例如 (bbb,2) 与 (aaa,4) 的先后顺序)。

    需要注意一点:keyBy方法的区别:

    • keyBy(int... fields),返回的是 KeyedStream<T, Tuple>,其 key 固定为 Tuple 类型,(此例为 KeyedStream<Tuple2<String, Integer>, Tuple>),对应 Process 中继承泛型为:KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>

    • keyBy(KeySelector<T, K> key),返回的是 KeyedStream<T, K>,其 key 自定义为 K 类型(假设为String类型),则对应 Process 中继承泛型为:KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>

    相关文章

      网友评论

          本文标题:Flink利用State进行统计

          本文链接:https://www.haomeiwen.com/subject/oigractx.html