Flink 使用介绍相关文档目录
前言
本篇接Flink 使用之配置与调优。重点整理Flink checkpoint相关的配置。
StreamExecutionEnvironment
通过Flink的StreamExecutionEnvironment
可以配置Checkpoint的间隔时间和state backend。有如下配置项:
- enableCheckpointing:启用checkpoint,必须指定checkpoint间隔时间。可选指定CheckpointMode。实际上是间接调用的CheckpointConfig的方法。
- setStateBackend:配置StateBackend。支持的有HashMapStateBackend和RocksDBStateBackend。生产环境建议使用RocksDBStateBackend。
其他更为细节的配置方式需要通过CheckpointConfig
来完成。
CheckpointConfig
CheckpointConfig
支持更为细节的checkpoint配置。我们可以通过env.getCheckpointConfig()
来获取CheckpointConfig。
CheckpointConfig
支持如下配置:
- disableCheckpointing: 禁用checkpoint(设置interval为-1)。
- setCheckpointingMode:设置checkpoint模式。CheckpointMode包含两种:
EXACTLY_ONCE
和AT_LEAST_ONCE
。EXACTLY_ONCE
保证在operator恢复的过程中,流数据不重复也不丢失。EXACTLY_ONCE
不意味着数据仅仅流转一次(会涉及到数据源的replay等)。这个模式也不能保证Flink对外部系统的精准一次投送。需要外部系统的支持和额外配置。该模式支持高吞吐量,但是可能会提高延迟,因为多数据输入源的情况下需要对齐(align)。AT_LEAST_ONCE
只能保证数据不丢失,但在故障恢复之后,部分数据有可能重复。这个模式对延迟的影响微乎其微。例子可以参考:Flink 使用之Kafka exactly-once场景。 - setCheckpointInterval:启用checkpoint并设置checkpoint的时间间隔。时间间隔最小值为10ms。
- setCheckpointTimeout:设置checkpoint的超时时间。即每次checkpoint过程可容忍的最大耗费时间。
- setMinPauseBetweenCheckpoints:设置两次checkpoint之间的最小间隔时间。有时候checkpoint耗时可能过长,checkpoint的间隔时间配置的又比较短的话,可能导致checkpoint持续进行或者是占用很高的时间比例。该配置可以缓解这种情形。默认值为0。
- setMaxConcurrentCheckpoints:最多有多少个checkpoint操作可同时触发,默认为1。
- setTolerableCheckpointFailureNumber:配置可容忍的checkpoint失败次数。默认为0不容忍任何失败。
- enableExternalizedCheckpoints:checkpoint保存在外部存储系统。参数
ExternalizedCheckpointCleanup
包含两种模式:DELETE_ON_CANCELLATION
(作业cancel的时候自动删除所有checkpoint状态)和RETAIN_ON_CANCELLATION
(作业取消的时候保留checkpoint状态)。 - enableUnalignedCheckpoints:启用Unaligned checkpoint。只有在checkpointMode配置为
EXACTLY_ONCE
的时候才可以启用。Unaligned checkpoint详情请参考:Flink 源码之 1.11新特性Unaligned checkpoint。 - setAlignmentTimeout:checkpoint align超时时间。只有在启用Unaligned checkpoint时候生效。默认是0不启用。如果启用的话,checkpoint默认为aligned checkpoint,如果checkpoint启动延迟时间超过配置时间,会自动转换为unaligned checkpoint模式。
- setCheckpointStorage:配置checkpoint存储。可配置文件系统路径等。
flink-conf.yaml
Flink checkpoint还能够通过flink-conf.yaml
文件配置。除了和代码中相同功能的配置项外,配置文件还有一些独有的配置。
可供使用的配置项如下:
- state.backend.incremental:是否启用增量checkpoint。默认为false。目前rocksdb支持增量checkpoint。和全量checkpoint相比,增量checkpoint每次只保存和上次checkpoint不同的地方。可以显著减小每次checkpoint写入的数据量,减少checkpoint的时耗时。
- state.backend.local-recovery:是否启用本地恢复。默认为false。启用该配置可以加快作业的故障恢复速度。目前MemoryStateBackend 不支持本地恢复。
- state.checkpoint-storage:配置checkpoint存储类型。和前面的
setCheckpointStorage
类似。 - state.checkpoints.dir:配置checkpoint的存储路径。和前一个相似,建议使用该配置项。
- state.checkpoints.num-retained:最多保留成功的checkpoint的个数。默认为1。
- state.savepoints.dir:savepoint的默认保存目录。
- state.storage.fs.memory-threshold:最小的状态文件大小。小于这个大小的state会被存储到root checkpoint元数据文件。默认发为20kb。
- state.storage.fs.write-buffer-size:写入checkpoint的默认写缓存大小。实际上checkpoint写缓存大小不会超过
state.storage.fs.memory-threshold
配置值。 - taskmanager.state.local.root-dirs:本地恢复状态的保存路径,为本地磁盘路径。
网友评论