Flink Checkpoint

作者: 写Bug的张小天 | 来源:发表于2017-06-19 19:50 被阅读701次

    Flink中的每个函数和算子都可以是有状态的(详情请看Working With State),有状态的函数通过处理各个元素或者事件来存储数据,使得State称为任何更复杂的操作类型的关键构件。

    为了使State容错,Flink需要checkpoint State,checkpoint允许Flink恢复流中的状态和位置,使应用程序具有与无故障执行相同的语义。

    流容错文档描述了流容错机制背后的详细技术信息.

    前提

    Flink的checkpoint机制与流和State的持久存储相互作用,通常需要:
    一个可以在一定时间内重放的数据源,如持久消息队列(如:Apache Kafka, RabbitMQ, Amazon Kinesis, Google PubSub)或者文件系统(如:HDFS, S3, GFS, NFS, Ceph, ...)
    用于存储state的持久存储系统,通常是一个分布式文件系统(如:HDFS, S3, GFS, NFS, Ceph, ...)

    启用和配置Checkpoint

    默认情况下checkpoint是不启用的,为了启用checkpoint,需要在StreamExecutionEnvironment中调用enableCheckpointing(n)方法, 其中n是checkpoint的间隔毫秒数。
    其他checkpoint参数包括:
      exactly-once vs at-least-once:你可以选择性地传入一个模式到enableCheckpointing(n)方法中来从两种保证层次中选择一种, 大多数程序倾向于选择Exactly-once, 而At-least-once倾向于某些低延迟应用程序(如:几毫秒)的选择。
      checkpoint timeout:过了这个时间之后还在处理的checkpoint如果还未完成的话将丢弃
      minimum time between checkpoints:为了确保流应用程序在检查点之间进行一定程度的进度,可以定义检查点之间需要多少时间。如果设置为5000,下一个checkpoint并不会在前一个checkpoint后的5秒前开始,无论checkpoint周期和checkpoint间隔。注意:这也就以为这checkpoint间隔将永远不会小于这个参数。
    通常定义checkpoint之间的时间比定义checkpoint间隔更简单,因为checkpoint之间的时间不容易受到checkpoint有时比平均时长更长的事实的影响(例如:目标存储系统暂时性缓慢)。
    注意:这个值还意味着当前的checkpoint数为1
      number of concurrent checkpoints:默认情况下,当还有一个checkpoint还在处理中时,不会触发另外一个checkpoint。这保证了Topology不会花费更多的时间在checkpoint上,以及不在处理stream中进行checkpoint。当然也可以允许多个重叠的checkpoint,这对于具有一定处理延迟的管道是有意义ed(例如:因为函数调用需要一些时间响应外部服务),但任然需要做非常频繁的checkpoint(100毫秒)来重新处理很少的故障。
    当定义checkpoint之间的最短时间时,不能使用此选项。

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    // start a checkpoint every 1000 ms
    env.enableCheckpointing(1000);
    
    // advanced options:
    
    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    // make sure 500 ms of progress happen between checkpoints
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    
    // checkpoints have to complete within one minute, or are discarded
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    
    // allow only one checkpoint to be in progress at the same time
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    val env = StreamExecutionEnvironment.getExecutionEnvironment()
    
    // start a checkpoint every 1000 ms
    env.enableCheckpointing(1000)
    
    // advanced options:
    
    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    
    // make sure 500 ms of progress happen between checkpoints
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500)
    
    // checkpoints have to complete within one minute, or are discarded
    env.getCheckpointConfig.setCheckpointTimeout(60000)
    
    // allow only one checkpoint to be in progress at the same time
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    

    选择一个State后端

    Flink的checkpoint机制在定时器和有状态的算子中保存了所有State的一致性快照,包括Connectors、windows以及任何用户自定义state。checkpoint保存在哪(如:JobManager内存、文件系统、数据库)取决于配置的State Backend
    默认情况下,State保存在TaskManager的内存中而checkpoint保存在JobManager的内存中,为了适当的保存大型State,Flink支持各种方法来在其他State后端存储和checkpoint State。State后端的选择可以通过StreamExecutionEnvironment.setStateBackend(...)来配置。

    迭代型作业的State checkpoint

    Flink目前只为非迭代型作业提供处理保证,在迭代型作业中启用checkpoint会引起一个异常,为了在一个迭代型作业中强制checkpoint,用户需要在启用checkpoint时设置一个特殊的标志:
    env.enableCheckpointing(interval, force = true)
    请注意:在故障期间,环路边缘的飞行记录(以及与之相关的State修改)将会丢失。

    重启策略

    Flink支持不同的重启策略,这些策略控制了失败作业如何重启,想了解更多请参考:重启策略

    相关文章

      网友评论

        本文标题:Flink Checkpoint

        本文链接:https://www.haomeiwen.com/subject/qspmqxtx.html