美文网首页flinkFlink实践
关于checkpoint在flink生产的应用

关于checkpoint在flink生产的应用

作者: 神奇的考拉 | 来源:发表于2019-05-28 21:54 被阅读0次

    一.简述

    Flink本身为了保证其高可用的特性,以及保证作用的Exactly Once的快速恢复,进而提供了一套强大的checkpoint机制。但是在实际应用中由于对checkpoint的使用不当会带来不恰当的影响:比如两次checkpoint的间隔太短,导致应用一直处于checkpoint的状态下,甚至会导致整个应用变得不可用。接下来会讨论下checkpoint相关内容以及优化参数参考

    二.checkpoint是否合理参考参数

    对checkpoint进行优化,我们需要参考对应的metrics:

    • Checkpoint间隔时间:
      比对前后两次checkpoint的开始时间,是否存在间隔?有则代表当前checkpoint设置时间比较合理。
    • 数据Buffered大小:
      关于buffered主要是为了flink处理过程会存在一些慢数据流的stream barriers而设计的,通过该参数可以参考当前flink处理流慢数据的比例


      checkpoint参数

      接下来看看如何合理设置相关的内容

    2.1 Checkpoint间隔时间

    在实际应用情况下,面对超大数据集规模,每次checkpoint的时间都超过我们设定的或系统的时间,结果会如何?
    那就是应用会一直处于checkpoint,甚至导致整个应用都变得不可用了。面对该情况我们提供的方案比如:
    1.设置并行checkpoint数 ???
    2.增量checkpoint:每次只checkpoint出对前一次checkpoint内的状态数据的增量改动。然后恢复的时候做状态改动的重放???
    这里我们来说下第三种方案:强制设置两次checkpoint的空闲间隔


    checkpoint的间隔

    通过flink提供的config参数来控制,通过该方法我们就可以控制前后checkpoint的间隔不会导致应用一直处于checkpoint。

    getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)
    

    该参数并未没有彻底解决大规模状态集下checkpoint慢的问题,只是降低慢带来的风险和影响,接下来看看如果解决大规模数据集下的“慢”问题本质方案

    2.2 外部state的存储

    一般来说checkpoint之所以慢 还是因为数据规模大,那如果我们能找到一种更快的存储状态的介质(或者策略),来使得这个过程变快。比如可以选择更加高效的外部存储介质来做State的存储(比如RocksDB),而不仅限于存储于有限的内存空间里,甚至完全落地到磁盘上。

    2.2.1 资源设置

    由于checkpoint是在每个task上先做数据checkpoint,然后在外部存储中做checkpoint持久化。在总状态数据相对固定的情况下,若是减少每个task平均所checkpoint的数据,那么相应地checkpoint的总时间也会变短。所以为每个task设置更多的并行度来加速checkpoint的执行过程。
    例如2000W的数据设定100个parallelism,平均=2000W/100;若是将parallelism增大变成200,则平均=2000W/200,相对每份需要处理的数据比较小些,处理的时长就会变少

    2.2.2 task恢复

    由于checkpoint是分散在每个task上执行,再做汇总持久化。这些task做的checkpoint数据在后面应用恢复时包括并行度扩增或减少时能够重新打散分布。
    那么每个task会为了支持快速恢复,会同时写checkpoint数据到本地磁盘和远程分布式存储,只要task本地的checkpoint数据没有被破坏,系统在应用恢复时会优先加载本地的checkpoint数据,这样就大大减少了远程拉取状态数据的过程。


    checkpoint task数据存储
    2.2.3 常见的配置参数
    // checkpoint周期
    env.enableCheckpointing(1000);
    // checkpoint mode
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    ​
    // checkpoint执行有效期:要么1min完成 要么1min放弃
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    ​
    // 确保checkpoint时间空闲间隔500ms
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    ​
    // 允许同一时间只存在一个checkpoint
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    ​
    // job cancellation启用保留的外部检查点
    env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    ​
    // This determines if a task will be failed if an error occurs in the execution of the task’s checkpoint procedure.
    env.getCheckpointConfig().setFailOnCheckpointingErrors(true);
    
    • 使用enableCheckpointing方法来设置开启checkpoint;
      可以使用enableCheckpointing(long interval)或enableCheckpointing(long interval, CheckpointingMode mode):
      interval用于指定checkpoint的触发间隔(单位milliseconds);

    • CheckpointingMode默认是CheckpointingMode.EXACTLY_ONCE,也可以指定为CheckpointingMode.AT_LEAST_ONCE或者getCheckpointConfig().setCheckpointingMode来设置CheckpointingMode,一般对于超低延迟的应用(大概几毫秒)可以使用CheckpointingMode.AT_LEAST_ONCE,其他大部分应用使用CheckpointingMode.EXACTLY_ONCE就可以

    • checkpointTimeout用于指定checkpoint执行的超时时间(单位milliseconds),超时没完成就会被abort掉

    • minPauseBetweenCheckpoints用于指定checkpoint coordinator上一个checkpoint完成之后最小等多久可以出发另一个checkpoint,当指定这个参数时,maxConcurrentCheckpoints的值为1

    • maxConcurrentCheckpoints用于指定运行中的checkpoint最多可以有多少个,用于包装topology不会花太多的时间在checkpoints上面;如果有设置了minPauseBetweenCheckpoints,则maxConcurrentCheckpoints这个参数就不起作用了(大于1的值不起作用)

    • enableExternalizedCheckpoints用于开启checkpoint的外部持久化,但是在job失败的时候不会自动清理,需要自己手工清理state;ExternalizedCheckpointCleanup用于指定当job canceled的时候externalized checkpoint该如何清理,DELETE_ON_CANCELLATION的话,在job canceled的时候会自动删除externalized state,但是如果是FAILED的状态则会保留;RETAIN_ON_CANCELLATION则在job canceled的时候会保留externalized checkpoint state

    • failOnCheckpointingErrors用于指定在checkpoint发生异常的时候,是否应该fail该task,默认为true,如果设置为false,则task会拒绝checkpoint然后继续运行

    相关文章

      网友评论

        本文标题:关于checkpoint在flink生产的应用

        本文链接:https://www.haomeiwen.com/subject/ifbjtctx.html