[Flink Fundamentals] An Introduction to State Management in Flink


Author: 延眠万里 | Published 2020-04-06 08:41

    Overview:
    State is a core property of stream computing, and Flink provides extensive support for it: intermediate results can be saved in state and made available to subsequent computations.

    Classification:

    • Keyed State:
      • ValueState
      • ListState
      • ReducingState
      • AggregatingState
      • FoldingState (deprecated in favor of AggregatingState)
      • MapState
    • Operator State:
      • ListState
      • UnionListState
      • BroadcastState
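Each keyed state type is created from a matching descriptor inside a rich function. As a minimal sketch (the names `lastValue`, `history`, and `counters` are our own, not from the article), assuming the Flink 1.x DataStream API:

```java
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;

// Sketch: descriptors for the most common keyed state types.
// Inside open() of a rich function, these would be passed to
// getRuntimeContext().getState(...) / getListState(...) / getMapState(...).
public class StateDescriptors {
    // a single value per key
    static final ValueStateDescriptor<Long> lastValue =
            new ValueStateDescriptor<>("lastValue", Long.class);
    // a list of values per key
    static final ListStateDescriptor<Long> history =
            new ListStateDescriptor<>("history", Long.class);
    // a map per key
    static final MapStateDescriptor<String, Long> counters =
            new MapStateDescriptor<>("counters", String.class, Long.class);
}
```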

    Raw state and managed state:

    Both Keyed State and Operator State can exist in two forms: raw state and managed state.

    • Raw state: managed by the user
    • Managed state: managed internally by Flink

    Managed state is state whose data structures the Flink framework controls, such as ValueState, ListState, and MapState.

    With raw state, the user manages the concrete data structure; when taking a checkpoint, the framework reads and writes the state as a byte[] and knows nothing about its internal structure.

    Managed state is recommended for regular DataStream programs; raw state is typically only needed when implementing a custom operator.

    State backends:

    • MemoryStateBackend — working state on the TaskManager heap; checkpoints held in the JobManager's memory
    • FsStateBackend — working state on the TaskManager heap; checkpoints written to a filesystem (e.g. HDFS)
    • RocksDBStateBackend — state in an embedded RocksDB instance on local disk; suited to very large state
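Wiring up a backend is a one-liner on the execution environment. A brief sketch, using the pre-1.13 API that the rest of this article also uses (the checkpoint URI below is a placeholder, not a path from the article):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BackendConfig {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // FsStateBackend: working state stays on the TaskManager heap,
        // checkpoints are written to the given filesystem URI (placeholder path)
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));
    }
}
```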

    Code examples:

    KeyedStateCustom:

    package state;
    
    import com.google.common.collect.Lists;
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Custom keyed state example
     *
     * @author lixiyan
     * @date 2020/4/5 8:34 PM
     */
    public class KeyedStateCustom extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {
    
        private ListState<Long> abnormalData;
    
    
        // threshold to monitor
        private Long threshold;
        // number of occurrences that triggers an alert
        private Integer numberOfTimes;
    
        KeyedStateCustom(Long threshold, Integer numberOfTimes) {
            this.threshold = threshold;
            this.numberOfTimes = numberOfTimes;
        }
    
    
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            StateTtlConfig ttl = StateTtlConfig
                    // set the time-to-live to 1 second
                    .newBuilder(Time.seconds(1))
                    // refresh the 1-second TTL whenever the state is created or written
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    /* expired values are never returned; the alternative,
                       ReturnExpiredIfNotCleanedUp, returns an expired value
                       as long as it has not yet been physically removed */
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
            ListStateDescriptor<Long> abnormalDataDesc = new ListStateDescriptor<>("abnormalData", Long.class);
            abnormalDataDesc.enableTimeToLive(ttl);
            this.abnormalData = getRuntimeContext().getListState(abnormalDataDesc);
        }
    
        @Override
        public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out) throws Exception {
            Long inputValue = value.f1;
            // record abnormal values that exceed the threshold
            if (inputValue >= threshold) {
                abnormalData.add(inputValue);
            }
    
            ArrayList<Long> lists = Lists.newArrayList(abnormalData.get().iterator());
            // once enough abnormal values have accumulated, emit an alert
            if (lists.size() >= numberOfTimes) {
                out.collect(Tuple2.of(value.f0 + " exceeded the threshold", lists));
                // clear the state after emitting
                abnormalData.clear();
            }
        }
    }
    
    

    KeyedStateCustom extends RichFlatMapFunction so that it can obtain and update state through the RuntimeContext. A TTL is also configured in open(): in production, state that is never cleaned up, or that accumulates faster than it is processed, can easily bring a job down, so a TTL sets a maximum lifetime after which state that has not been refreshed is cleared.
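The TTL rules configured above (OnCreateAndWrite update type plus NeverReturnExpired visibility) boil down to a timestamp check on every read. A plain-Java sketch of that logic, independent of Flink and purely illustrative:

```java
// Plain-Java sketch of the TTL semantics configured above:
// every write refreshes a last-modified timestamp (OnCreateAndWrite),
// and a read returns nothing once ttlMillis has elapsed (NeverReturnExpired).
public class TtlValue<T> {
    private final long ttlMillis;
    private T value;
    private long lastWrite;

    public TtlValue(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    public void update(T v, long nowMillis) {
        value = v;
        lastWrite = nowMillis; // writing resets the expiry clock
    }

    public T get(long nowMillis) {
        // an expired entry is invisible, even if not yet physically removed
        return (nowMillis - lastWrite < ttlMillis) ? value : null;
    }
}
```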

    Keyed state test class:

    package state;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * Test class
     *
     * @author lixiyan
     * @date 2020/4/5 9:01 PM
     */
    public class Main {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements(
                    Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
                    Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
                    Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
                    Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 40L));
    
        tuple2DataStreamSource.keyBy(0).flatMap(new KeyedStateCustom(100L, 3)).uid("keyed").print();
    
            env.execute("keyed State");
        }
    }
    

    Result:

    [Figure: console output of the keyed state example]

    The output shows alerts carrying three buffered abnormal values each, which matches the expected behavior.

    OperatorStateCustom

    package state;
    
    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ListState;
    import org.apache.flink.api.common.state.ListStateDescriptor;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.FunctionInitializationContext;
    import org.apache.flink.runtime.state.FunctionSnapshotContext;
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * Custom operator state example
     *
     * @author lixiyan
     * @date 2020/4/5 9:11 PM
     */
    public class OperatorStateCustom extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Tuple2<String, Long>>>> implements CheckpointedFunction {
    
        // buffered abnormal records
        private List<Tuple2<String, Long>> bufferedData;
        // checkpointed state
        private transient ListState<Tuple2<String, Long>> checkPointedState;
        // threshold to monitor
        private Long threshold;
        // number of occurrences that triggers an alert
        private Integer numberOfTimes;
    
    
        OperatorStateCustom(Long threshold, Integer numberOfTimes) {
            this.threshold = threshold;
            this.numberOfTimes = numberOfTimes;
            this.bufferedData = new ArrayList<>();
        }
    
    
        @Override
        public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) throws Exception {
            Long inputVal = value.f1;
            // record values that exceed the threshold
            if (inputVal > threshold) {
                bufferedData.add(value);
            }
            // once enough abnormal values have accumulated, emit an alert
            if (bufferedData.size() >= numberOfTimes) {
                // emit a copy of the buffer (it is cleared right below, and the
                // collected tuple must not reference the cleared list),
                // tagged with the state instance's hashCode
                out.collect(Tuple2.of(checkPointedState.hashCode() + " exceeded the threshold", new ArrayList<>(bufferedData)));
                bufferedData.clear();
            }
    
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // on each snapshot, copy the buffered data into checkPointedState
            checkPointedState.clear();
            System.out.println("snapshotState");
            for (Tuple2<String, Long> element : bufferedData) {
                System.out.println(element);
                checkPointedState.add(element);
            }
        }
    
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // note: the state is obtained from the OperatorStateStore here
            checkPointedState = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("abnormalData",
                            TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));

            System.out.println("initializeState: " + checkPointedState.get());
            // on restart, restore the buffer from the snapshot
            if (context.isRestored()) {
                System.out.println("restoring from checkpoint");
                for (Tuple2<String, Long> element : checkPointedState.get()) {
                    bufferedData.add(element);
                }
            }
    
        }
    }
    
    

    The operator-state version differs in that it also implements the CheckpointedFunction interface, which has two methods. initializeState() initializes and retrieves the state; snapshotState() is called just before the state is written to a checkpoint, so that is where buffered data is copied into the state. Note that snapshotState() is never invoked if checkpointing is not enabled.
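One detail worth knowing about operator ListState is what happens on rescaling: state obtained via getListState() is split across the new parallel subtasks, while getUnionListState() hands every subtask the complete list. A plain-Java sketch of the split (illustrative only, not Flink source; Flink distributes contiguous ranges, shown here as a simple round-robin):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of redistributing operator ListState entries across a new
// parallelism: each subtask receives a disjoint share of the entries.
public class Redistribute {
    public static <T> List<List<T>> splitRoundRobin(List<T> state, int parallelism) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            parts.add(new ArrayList<>());
        }
        // deal entries out one at a time, like cards
        for (int i = 0; i < state.size(); i++) {
            parts.get(i % parallelism).add(state.get(i));
        }
        return parts;
    }
}
```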

    Operator state test class:

    package state;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    /**
     * Test class for operator state
     *
     * @author lixiyan
     * @date 2020/4/5 10:35 PM
     */
    public class Main2 {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(100);
        // set the state backend (a local path, for this demo only)
        env.setStateBackend(new FsStateBackend("file:///Users/lionli/lixiyan/flink/flink_practice/flink/src/main/resources/"));
        CheckpointConfig config = env.getCheckpointConfig();
        // retain checkpoint data on cancellation or failure, so the job can
        // later be restored from a chosen checkpoint
        config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // trigger a checkpoint every 100 ms
        config.setCheckpointInterval(100);
        // exactly-once mode
        config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // a checkpoint must complete within one minute or it is discarded
        config.setCheckpointTimeout(60000);
        // allow at most one concurrent checkpoint
        config.setMaxConcurrentCheckpoints(1);
        // job logic
            DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.setParallelism(1).fromElements(
                    Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
                    Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
                    Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
                    Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));
    
            tuple2DataStreamSource.flatMap(new OperatorStateCustom(100L, 3)).printToErr();
            env.execute("Operator State");
        }
    }
    
    
    Test result:

    [Figure: console output of the operator state example]

    The corresponding data is printed as expected.

    Difference:

    The results above show the distinction between keyed state and operator state: keyed state is scoped to an individual key, while operator state is scoped to a parallel operator instance. In that sense one operator-state instance behaves like a collection over many keyed states, because a single operator instance may process records of many keys.
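That scoping difference can be pictured in a few lines of plain Java (illustrative only, not how Flink stores state internally): keyed state behaves like a map from key to state instance, while operator state is a single instance per parallel subtask.

```java
import java.util.HashMap;
import java.util.Map;

// Illustration of state scoping: one counter per key (keyed state)
// versus one counter for every record this subtask sees (operator state).
public class Scoping {
    // keyed: "a" and "b" each get their own counter
    static final Map<String, Long> keyedCounters = new HashMap<>();
    // operator: a single counter shared across all keys on this subtask
    static long operatorCounter = 0;

    static void process(String key) {
        keyedCounters.merge(key, 1L, Long::sum);
        operatorCounter++;
    }
}
```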

    [Figure: keyed state]

    [Figure: operator state]

    Summary:

    This article introduced state management in Flink, covering keyed state and operator state with a working example of each; in real development, choose whichever form fits your use case.



        Original link: https://www.haomeiwen.com/subject/uxhvphtx.html