美文网首页程序员flink
Flink详解系列之八--Checkpoint和Savepoin

Flink详解系列之八--Checkpoint和Savepoin

作者: 王吉吉real | 来源:发表于2021-02-15 23:57 被阅读0次

    一、Checkpoint

    获取分布式数据流和算子状态的一致性快照是Flink容错机制的核心,这些快照在Flink作业恢复时作为一致性检查点存在。

    1.1 原理

    1.1.1 Barriers
    Barrier是由流数据源(stream source)注入数据流中,并作为数据流的一部分与数据记录一起往下游流动。Barriers将流里的记录分隔为一段一段的记录集,每一个记录集都对应一个快照。每个Barrier会携带一个快照的ID,这个快照对应Barrier前面的记录集。如下图所示。


    当一个算子从所有输入流都接收到一个快照(n)的barrier时,它首先会生成该算子的状态快照,然后往该算子的所有下游广播一个barrier。这个算子是sink算子时,它会告知检查点的 coordinator(一般是flink的jobManager),当所有sink算子都告知接收到了一个快照的barrier时,该快照生成结束。

    1.1.2 对齐检查点(aligned Checkpoint)
    当一个算子接收到多于一个输入流时,就需要进行这些流的barrier对齐。当一个算子接收到第一个输入流的快照barrier n时,它不能继续处理该流的其他数据,而是需要等待接收到最后一个流的barrier n,才可以生成算子的状态快照和发送挂起的输出记录,然后发送快照barrier n。否则,检查点快照n跟检查点n+1就会混淆。


    检查点对齐保证了状态的准确性,但由于对齐操作是阻塞式的,会使检查点生成时长不可控,降低吞吐量,当作业出现反压时,会加剧反压,甚至导致作业不稳定等问题。

    1.1.3 非对齐检查点(Unaligned Checkpoint)
    为规避上述风险,从Flink 1.11开始,检查点也可以是非对齐的。具体方法比较类似于Chandy-Lamport算法,但Flink仍会在数据源插入barrier来避免检查点coordinator负载过重。


    具体处理过程是这样的:算子在接收到第一个数据流的barrier n时就立即开始生成快照,将该barrier发往下游,将其他流中后续到来的该快照的记录进行异步存储,并创建各自的状态快照。

    非对齐检查点可以保证barrier尽快到达sink, 非常适合算子拓扑中至少有一条缓慢路径的场景。然而,由于会增大I/O压力,如果写入状态后端是处理瓶颈的话,使用非对齐检查点就不太合适了。

    1.2 使用配置

    检查点的配置有许多项,下面就介绍几个比较常用的重要配置。
    1.2.1 检查点保留策略
    检查点在默认不保留,而且只用于作业失败时的恢复。程序取消时,检查点会被删除。但可以通过配置周期性的保留检查点,由于这些配置,当作业失败或者取消时,保留的检查点不会被自动清除。这样作业失败时,就有一个可以用于恢复的检查点了。

    CheckpointConfig config = env.getCheckpointConfig();
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    

    ExternalizedCheckpointCleanup配置了作业取消时检查点是保留还是删除。

    • ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 取消作业时保留检查点。请注意,在这种情况下,您必须在取消后手动清理检查点状态。
    • ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 取消作业时删除检查点。只有在作业失败时,检查点状态才可用。

    1.2.2 目录配置
    检查点保存的信息包括元数据文件和数据文件,可以通过配置文件配置, 并且是全局性的,对所有作业适用。

    state.checkpoints.dir: hdfs:///checkpoints/
    

    在使用状态后端FsStateBackend或者RocksDBStateBackend时,可以指定相关作业的检查点保存目录

    env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));
    

    1.2.3 模式配置
    Checkpoint支持两种模式:exactly-once和at-least-once, 默认模式为exactly-once。

    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    • Exactly Once:保证每条数据对于 Flink 的状态结果只影响一次。
    • At Least Once:每条数据对于 Flink 状态计算至少影响一次。

    1.2.4 backend选择
    关于backend配置及选择可以参考上一篇Flink详解之七--状态管理

    1.3 非对齐检查点(unaligned checkpoint)

    二、Savepoint

    savepoint是使用检查点机制创建的,作业执行状态的全局镜像,可用于flink的停止与恢复,升级等。savepoint有两部分构成:一是在稳定存储(如:HDFS、S3等)中保存了二进制文件的目录,二是元数据文件。这些文件表示了作业执行状态的镜像,其中元数据文件主要保存了以绝对路径表示的指针。

    savepoint文件默认保存目录配置

    # Default savepoint target directory
    state.savepoints.dir: hdfs:///flink/savepoints
    
    1.1 使用
    # 触发savepoint
    $ bin/flink savepoint :jobId [:targetDirectory]
    #删除savepoint
    $ bin/flink cancel -s [:targetDirectory] :jobId
    #从savepoint恢复
    $ bin/flink run -s :savepointPath [:runArgs]
    #删除savepoint
    $ bin/flink savepoint -d :savepointPath
    
    1.2 配置算子ID

    使用savepoint进行恢复时,是根据算子ID来匹配算子状态在savepoint中的存储位置的。官方文档强烈建议给每个算子手动配置一个算子ID,这个ID可以通过uid(String)方法配置。

    当没有手动配置时,程序会根据算子在的程序算子拓扑中的位置生成一个ID。如果程序没有改变,是可以从savepoint中恢复的,但如果程序改变了,同一个算子在程序中的位置也就改变了,相应的算子ID也会变化,就无法从之前的savepoint恢复了。

    DataStream<String> stream = env.
      // Stateful source (e.g. Kafka) with ID
      .addSource(new StatefulSource())
      .uid("source-id") // ID for the source operator
      .shuffle()
      // Stateful mapper with ID
      .map(new StatefulMapper())
      .uid("mapper-id") // ID for the mapper
      // Stateless printing sink
      .print(); // Auto-generated ID
    

    由于有些算子是有状态的,有些算子是无状态的,实际上只要给有状态的算子添加算子ID就可以,但很多人并不太清楚哪些算子是有状态的,哪些是无状态的,所以,实际操作中,尽量给每个算子手动配置算子ID。

    三、Checkpoint与Savepoint的异同

    3.1 相同点
    • 都用于作业的恢复
    • 创建时都使用ckeckpoint机制,使用相同的一套代码和数据格式
    3.2 不同点
    • 设计目的不同: checkpoint是作为Flink作业容错机制存在的,用于作业潜在失败时的恢复,savepoint是作为需要作业重启时(比如:Flink版本升级、作业拓扑改变,并行度修改,作业A/B测试等场景)保存状态并恢复的一种机制。
    • 生命周期不同: checkpoint的生命周期由flink来管理,flink负责checkpoint的创建、维护和释放,过程中没有与用户交互。savepoint就不同了,它是由用户来创建、维护和删除的,savepoint的是事先规划好的、手动备份并用于恢复。
    • 具体实现不同: checkpoint作为用于恢复,需要定期触发并保存状态的机制。实现上需要满足两点:1)创建时尽量轻量级,2)恢复时越快越好。savepoint在创建和恢复时比checkpoint更重一些,更偏重于便捷性及对作业前述改动的支持。

    相关文章

      网友评论

        本文标题:Flink详解系列之八--Checkpoint和Savepoin

        本文链接:https://www.haomeiwen.com/subject/oiotxltx.html