美文网首页Spark & Flink实时数据相关Flink专题
Flink DataStream 状态和容错 三:Savepoi

Flink DataStream 状态和容错 三:Savepoi

作者: Alex90 | 来源:发表于2019-01-14 14:46 被阅读11次

    Savepoint

    Savepoint 和 Checkpoint 的区别

    Savepoint 是命令触发的 Checkpoint,对流式程序做一次完整的快照并将结果写到 State backend,可用于停止、恢复或更新 Flink 程序。整个过程依赖于 Checkpoint 机制。另一个不同之处是,Savepoint 不会自动清除。

    分配 Operator IDs

    Savepoint 中会以 Operator ID 作为 key 保存每个有状态算子的状态:

    Operator ID State
    source-id State of StatefulSource
    mapper-id State of StatefulMapper

    Operator ID 用于确定每个算子的状态,只要ID不变,就可以从 Savepoint 中恢复,Operator ID 如果不显示指定会自动生成,生成的ID取决于程序的结构,并且对程序更改很敏感。因此,建议手动分配这些ID:

    DataStream<String> stream = env.
      // Stateful source (e.g. Kafka) with ID
      .addSource(new StatefulSource())
      .uid("source-id") // ID for the source operator
      .shuffle()
      // Stateful mapper with ID
      .map(new StatefulMapper())
      .uid("mapper-id") // ID for the mapper
      // Stateless printing sink
      .print(); // Auto-generated ID
    

    Savepoint 操作

    触发 Savepoint 时,会创建一个新的 Savepoint 目录,其中将存储数据和元数据。可以通过配置默认 targetDirectory 或指定自定义 targetDirectory:

    state.savepoints.dir: hdfs:///flink/savepoints
    

    如果既未配置缺省值也未指定自定义目录,Savepoint 将失败。

    触发 Savepoint

    $ bin/flink savepoint :jobId [:targetDirectory]
    

    生成 Savepoint(以 jobId 作为唯一ID),并返回创建的 Savepoint 的路径,恢复时需要使用。

    在 Yarn 集群触发 Savepoint

    $ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
    

    要指定 jobId 和 yarnAppId(YARN应用程序ID),并返回创建的 Savepoint 的路径。

    取消作业时生成 Savepoint

    $ bin/flink cancel -s [:targetDirectory] :jobId
    

    以原子方式触发具有 jobId 的 Savepoint,并取消作业。

    恢复 Savepoint

    $ bin/flink run -s :savepointPath [:runArgs]
    

    提交作业,并指定要恢复的 Savepoint路径。

    允许启动有未恢复 State

    $ bin/flink run -s :savepointPath -n [:runArgs]
    

    默认情况下,恢复操作将尝试将 Savepoint 的所有 State 恢复。如果删除了运算符,则可以通过 –allowNonRestoredState(简写为 -n) 选项跳过无法映射到新程序的状态。

    删除 Savepoint

    $ bin/flink savepoint -d :savepointPath
    

    通过指定路径删除 Savepoint,也可以通过文件系统手动删除 Savepoint 数据,而不会影响其他 Savepoint 或 Checkpoint。

    常见问题

    应该为所有算子分配ID吗?
    根据经验,是的。严格地说,只需要通过该uid()方法将ID分配给作业中的有状态 算子。Savepoint 仅包含这些算子的 State,无状态算子不是保存点的一部分。

    如果在作业中新添加一个有状态算子,会发生什么?
    新算子将在没有任何状态的情况下进行初始化,类似于无状态算子。

    如果在作业删除一个有状态的算子,会发生什么?
    如果没有指定允许启动有未恢复 State(–allowNonRestoredState / -n),启动会失败。

    如果在作业中重新排列有状态算子,会发生什么?
    如果手动这些算子分配了ID,作业将照常恢复。否则,重新排序后,有状态算子的自动生成ID很可能会更改,将导致无法从 Savepoint 恢复。

    如果在作业中添加,删除或重新排序没有状态的算子,会发生什么?
    如果为有状态算子手动分配了ID,作业将照常恢复,则无状态算子的改变不会影响。否则,重新排序后,有状态算子的自动生成ID很可能会更改,将导致无法从 Savepoint 恢复。

    如果作业的并行性发生改变,会发生什么?
    如果 Savepoint 的生成是使用 Flink 1.2.0 以及之后的版本,并且没有使用弃用状态API,可以正常恢复作业。

    如果 Savepoint 的生成比 Flink 1.2.0 更早的版本,或者使用弃用状态API,则首先必须将作业和 Savepoint 升级到1.2.0以及之后的版本,然后才能更改并行度。请参考官方 升级指南

    Restart

    Flink 支持多种不同的重启策略,控制着作业失败后如何重启。集群可以设置默认的重启策略,作业提交的时候也可以指定重启策略,覆盖默认的重启策略。

    默认的重启策略配置在 conf/flink-conf.yaml,参数 restart-strategy 定义了采用什么策略。如果 checkpoint 未启用,就会采用 "no restart" 策略,如果启用了 checkpoint 机制,但是未指定重启策略的话,就会采用 "fixed-delay" 策略。每个重启策略都有自己的参数来控制它的行为,这些值也可以在配置文件中设置,每个重启策略的描述都包含着各自的配置值信息。

    以下是支持的三种重启策略的可配置项

    重启策略 重启策略值
    Fixed delay fixed-delay
    Failure rate failure-rate
    No restart None

    除了定义一个默认的重启策略之外,你还可以为每一个Job指定它自己的重启策略,这个重启策略可以在ExecutionEnvironment中调用setRestartStrategy()方法来程序化地调用,这种方式同样适用于StreamExecutionEnvironment。

    val env = ExecutionEnvironment.getExecutionEnvironment()
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      3, // number of restart attempts
      Time.of(10, TimeUnit.SECONDS) // delay
    ))
    

    固定延迟重启策略(Fixed Delay Restart Strategy)

    尝试一个给定的次数来重启Job,如果超过了最大的重启次数,Job最终将失败。在连续的两次重启尝试之间,重启策略会等待一个固定的时间。

    参数配置 | 描述 | 默认值
    restart-strategy.fixed-delay.attempts | Flink尝试执行的次数 | 1,如果启用checkpoint的话是Integer.MAX_VALUE
    restart-strategy.fixed-delay.delay | 两次重启之间等待的时间 | akka.ask.timeout,如果启用checkpoint的话是1s

    flink-conf.yaml参数配置:

    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 3
    restart-strategy.fixed-delay.delay: 10 s
    
    val env = ExecutionEnvironment.getExecutionEnvironment()
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
      3, // number of restart attempts
      Time.of(10, TimeUnit.SECONDS) // delay
    ))
    

    失败率重启策略(Failure Rate Restart Strategy)

    Job失败后会重启次数如果超过失败率,Job会最终被认定失败。在两个连续的重启尝试之间,重启策略会等待一个固定的时间。

    配置参数 描述 默认值
    restart-strategy.failure-rate.max-failures-per-interval Flink尝试执行的次数 1
    restart-strategy.failure-rate.failure-rate-interval 计算失败率的时间间隔 1 min
    restart-strategy.failure-rate.delay 两次重启之间等待的时间 akka.ask.timeout

    flink-conf.yaml参数配置:

    restart-strategy:failure-rate
    restart-strategy.failure-rate.max-failures-per-interval: 3 
    restart-strategy.failure-rate.failure-rate-interval: 5 min 
    restart-strategy.failure-rate.delay: 10 s
    
    val env = ExecutionEnvironment.getExecutionEnvironment() 
    env.setRestartStrategy(RestartStrategies.failureRateRestart( 
      3, // 每个测量时间间隔最大失败次数 
      Time.of(5, TimeUnit.MINUTES), //失败率测量的时间间隔 
      Time.of(10, TimeUnit.SECONDS) // 两次连续重启尝试的时间间隔 
    ))
    

    无重启策略(No Restart Strategy)

    Job直接失败,不会尝试进行重启

    flink-conf.yaml参数配置:

    restart-strategy: none
    
    val env = ExecutionEnvironment.getExecutionEnvironment()
    env.setRestartStrategy(RestartStrategies.noRestart())
    

    Reference:
    https://ci.apache.org/projects/flink/flink-docs-release-1.6/ops/state/savepoints.html
    https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/restart_strategies.html

    相关文章

      网友评论

        本文标题:Flink DataStream 状态和容错 三:Savepoi

        本文链接:https://www.haomeiwen.com/subject/qxaddqtx.html