美文网首页Spark & FlinkFlink专题
Flink DataStream 状态和容错 二:Checkpo

Flink DataStream 状态和容错 二:Checkpo

作者: Alex90 | 来源:发表于2019-01-11 23:26 被阅读9次

    Checkpoint

    Flink 中的 State 在上一篇中介绍过,为了使 State 容错,需要有 State checkpoint(状态检查点)。Checkpoint 允许 Flink 恢复流的 State 和处理位置,从而为程序提供与无故障执行相同的语义。Checkpoint 机制在 Flink 容错机制 中有更详细介绍。

    Checkpoint 使用的先决条件:

    1. 一个持久化的,能够在一定时间范围内重放记录的数据源。例如,持久化消息队列:Apache Kafka,RabbitMQ,Amazon Kinesis,Google PubSub 或文件系统:HDFS,S3,GFS,NFS,Ceph...
    2. State 持久化存储系统,通常是分布式文件系统:HDFS,S3,GFS,NFS,Ceph...

    启用和配置

    Checkpoint 默认情况下是不启用的。StreamExecutionEnvironment 对象调用 enableCheckpointing(n) 启用 Checkpoint,其中n是以毫秒为单位的 Checkpoint 间隔。

    Checkpoint 的配置项包括:

    • 恰好一次(exactly-once)或至少一次(at-least-once):Checkpoint 支持这两种模式。对于大多数应用来说,恰好一次是优选的。至少一次可能在某些要求超低延迟(几毫秒)的应用程序使用。

    • Checkpoint 超时时间:在超时时间内 checkpoint 未完成,则中止正在进行的 checkpoint。

    • Checkpoint 最小间隔时间(毫秒):如果设置为5000,表示在上一个 checkpoint 完成后的至少5秒后才会启动下一个 checkpoint,不论 checkpoint 的持续时间和间隔是多少。即使 checkpoint 间隔永远不会小于此参数。是为了保证 checkpoint 之间能够完成一定量的数据处理工作。

      配置 time between checkpoint 相比配置 checkpoint interval 通常更容易。因为 checkpoint 耗时有时会明显比平时更长,time between checkpoint 更不容易收到影响(例如,目标存储系统临时性的响应缓慢)

      这个值还意味着并发 checkpoint 的数量是一个

    • Checkpoint 并发数:默认情况下,当一个 checkpoint 处于运行状态时,系统不会触发另一个 checkpoint。确保整个拓扑结构不会花费太多时间用于 checkpoint。该设置可以设置多个重叠的 checkpoint,特点的场景可能会需要。

      当设置 time between checkpoint 时,不能使用此配置。

    • 外部 checkpoint:可以配置在系统外部持久化 checkpoint。Checkpoint 信息写入外部持久存储,在作业失败时不会自动清除,因此作业失败时可以用来恢复。

    • Checkpoint 出错时,任务状态:决定了如果在 checkpoint 过程中发生错误,当前任务是否将失败或继续执行。默认会任务失败。

    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    
    // 启用 checkpoint 间隔 1000 ms
    env.enableCheckpointing(1000)
    
    // 高级选项:
    
    // 设置 exactly-once 模式
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    
    // 设置 checkpoint 最小间隔 500 ms 
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    
    // 设置 checkpoint 必须在1分钟内完成,否则会被丢弃
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    
    // 设置 checkpoint 失败时,任务不会 fail,该 checkpoint 会被丢弃
    env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
    
    // 设置 checkpoint 的并发度为 1
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    

    相关配置

    更多相关参数可以通过 conf/flink-conf.yaml 全局配置

    配置项 默认值 描述
    state.backend (none) 选择 state backend 实现
    state.backend.async true state backend 使用异步方法。有些不支持异步,或者仅支持异步的可并忽略此选项
    state.backend.fs.memory-threshold 1024 存储 state 数据文件的最小规模,如果小于该值则会存储在 root checkpoint metadata file
    state.backend.incremental false 是否采用增量 checkpoint,有些不支持增量的可并忽略此选项
    state.backend.local-recovery false
    state.checkpoints.dir (none) 用于指定 checkpoint 数据存储目录,目录必须对所有参与的 TaskManagers 和 JobManagers 可见
    state.checkpoints.num-retained 1 指定保留已完成的 checkpoint 数量
    state.savepoints.dir (none) 用于指定 savepoint 数据存储目录
    taskmanager.state.local.root-dirs (none)

    选择 State backend

    Checkpoint 的存储的位置取决于配置的 State backend(JobManager 内存,文件系统,数据库...)。

    默认情况下,State 存储在 TaskManager 内存中,Checkpoint 存储在 JobManager 内存中。Flink 支持在其他 state backend 中存储 State 和 Checkpoint。可以通过如下方法配置:StreamExecutionEnvironment.setStateBackend(…),下面有更详细的介绍。

    迭代任务中使用

    Flink 目前仅为没有迭代的作业提供处理保证。在迭代作业上启用 checkpoint 会导致异常。为了强制对迭代程序执行 checkpoing,需要设置一个特殊标志:env.enableCheckpointing(interval, force = true)

    在失败期间,处在循环边界的记录(以及与相关的 State 变化)将丢失。

    State backend

    Flink 提供了不同的 State backend,支持不通的 State 存储方式和位置。默认会使用配置文件 flink-conf.yaml 指定的选项,也可以在每个作业中设置来覆盖默认选项:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(...);
    

    Flink 自带了以下几种开箱即用的 state backend:

    • MemoryStateBackend
    • FsStateBackend
    • RocksDBStateBackend

    在没有配置的情况下,系统默认使用 MemoryStateBackend

    三种 State backend 介绍

    MemoryStateBackend

    使用 MemoryStateBackend,在 checkpoint 中对 State 做一次快照,并在向 JobManager 发送 checkpoint 确认完成的消息中带上此快照数据,然后快照就会存储在 JobManager 的内存堆中。

    MemoryStateBackend 的限制:

    • 单个 State 的大小默认限制为5MB,可以在 MemoryStateBackend 的构造函数中增加。
    • 不论如何配置,State 大小都无法大于 akka.framesize(JobManager 和 TaskManager 之间发送的最大消息的大小)
    • JobManager 必须有足够的内存大小

    MemoryStateBackend 适用以下场景:

    • 本地开发和调试
    • 只持有很小的状态,如方法:Map、FlatMap、Filter... 或 Kafka Consumer

    FsStateBackend

    FsStateBackend 需要配置一个文件系统的URL来,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。

    FsStateBackend 在 TaskManager 的内存中持有正在处理的数据。Checkpoint 时将 state snapshot 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中。

    FsStateBackend 默认是异步操作,以避免在写 state snapshot 时阻塞处理程序。如果要禁用异步,可以在 FsStateBackend 构造函数中设置:

    new FsStateBackend(path, false);
    

    FsStateBackend 适用以下场景:

    • State 较大,窗口时间较长和 key/value 较大的 State
    • 所有高可用性的情况

    RocksDBStateBackend

    RocksDBStateBackend 需要配置一个文件系统的URL来,如 "hdfs://namenode:40010/flink/checkpoint" 或 "file:///data/flink/checkpoints"。

    RocksDBStateBackend 在 RocksDB 中持有正在处理的数据,RocksDB 在 TaskManager 的数据目录下。Checkpoint 时将整个 RocksDB 写入文件系统目录下的文件中,文件的路径会传递给 JobManager,存在其内存中。

    RocksDBStateBackend 通常也是异步的。

    RocksDBStateBackend 的限制:
    RocksDB JNI API 是基于 byte[],因此 key 和 value 最大支持大小为2^31 个字节。RocksDB 自身在支持较大 value 时候有一些问题。

    RocksDBStateBackendFsStateBackend 同样适用以下场景:

    • State 较大,窗口时间较长和 key/value 较大的 State
    • 所有高可用性的情况
    • 目前唯一支持增量 checkpoint

    与前两者相比(处理状态下的 State 还是保存在内存中),使用 RocksDB 可以保存的状态量仅受可用磁盘空间量的限制。这也意味着可以实现的最大吞吐量更低,后台的所有读/写都必须通过序列化和反序列化来检索/存储 State,这也比使用基于堆内存的方式代价更昂贵。

    性能比较

    Flink 支持 Standalone 和 on Yarn 的集群部署模式,以 Windowed Word Count 处理为例测试三种 State backends 在不通集群部署上的性能差异(来源:美团 Flink _Benchmark

    Standalone 时的存储路径为 JobManager 上的一个文件目录,on Yarn 时存储路径为 HDFS 上一个文件目录。

    不同 State backend 吞吐量对比

    Throughput
    • 使用 FileSystem 和 Memory 的吞吐差异不大(都是使用堆内存管理处理中的数据),使用 RocksDB 的吞吐差距明显。
    • Standalone 和 on Yarn 的总体差异不大,使用 FileSystem 和 Memory 时 on Yarn 模式下吞吐稍高,相反的使用 RocksDB 时 Standalone 模式下的吞吐稍高。

    不同 State backend 延迟对比

    Latency
    • 使用 FileSystem 和 Memory 时延迟基本一致且较低。
    • 使用 RocksDB 时延迟稍高,且由于吞吐较低,在达到吞吐瓶颈附近延迟陡增。其中 on Yarn 模式下吞吐更低,延迟变化更加明显。

    State backend 的选择

    StateBackend in-flight checkpoint 吞吐 推荐使用场景
    MemoryStateBackend TM Memory JM Memory 调试、无状态或对数据丢失或重复无要求
    FsStateBackend TM Memory FS/HDFS 普通状态、窗口、KV 结构
    RocksDBStateBackend RocksDB on TM FS/HDFS 超大状态、超长窗口、大型 KV 结构

    Reference:
    https://flink.xskoo.com/dev/stream/state/checkpointing.html
    https://tech.meituan.com/Flink_Benchmark.html

    相关文章

      网友评论

        本文标题:Flink DataStream 状态和容错 二:Checkpo

        本文链接:https://www.haomeiwen.com/subject/sstqdqtx.html