Flink Checkpoint Configuration

Author: OzanShareing | Published 2019-12-19 15:15

    Example:

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class CheckpointConfigExample {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // start a checkpoint every 1000 ms
            env.enableCheckpointing(1000);
    
            // advanced options:
            // set mode to exactly-once (this is the default)
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
            // checkpoints have to complete within one minute, or are discarded
            env.getCheckpointConfig().setCheckpointTimeout(60000);
    
            // make sure 500 ms of progress happen between checkpoints
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    
            // allow only one checkpoint to be in progress at the same time
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    
            // enable externalized checkpoints which are retained after job cancellation
            env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
            // This determines if a task will be failed if an error occurs in the execution of the task's checkpoint procedure.
            // (Deprecated since Flink 1.9 in favor of setTolerableCheckpointFailureNumber.)
            env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
    
            // define sources, transformations and sinks here, then call env.execute(...)
        }
    }
    
    • Checkpointing is enabled with StreamExecutionEnvironment.enableCheckpointing,
      either as enableCheckpointing(long interval) or as enableCheckpointing(long interval, CheckpointingMode mode).
      interval is the checkpoint trigger interval, in milliseconds.
      CheckpointingMode defaults to CheckpointingMode.EXACTLY_ONCE
      and can also be set to CheckpointingMode.AT_LEAST_ONCE.

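    • CheckpointingMode can also be set through StreamExecutionEnvironment.getCheckpointConfig().setCheckpointingMode.
      CheckpointingMode.AT_LEAST_ONCE is generally only worth considering for applications that need ultra-low latency (on the order of a few milliseconds); for most other applications CheckpointingMode.EXACTLY_ONCE is the right choice.

    • checkpointTimeout specifies the checkpoint timeout (in milliseconds); a checkpoint that does not complete within this time is aborted.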

    • minPauseBetweenCheckpoints specifies the minimum time to wait after a checkpoint completes before another checkpoint can be triggered.
      When this parameter is set, maxConcurrentCheckpoints is effectively 1.

    • maxConcurrentCheckpoints specifies the maximum number of checkpoints that may be in progress at the same time.
      If minPauseBetweenCheckpoints is set, this parameter has no effect (values greater than 1 are ignored).

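    • enableExternalizedCheckpoints enables externalized (persisted) checkpoints. Externalized checkpoints are not cleaned up automatically when the job fails, so their state has to be cleaned up manually.
      ExternalizedCheckpointCleanup specifies what happens to an externalized checkpoint when the job is canceled:
      with DELETE_ON_CANCELLATION the externalized state is deleted automatically when the job is canceled, but kept if the job ends in the FAILED state;
      with RETAIN_ON_CANCELLATION the externalized checkpoint state is kept when the job is canceled.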

    • failOnCheckpointingErrors specifies whether a task should fail when an error occurs during its checkpoint. It defaults to true; if set to false, the task declines the checkpoint and keeps running.
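    The interaction of the interval, minPauseBetweenCheckpoints and checkpointTimeout described above can be illustrated with a small standalone model. This is a hedged sketch, not Flink's actual checkpoint scheduler: the class and method names are hypothetical, it does not depend on Flink, and it assumes at most one checkpoint is in flight (maxConcurrentCheckpoints = 1) and that the previous checkpoint has already completed.

```java
/**
 * Hypothetical, simplified model of checkpoint trigger timing (not Flink code).
 * Assumes at most one checkpoint in flight (maxConcurrentCheckpoints = 1).
 */
final class CheckpointTimingModel {

    private CheckpointTimingModel() {
    }

    /**
     * Earliest time (ms) at which the next checkpoint may start: the regular
     * schedule asks for lastTriggerMs + intervalMs, while minPause additionally
     * requires minPauseMs to pass after the previous checkpoint completed.
     */
    static long earliestNextTrigger(long lastTriggerMs, long intervalMs,
                                    long lastCompletionMs, long minPauseMs) {
        long bySchedule = lastTriggerMs + intervalMs;
        long byMinPause = lastCompletionMs + minPauseMs;
        return Math.max(bySchedule, byMinPause);
    }

    /** True once a checkpoint started at startMs has run for timeoutMs or longer and would be aborted. */
    static boolean isExpired(long startMs, long nowMs, long timeoutMs) {
        return nowMs - startMs >= timeoutMs;
    }

    public static void main(String[] args) {
        // Values from the example above: interval 1000 ms, min pause 500 ms, timeout 60000 ms.
        // Fast checkpoint (0 -> 200 ms): the regular interval decides.
        System.out.println(earliestNextTrigger(0, 1000, 200, 500));  // 1000
        // Slow checkpoint (0 -> 900 ms): the min pause delays the next one.
        System.out.println(earliestNextTrigger(0, 1000, 900, 500));  // 1400
        // A checkpoint still running after 60 s is aborted.
        System.out.println(isExpired(0, 60_000, 60_000));            // true
    }
}
```

    With a 1000 ms interval and a 500 ms minimum pause, a checkpoint that takes 900 ms pushes the next trigger from 1000 ms to 1400 ms, which is how minPauseBetweenCheckpoints guarantees the job some checkpoint-free processing time.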
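    The cleanup rules for externalized checkpoints listed above can be summarized as a small decision function. This is a hypothetical model for illustration only: the enum constants mirror the names of Flink's ExternalizedCheckpointCleanup values, but the class itself is not part of Flink and covers only the canceled and failed cases described in this article.

```java
/**
 * Hypothetical model (not Flink code) of whether externalized checkpoint state
 * survives job termination, following the rules described above.
 */
final class ExternalizedCleanupModel {

    /** Mirrors the names of Flink's ExternalizedCheckpointCleanup values. */
    enum Cleanup { DELETE_ON_CANCELLATION, RETAIN_ON_CANCELLATION }

    /** Only the two terminal cases discussed in the article are modeled. */
    enum JobTermination { CANCELED, FAILED }

    private ExternalizedCleanupModel() {
    }

    /** Returns true if the externalized checkpoint state is kept after the job terminates. */
    static boolean isRetained(Cleanup cleanup, JobTermination termination) {
        switch (termination) {
            case FAILED:
                // Externalized checkpoints are never cleaned up automatically on failure.
                return true;
            case CANCELED:
                // On cancellation, the configured cleanup mode decides.
                return cleanup == Cleanup.RETAIN_ON_CANCELLATION;
            default:
                throw new IllegalArgumentException("Unknown termination: " + termination);
        }
    }

    public static void main(String[] args) {
        for (Cleanup c : Cleanup.values()) {
            for (JobTermination t : JobTermination.values()) {
                System.out.println(c + " + " + t + " -> retained=" + isRetained(c, t));
            }
        }
    }
}
```

    Only DELETE_ON_CANCELLATION combined with a canceled job removes the state; every other combination leaves it on storage, which is why failed jobs need manual cleanup.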

    Related settings in flink-conf.yaml:

    #==============================================================================
    # Fault tolerance and checkpointing
    #==============================================================================
    
    # The backend that will be used to store operator state checkpoints if
    # checkpointing is enabled.
    #
    # Supported backends are 'jobmanager', 'filesystem', 'rocksdb', or the
    # <class-name-of-factory>.
    #
    # state.backend: filesystem
    
    # Directory for checkpoints filesystem, when using any of the default bundled
    # state backends.
    #
    # state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
    
    # Default target directory for savepoints, optional.
    #
    # state.savepoints.dir: hdfs://namenode-host:port/flink-checkpoints
    
    # Flag to enable/disable incremental checkpoints for backends that
    # support incremental checkpoints (like the RocksDB state backend). 
    #
    # state.backend.incremental: false
    
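    • state.backend specifies the backend used to store checkpoint state; default: none
    • state.backend.async specifies whether the backend uses asynchronous snapshots (default: true); state backends that do not support asynchronous snapshots, or support only asynchronous ones, may ignore this option
    • state.backend.fs.memory-threshold, default 1024 (bytes), is the size threshold for state stored in files: state smaller than this value is stored in the root checkpoint metadata file instead
    • state.backend.incremental, default false, specifies whether to use incremental checkpoints; backends that do not support incremental checkpoints ignore this option
    • state.backend.local-recovery, default false, specifies whether local recovery is enabled for the state backend
    • state.checkpoints.dir, default none, specifies the directory where checkpoint data files and metadata are stored; the directory must be accessible from all participating TaskManagers and JobManagers
    • state.checkpoints.num-retained, default 1, specifies how many completed checkpoints are retained
    • state.savepoints.dir, default none, specifies the default directory for savepoints
    • taskmanager.state.local.root-dirs, default none, specifies the root directories for file-based state used by local recovery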
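    To make the defaults above concrete, here is a hedged sketch of a tiny reader for flink-conf.yaml-style text. CheckpointConfReader and its methods are hypothetical (Flink loads its configuration itself); the sketch handles only simple "key: value" lines and whole-line "#" comments, fills in the defaults listed above, and applies the memory-threshold rule that state smaller than the threshold goes into the root metadata file.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Flink): reads checkpoint-related keys from
 * flink-conf.yaml-style text and falls back to the defaults listed above.
 */
final class CheckpointConfReader {

    // Defaults as listed above; options whose default is "none" are simply absent.
    private static final Map<String, String> DEFAULTS = new HashMap<>();

    static {
        DEFAULTS.put("state.backend.async", "true");
        DEFAULTS.put("state.backend.fs.memory-threshold", "1024");
        DEFAULTS.put("state.backend.incremental", "false");
        DEFAULTS.put("state.backend.local-recovery", "false");
        DEFAULTS.put("state.checkpoints.num-retained", "1");
    }

    private CheckpointConfReader() {
    }

    /** Parses simple "key: value" lines; blank lines and whole-line '#' comments are skipped. */
    static Map<String, String> parse(String text) {
        Map<String, String> settings = new HashMap<>(DEFAULTS);
        for (String rawLine : text.split("\n")) {
            String line = rawLine.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // e.g. "# state.backend: filesystem" stays disabled
            }
            int colon = line.indexOf(':');
            if (colon <= 0) {
                continue; // not a "key: value" line
            }
            // Split on the first colon only, so URIs such as hdfs://host:port/... survive intact.
            settings.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
        }
        return settings;
    }

    /** State smaller than state.backend.fs.memory-threshold goes into the root metadata file. */
    static boolean storedInMetadata(long stateSizeBytes, Map<String, String> settings) {
        long threshold = Long.parseLong(settings.get("state.backend.fs.memory-threshold"));
        return stateSizeBytes < threshold;
    }

    public static void main(String[] args) {
        String conf = String.join("\n",
                "# state.backend: filesystem",
                "state.backend: rocksdb",
                "state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints",
                "state.backend.incremental: true");
        Map<String, String> settings = parse(conf);
        System.out.println(settings.get("state.backend"));                  // rocksdb
        System.out.println(settings.get("state.backend.incremental"));      // true
        System.out.println(settings.get("state.checkpoints.num-retained")); // 1 (default)
        System.out.println(storedInMetadata(512, settings));                // true
    }
}
```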

    Summary:

    • Checkpointing is enabled with StreamExecutionEnvironment.enableCheckpointing, either as enableCheckpointing(long interval) or as enableCheckpointing(long interval, CheckpointingMode mode)

    • Among the advanced checkpoint options is enableExternalizedCheckpoints, which enables externalized (persisted) checkpoints. When the job fails, externalized checkpoint state is not cleaned up automatically; when the job is canceled, you can configure whether the state is deleted or retained

    • flink-conf.yaml also contains checkpoint-related settings, mainly for the state backend, such as state.backend.async, state.backend.incremental, state.checkpoints.dir and state.savepoints.dir

    Java configuration example:

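    import java.util.concurrent.TimeUnit;
    
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.CheckpointConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class CheckpointJob {
    
        /**
         * Whether to enable the restart strategy and checkpointing
         */
        private static boolean replayFlag = true;
    
        /**
         * Number of restart attempts (example value)
         */
        private static int replayTimes = 3;
    
        /**
         * Delay between restart attempts, in seconds (example value)
         */
        private static int replaySeconds = 10;
    
        /**
         * Checkpoint interval, in milliseconds (example value)
         */
        private static long checkPointTime = 1000L;
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            if (replayFlag) {
                // Restart up to replayTimes times, waiting replaySeconds between attempts
                env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                        replayTimes,
                        Time.of(replaySeconds, TimeUnit.SECONDS)
                ));
                CheckpointConfig config = env.getCheckpointConfig();
    
                //env.setStateBackend(new FsStateBackend(checkPointDir));
                // Keep checkpoint data when the job is canceled or fails, so it can be restored from a chosen checkpoint if needed
                config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
                // Checkpoint interval: start a checkpoint every checkPointTime ms (a positive interval enables checkpointing)
                config.setCheckpointInterval(checkPointTime);
    
                // Use exactly-once mode
                config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
                // Ensure at least 500 ms between checkpoints [minimum pause between checkpoints]
                config.setMinPauseBetweenCheckpoints(500);
    
                // Checkpoints must complete within one minute or are discarded [checkpoint timeout]
                config.setCheckpointTimeout(60000);
    
                // Allow only one checkpoint in progress at a time
                config.setMaxConcurrentCheckpoints(1);
            }
    
            // define sources, transformations and sinks here, then call env.execute(...)
        }
    }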
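    The restart strategy configured above via RestartStrategies.fixedDelayRestart can be modeled as follows. This is a simplified, hypothetical sketch rather than Flink's implementation: each failure consumes one restart attempt and waits a fixed delay, and once the attempts are used up the next failure fails the job. The values of 3 attempts and 10 s are example assumptions for replayTimes and replaySeconds.

```java
/**
 * Hypothetical model of a fixed-delay restart strategy (not Flink's implementation).
 */
final class FixedDelayRestartModel {

    private final int maxAttempts;
    private final long delayMs;
    private int attemptsUsed = 0;

    FixedDelayRestartModel(int maxAttempts, long delayMs) {
        this.maxAttempts = maxAttempts;
        this.delayMs = delayMs;
    }

    /**
     * Called on each failure. Returns the delay (ms) before the restart,
     * or -1 once all restart attempts are used up and the job fails for good.
     */
    long onFailure() {
        if (attemptsUsed >= maxAttempts) {
            return -1L;
        }
        attemptsUsed++;
        return delayMs;
    }

    public static void main(String[] args) {
        // Example assumptions: replayTimes = 3, replaySeconds = 10.
        FixedDelayRestartModel strategy = new FixedDelayRestartModel(3, 10_000L);
        for (int failure = 1; failure <= 4; failure++) {
            long delay = strategy.onFailure();
            System.out.println("failure " + failure + " -> "
                    + (delay < 0 ? "job fails" : "restart after " + delay + " ms"));
        }
    }
}
```

    Running main prints three restarts after 10000 ms each, then "job fails" for the fourth failure.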
    

          Article link: https://www.haomeiwen.com/subject/mkncgctx.html