美文网首页Flink精选学习
【Flink 精选】如何优化大状态作业?

【Flink 精选】如何优化大状态作业?

作者: 熊本极客 | 来源:发表于2020-09-21 18:15 被阅读0次

    本文从监控、参数调整、资源规划等方面,优化 Flink 大状态作业的常见问题。


    背景:Flink 有状态作业能可靠地运行,必须满足以下两个条件 ① 作业必须能获得性能良好的 Checkpoint;② 发生故障后,作业的处理速度能*赶上输入数据流

    1 监控 Checkpoint 和 State

    Flink Web UI 提供了 Checkpoint 监控,其官方文档介绍了如何使用 Checkpoint 监控页面。
    Checkpoint 有2个关键指标:
    subtask checkpoint 延迟时间 = end_to_end_duration - synchronous_duration - asynchronous_duration
    barrier 对齐的缓存数据量。对于Exactly-Once语义,算子 Operator 从输入流接收到 barrier n 后,它就不能处理来自该流的任何数据记录,直到它从其他所有输入接收到 barrier n。

    2 调整 Checkpoint 参数

    设置Checkpoint 的时间间隔,可以定时产生 barrier 发送下游。如果完成 Checkpoint 所需的时间大于 Checkpoint 发生的时间间隔,则在进行中的 Checkpoint 完成之前不会触发下一个检查点。(默认情况下,正在进行的 Checkpoint 完成后,才会触发下一个 Checkpoint)

    异常场景:
    如果 Checkpoint 持续的时间经常大于 Checkpoint 的时间间隔,系统会不断进行 Checkpoint(一旦完成 Checkpoint,则立即启动新的 Checkpoint) 。这可能意味着 Flink 的资源经常被用于 Checkpoint,影响算子 Operator 正常处理业务数据。该情况对使用异步 Checkpoint 的作业较小,但影响作业的整体性能。

    为了避免该情况,作业可以自定义 Checkpoint 的最小暂停时间。

    // 该暂停时间是 Checkpoint n 结束时间与下一个 Checkpoint n+1 开始时间之前, 需要暂停的最小时间
    StreamExecutionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(milliseconds)
    
    
    checkpoint的最小暂停时间.png

    3 调整 RocksDB

    3.1 增量 Checkpoint

    如果 StateBackend 是 RocksDB,可以选择使用增量 Checkpoint。对比全量Checkpoint,增量 Checkpoint 可以大大减少 Checkpoint 时间,因为增量 Checkpoint 仅需要保存与上一次 Checkpoint 相比的更改部分,不保存 StateBackend 的完整独立备份。

    说明:
    增量 Checkpoint 是利用 RocksDB 的内部压缩机制。该机制可以随着时间的推移>进行自动整合,导致增量 Checkpoint 的历史记录不会无限期增长,而是最终自动合并和清除旧的 Checkpoint

    // 用户需要手动开启增量 Checkpoint 功能
    // 第一种方式 flink-conf.yaml
    state.backend.incremental: true   // 启动增量 Checkpoint
    
    // 第二种方式代码修改
    RocksDBStateBackend backend = new RocksDBStateBackend(checkpointDirURI, true);
    
    // 注意如果启动增量 Checkpoint功能,Web UI 显示的 Checkpointed Data Size 是指增量 Checkpoint 的数据大小。
    
    

    3.2 RocksDB 和 JVM heap 的定时器

    Flink Timer 定时器用于处理 eventTime 或者 processTime,例如触发窗口、回调ProcessFunction
    如果 StateBackend 是 RocksDB,则 Timer 定时器默认也是储存在 RocksDB。这可以提高可靠性和可扩展性,但 RocksDB 维护 Timer 需要耗费一定的资源。Flink 还提供了将 Timer 储存在 JVM heap。

    // 不是默认值 rocksdb
    state.backend.rocksdb.timer-service.factory设置为heap
    

    注意基于 RocksDB 和 JVM heap 的 Timer 定时器都不支持异步快照。

    3.3 调整 RocksDB 的内存参数

    (1)RocksDB 的基本读写操作

    RocksDB 写入操作就是把数据写入到内存的 MemTable 中。当 MemTable 写满时,转换成为 READ ONLY MemTable,并被一个新申请的 MemTable 替换。只读 MemTable 被后台线程周期性地刷新到磁盘中,生成按键排序的只读文件,即 SSTables。SSTable 是不可变的,通过后台的多路归并实现进一步的整合。 RocksDB 基本读写操作.jpg

    (2)RocksDB 内存的3种配置

    ① block_cache_size 的配置
    控制内存中缓存的最大未压缩块数。随着块数的不断增加,内存大小也会增加。因此,通过预先配置可以保持固定的内存消耗水平
    ② write_buffer_size 的配置
    控制着 RocksDB 中 MemTable 的最大值。活跃 MemTables 和只读的 MemTables 最终会影响 RocksDB 中的内存大小。
    ③ max_write_buffer_number 的配置
    在 RocksDB 将 MemTables 导出到磁盘上的 SSTable 之前,此配置决定并控制着内存中保留的 MemTables 的最大数量,即内存中“只读内存表“的最大数量。

    4 规划资源

    资源规划的基本经验是先满足正常,再计算额外资源

    4.1 确定正常运行的资源

    计算作业能正常运行的资源,其标准是在一定的背压下,作业无法处理业务。如何监控背压,参考【Flink 精选】如何分析及处理反压?

    4.2 正常运行的程序提供一些额外的资源

    该资源用于“追赶”作业在 Checkpoint 恢复期间累计的数据,取决于恢复的时长以及速度。

    说明:
    Ⅰ短暂背压是正常的,同时在负载峰值,追赶阶段表现出暂时的减速期间也是正常的。
    Ⅱ 应该在 Checkpoint 开始的时候建立基线,因为 Checkpoint 会占用一些资源,例如网络I/O。

    4.3 特殊操作

    特殊操作可以提前预算资源。例如,大窗口会给下游操作员带来不菲的负担,在窗口触发时,下游算子有很大的负担。下游并行性的计划需要考虑到窗口触发时的数据量和处理速率。

    注意:
    由于最大并发度 maxParallelism 涉及Flink State 的缩放,决定作业的并发度上限并且一旦确定就不能修改,因此应该结合业务场景主动为每个作业设置合理的 maxParallelism。(Flink 内部使用 KeyGroup 机制,当恢复缓存数据时,可以最大概率地从本地数据中恢复,参考:【Flink 状态】state 缩放

    5 压缩状态数据

    Checkpoint 和 Savepoint 具有压缩功能(默认值:关闭),使用 snappy 压缩工具。

    // 压缩可以通过激活ExecutionConfig
    ExecutionConfig executionConfig = new ExecutionConfig();
    executionConfig.setUseSnapshotCompression(true);
    

    6 作业恢复状态

    6.1 不同 StateBackend 的本地恢复

    当前 StateBackend 的本地恢复仅针对 keyed state,一般情况 keyed state 是最大的缓存数据。

    6.1.1 FsStateBackend 的本地恢复

    keyed state 支持 FsStateBackend 的本地恢复。该实现会将复制状态到本地文件,导致额外的 I/O 并占用本地磁盘空间。

    6.1.2 RocksDBStateBackend 的本地恢复

    keyed state 支持 RocksDBStateBackend 的本地恢复。
    对于全量 Checkpoint,保存状态数据到本地文件,导致额外的 I/O 并占用本地磁盘空间。
    对于增量 Checkpoint,本地状态是基于 RocksDB 的本机 Checkpoint 机制,该机制首先是创建主副本,意味着创建第二副本不会引入额外的成本。

    具体流程
    Ⅰ首先保留本地 Checkpoint 目录,本地副本与 RocksDB 共享目录(硬链接);
    Ⅱ 接着对于活跃的文件,使用增量 Checkpoint 进行本地恢复。(注意:硬链接意味着 RocksDB 目录与可存储状态数据的本地文件目录必须在同一物理设备上。)

    相关文章

      网友评论

        本文标题:【Flink 精选】如何优化大状态作业?

        本文链接:https://www.haomeiwen.com/subject/qezjyktx.html