美文网首页
Flink利用State进行统计

Flink利用State进行统计

作者: Jorvi | 来源:发表于2020-01-09 11:58 被阅读0次

一. 托管State

1.1 主程序

/**
 * Word-count job whose running totals are kept in Flink keyed state.
 *
 * <p>Lines produced by {@link SourceFromFile} are split on commas, every token is
 * paired with a count of 1, the stream is keyed by the token (tuple field 0) and
 * {@link WordCountFlatMap} accumulates and prints the per-key total.
 */
public class StateWordCount {

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments, exposed to the job as global parameters
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final ParameterTool params = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);

        // Checkpoint every 5000 ms in exactly-once mode.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        // Keep checkpoints on the local file system.
        // The local stays typed as StateBackend so the same setStateBackend overload is selected.
        StateBackend fsBackend = new FsStateBackend(new Path("file:///E:\\data\\flink"));
        env.setStateBackend(fsBackend);

        env
            .addSource(new SourceFromFile())
            .setParallelism(1)
            .name("demo-source")
            .flatMap(new CommaTokenizer())
            .name("demo-flatMap")
            .keyBy(0)
            .flatMap(new WordCountFlatMap())
            .print();

        env.execute("StateWordCount");
    }

    /** Splits a comma-separated line and emits one (token, 1) pair per token. */
    private static final class CommaTokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String token : line.split(",")) {
                out.collect(Tuple2.of(token, 1));
            }
        }
    }

}
  1. 构建 StreamExecutionEnvironment;
  2. 启动 Checkpoint,并设置间隔时间;
  3. 设置 StateBackend 为 FsStateBackend;
  4. 从文件读取数据开始统计。

1.2 Source

/**
 * Source that reads a text file line by line and emits each non-blank line,
 * pausing 60 seconds after every emitted line.
 *
 * <p>The source keeps running after the end of the file is reached (until
 * {@link #cancel()} is called), so the job stays alive.
 */
public class SourceFromFile extends RichSourceFunction<String> {
    // Written by cancel() and polled by run(); volatile so the update is visible across threads.
    private volatile boolean isRunning = true;

    /**
     * Reads the input file and forwards its non-blank lines.
     *
     * @param ctx context used to emit the lines
     * @throws Exception if the file cannot be opened or read, or the thread is interrupted while sleeping
     */
    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // try-with-resources guarantees the reader is closed on cancel or failure.
        try (BufferedReader bufferedReader = new BufferedReader(new FileReader("E:\\documents\\test.txt"))) {
            while (isRunning) {
                String line = bufferedReader.readLine();
                if (line == null) {
                    // End of file: back off instead of busy-spinning on readLine(),
                    // while still keeping the source (and therefore the job) running.
                    TimeUnit.SECONDS.sleep(1);
                    continue;
                }
                if (StringUtils.isBlank(line)) {
                    // Skip empty lines without waiting.
                    continue;
                }
                ctx.collect(line);
                TimeUnit.SECONDS.sleep(60);
            }
        }
    }

    /** Asks the read loop in {@link #run} to stop at its next iteration. */
    @Override
    public void cancel() {
        isRunning = false;
    }
}

1.3 Process

/**
 * Accumulates a running count per key in a {@link ValueState} that has a
 * 3-minute time-to-live, and emits the updated (word, count) pair for every input.
 */
public class WordCountFlatMap extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

    // Running (word, count) total for the current key.
    private ValueState<Tuple2<String, Integer>> valueState;

    /**
     * Creates the TTL-enabled value state.
     *
     * @param parameters configuration passed to the function
     * @throws Exception if the superclass initialization fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        // Configure state TTL (time-to-live).
        StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.minutes(3))   // time to live
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)  // never return expired user data
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)  // refresh the timestamp on creation and on every write
            .setTimeCharacteristic(StateTtlConfig.TimeCharacteristic.ProcessingTime) // original author's note: only ProcessingTime is supported at the moment
            .build();

        // Create the descriptor with its type argument (instead of a raw type) so that
        // getState() below is type-checked by the compiler.
        ValueStateDescriptor<Tuple2<String, Integer>> descriptor = new ValueStateDescriptor<>("wordCountStateDesc",
            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

        // Activate state TTL.
        descriptor.enableTimeToLive(ttlConfig);

        // Obtain the ValueState described by the descriptor.
        valueState = getRuntimeContext().getState(descriptor);

    }

    /**
     * Adds the incoming count to the stored total for the current key and emits the new total.
     *
     * @param input     (word, count) pair to add
     * @param collector receives the updated (word, total) pair
     * @throws Exception if state access fails
     */
    @Override
    public void flatMap(Tuple2<String, Integer> input, Collector<Tuple2<String, Integer>> collector) throws Exception {
        Tuple2<String, Integer> currentState = valueState.value();

        // No value stored for this key: start from zero.
        if (null == currentState) {
            currentState = new Tuple2<>(input.f0, 0);
        }

        Tuple2<String, Integer> newState = new Tuple2<>(currentState.f0, currentState.f1 + input.f1);

        // Persist the new total.
        valueState.update(newState);

        collector.collect(newState);
    }
}

继承 RichFlatMapFunction

  1. 初始化阶段执行 open 方法:
    1.1 配置 StateTTL(TimeToLive),具体见注释
    1.2 创建 ValueStateDescriptor,基于 ValueStateDescriptor 创建 ValueState

  2. 处理数据执行 flatMap 方法:
    2.1 初始化 ValueState
    2.2 更新 ValueState
    2.3 返回结果

注:KeyedStream 中,每个 Key 对应一个 State

1.4 测试

输入:

aaa,bbb
aaa
aaa
aaa
aaa
aaa
aaa,bbb
aaa,bbb
aaa,bbb

输出:

3> (aaa,1)
2> (bbb,1)
3> (aaa,2)
3> (aaa,3)
3> (aaa,4)
3> (aaa,5)
3> (aaa,6)
3> (aaa,7)
2> (bbb,1)
2> (bbb,2)
3> (aaa,8)
3> (aaa,9)
2> (bbb,3)

二. 自主操作State

2.1 Scala写法

2.1.1 主程序

/** Entry point of the Scala word-count job that manages state expiry itself. */
object StateWordCount {

  /**
   * Builds and runs the job: read lines, split them into (word, 1) pairs,
   * key by word, accumulate with [[WordCountProcessFunction]] and print.
   *
   * @param args command-line arguments, exposed to the job as global parameters
   */
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val jobParams: ParameterTool = ParameterTool.fromArgs(args)
    streamEnv.getConfig.setGlobalJobParameters(jobParams)

    // Checkpoint every 5000 ms in exactly-once mode.
    streamEnv.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)

    // Keep checkpoints on the local file system.
    // The val stays typed as StateBackend so the same setStateBackend overload is selected.
    val fsBackend: StateBackend = new FsStateBackend(new Path("file:///E:\\data\\flink"))
    streamEnv.setStateBackend(fsBackend)

    val lines = streamEnv
      .addSource(new SourceFromFile)
      .setParallelism(1)

    val runningCounts = lines
      .flatMap(new WordCountFlatMapFunction)
      .keyBy(_._1)
      .process(new WordCountProcessFunction())

    runningCounts.print()

    streamEnv.execute()
  }
}

2.1.2 Source

与上面一样,不赘述。

2.1.3 Process

/** Splits a comma-separated line and emits one (word, 1) pair per element. */
class WordCountFlatMapFunction extends RichFlatMapFunction[String, (String, Integer)] {

  /**
   * @param value comma-separated input line
   * @param out   receives a (word, 1) pair for every element of the line
   */
  override def flatMap(value: String, out: Collector[(String, Integer)]): Unit =
    value.split(",").foreach { word =>
      out.collect(Tuple2.apply(word, 1))
    }
}

继承 RichFlatMapFunction
功能与 FlatMap 一样。

/**
 * Keyed word counter that expires its own state with a processing-time timer.
 *
 * When no count is stored for a key, a timer is registered 3 minutes ahead and its
 * timestamp is remembered in `timerState`. When that timer fires, both the count
 * and the remembered timestamp are cleared, so the next element for the key starts
 * again from zero.
 */
class WordCountProcessFunction extends KeyedProcessFunction[String, (String, Integer), (String, Integer)] {

  // Running (word, count) total for the current key; assigned in open().
  private var valueState: ValueState[(String, Integer)] = _
  // Timestamp of the expiry timer registered for the current key; assigned in open().
  private var timerState: ValueState[Long] = _

  /** Obtains the two keyed states used by this function. */
  override def open(parameters: Configuration): Unit = {
    super.open(parameters)

    val valueStateDesc = new ValueStateDescriptor[(String, Integer)]("valueStateDesc",
      TypeInformation.of(classOf[(String, Integer)]))
    valueState = getRuntimeContext.getState(valueStateDesc)

    val timerStateDesc = new ValueStateDescriptor[Long]("timerStateDesc",
      TypeInformation.of(classOf[Long]))
    timerState = getRuntimeContext.getState(timerStateDesc)
  }

  /**
   * Adds the incoming count to the stored total for the current key and emits the new total.
   *
   * @param value (word, count) pair to add
   * @param ctx   gives access to the timer service
   * @param out   receives the updated (word, total) pair
   */
  override def processElement(value: (String, Integer),
                              ctx: KeyedProcessFunction[String, (String, Integer), (String, Integer)]#Context,
                              out: Collector[(String, Integer)]): Unit = {
    val currentState: (String, Integer) = Option(valueState.value()).getOrElse {
      // The expiry timer is only (re)armed when no count is stored for this key.
      // The timer service's clock is used so the timestamp is on the same time base as the timer itself.
      val ttlTime: Long = ctx.timerService().currentProcessingTime() + 3 * 60 * 1000L
      ctx.timerService().registerProcessingTimeTimer(ttlTime)

      // Remember the timer's firing time in state.
      timerState.update(ttlTime)

      (value._1, Integer.valueOf(0))
    }

    val newState: (String, Integer) = (currentState._1, currentState._2 + value._2)
    valueState.update(newState)

    out.collect(newState)
  }

  /**
   * Called when the expiry timer of the current key fires: prints a marker and
   * drops everything stored for the key.
   */
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[String, (String, Integer), (String, Integer)]#OnTimerContext,
                       out: Collector[(String, Integer)]): Unit = {
    super.onTimer(timestamp, ctx, out)

    System.out.println("clear...")

    valueState.clear()

    // Delete the timer whose firing time was remembered for this key.
    val ttlTime = timerState.value()
    ctx.timerService().deleteProcessingTimeTimer(ttlTime)

    // Also drop the remembered timestamp; otherwise it would stay in state for every key ever seen.
    timerState.clear()
  }
}

继承 KeyedProcessFunction

  1. 初始化阶段执行 open 方法:
    1.1 分别为 valueState 和 timerState 创建 ValueStateDescriptor
    1.2 基于各自的 ValueStateDescriptor 初始化 valueState(保存统计值)和 timerState(保存定时器触发时间)

  2. 处理数据执行 processElement 方法:
    2.1 在 valueState 为空时,初始化 currentState,并注册定时器 Timer 触发时间,同时记录定时器 Timer 触发时间到状态 timerState 中
    2.2 计算出新的 State 值并更新到 valueState
    2.3 返回结果

  3. 定时器 Timer 触发,执行 onTimer 方法:
    3.1 清除 valueState 里的值
    3.2 从状态 timerState 中取出保存的时间值,清除该时间值对应的定时器 Timer

2.1.4 测试

输入:

aaa,bbb
aaa
aaa
aaa
aaa
aaa
aaa,bbb
aaa,bbb
aaa,bbb

输出:

3> (aaa,1)
2> (bbb,1)
3> (aaa,2)
3> (aaa,3)
3> (aaa,4)
clear...
clear...
3> (aaa,1)
3> (aaa,2)
3> (aaa,3)
2> (bbb,1)
3> (aaa,4)
2> (bbb,2)
clear...
2> (bbb,3)
3> (aaa,1)
clear...
clear...

可以看出:
对于 Keyed State,每个 Key 都对应一个 ValueState。
根据代码中的逻辑:当各 ValueState 为空被初始化时,会注册各自对应的定时器并记录其触发时间;当各自的定时器被触发时执行 onTimer 方法,清除对应的 ValueState 里的值。

即:
一个key --> 一个ValueState(MapState类似) --> 一个TTL Timer
当某个 TTL Timer 被触发而调用 onTimer 方法时,在 onTimer 方法内利用 ctx.getCurrentKey() 可取到触发该函数的那个 key,此时如果调用valueState.clear()则会清除该key对应的ValueState。

注意:
这边的逻辑和上面的示例有所区别:

  • 上面示例中配置 StateTtlConfig.UpdateType.OnCreateAndWrite,则每次创建或更新 ValueState 时都会更新 TTL Time;
  • 此处示例中只在 ValueState 为空时才更新 TTL Time,并不是每次更新 ValueState 时都更新 TTL Time。

2.2 Java写法

2.2.1 主程序

    /**
     * Builds and runs the word-count job: read lines, split them on commas into
     * (token, 1) pairs, key by the token (tuple field 0), accumulate with
     * {@link WordCountProcess} and print the running totals.
     *
     * @param args command-line arguments, exposed to the job as global parameters
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final ParameterTool params = ParameterTool.fromArgs(args);
        env.getConfig().setGlobalJobParameters(params);

        // Checkpoint every 5000 ms in exactly-once mode.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        // Keep checkpoints on the local file system.
        // The local stays typed as StateBackend so the same setStateBackend overload is selected.
        StateBackend fsBackend = new FsStateBackend(new Path("file:///E:\\data\\flink"));
        env.setStateBackend(fsBackend);

        env
            .addSource(new SourceFromFile())
            .setParallelism(1)
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                    // One (token, 1) pair per comma-separated element.
                    for (String token : line.split(",")) {
                        out.collect(Tuple2.of(token, 1));
                    }
                }
            })
            .keyBy(0)
            .process(new WordCountProcess())
            .print();

        env.execute("StateWordCount");
    }

2.2.2 Source

与上面一样,不赘述。

2.2.3 Process

public class WordCountProcess extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>> {

    private ValueState<Tuple2<String, Integer>> valueState;
    private ValueState<Long> timerState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        ValueStateDescriptor<Tuple2<String, Integer>> valueStateDescriptor = new ValueStateDescriptor<>("valueStateDesc",
            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
            }));
        valueState = getRuntimeContext().getState(valueStateDescriptor);

        ValueStateDescriptor<Long> timerStateDescriptor = new ValueStateDescriptor<>("timerStateDesc",
            TypeInformation.of(new TypeHint<Long>() {
            }));
        timerState = getRuntimeContext().getState(timerStateDescriptor);
    }


    @Override
    public void processElement(Tuple2<String, Integer> input, Context ctx, Collector<Tuple2<String, Integer>> collector) throws Exception {
        Tuple2<String, Integer> currentState = valueState.value();

        // 初始化ValueState值
        if (null == currentState) {
            currentState = new Tuple2<>(input.f0, 0);
            Long ttlTime = System.currentTimeMillis() + 3 * 60 * 1000;
            ctx.timerService().registerProcessingTimeTimer(ttlTime);

            // 保存定时器触发时间到状态中
            timerState.update(ttlTime)
        }

        Tuple2<String, Integer> newState = new Tuple2<>(currentState.f0, currentState.f1 + input.f1);

        // 更新ValueState值
        valueState.update(newState);

        collector.collect(newState);
    }


    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, Integer>> out) throws Exception {
        super.onTimer(timestamp, ctx, out);

        System.out.println("clear...");

        valueState.clear();

        // 清除定时器
        val ttlTime = timerState.value()
        ctx.timerService().deleteProcessingTimeTimer(ttlTime);
    }
}

2.2.4 测试

输入:

aaa,bbb
aaa
aaa
aaa
aaa
aaa
aaa,bbb
aaa,bbb
aaa,bbb

输出:

3> (aaa,1)
2> (bbb,1)
3> (aaa,2)
3> (aaa,3)
3> (aaa,4)
clear...
clear...
3> (aaa,1)
3> (aaa,2)
3> (aaa,3)
2> (bbb,1)
2> (bbb,2)
3> (aaa,4)
clear...
2> (bbb,3)
3> (aaa,1)
clear...
clear...

逻辑与上面 Scala 写法一样,测试结果也保持一致。

需要注意一点:keyBy方法的区别:

  • keyBy(int... fields),返回的是 KeyedStream<T, Tuple>,其 key 固定为 Tuple 类型,(此例为 KeyedStream<Tuple2<String, Integer>, Tuple>),对应 Process 中继承泛型为:KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Integer>>

  • keyBy(KeySelector<T, K> key),返回的是 KeyedStream<T, K>,其 key 自定义为 K 类型(假设为String类型),则对应 Process 中继承泛型为:KeyedProcessFunction<String, Tuple2<String, Integer>, Tuple2<String, Integer>>

相关文章

网友评论

      本文标题:Flink利用State进行统计

      本文链接:https://www.haomeiwen.com/subject/oigractx.html