现象
flink checkpoint 老是超时,反压很高
data:image/s3,"s3://crabby-images/285c4/285c47d909fafcb3fd02c3dea00b960781a1cca7" alt=""
data:image/s3,"s3://crabby-images/f8130/f8130a129b4c20acb4c7651ce3a852e537510a0f" alt=""
data:image/s3,"s3://crabby-images/c5429/c542975cd8567aa0e9d6eebcc3f2464c7520a97b" alt=""
问题分析
When the time to trigger the checkpoint is constantly very high, it means that the checkpoint barriers need a long time to travel from the source to the operators
当checkpoint 耗时非常长得时候,这意味着checkpoint barriers需要很长时间才能到达opreator
- 是什么导致checkpoint barriers需要很长时间才能到达opreator
- 可能某个task solt中算子报错,那么这个barrier就永远无法到达,那么checkpoint要想等待barrier对齐,结果是不可能,进而导致checkpoint失败
- 某个task solt中算子数据倾斜,等待很长时间
- 某个task solt中算子和外部存储系统交互时间很长
- 大状态state的存储
- 增加的网络缓冲区数量也导致增加了检查点时间,因为保持更多的飞行中数据意味着检查点障碍被延迟了
- 使用异步状态后端 rosckdb
- 当状态被异步快照时,检查点的扩展比状态被同步快照时更好。尤其是在具有多个联接,功能或窗口的更复杂的流应用程序中,这可能会产生深远的影响。The combination RocksDB state backend with heap-based timers currently does NOT support asynchronous snapshots for the timers state. Other state like keyed state is still snapshotted asynchronously
RocksDBStateBackend backend = new RocksDBStateBackend(filebackend, true)
- 压缩选项对增量快照没有影响,因为它们使用的是RocksDB的内部格式,该格式始终使用开箱即用的快速压缩,所以在选择rocksdb开启压缩,状态大小未发生变化
压缩前
data:image/s3,"s3://crabby-images/927f0/927f0ff665969dacb6bed101d75961f5b04e7f8b" alt=""
压缩后没有变化
data:image/s3,"s3://crabby-images/6c8e3/6c8e3a5f6f7475990438818dc977e9214b8eafa8" alt=""
9、观察到日志超时,也会引起checkpoint超时
最近遇到一个很奇怪的问题,Flink任务正常启动正常运行一段时间后就会报错,,错误详情如下
2021-09-01 13:48:31,802 ERROR org.apache.flink.runtime.rest.handler.taskmanager.TaskManagerLogFileHandler - Failed to transfer file from TaskExecutor container_e27_1611023118114_242361_01_000005.
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#702082692]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:647)
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:772)
at akka.dispatch.OnComplete.internal(Future.scala:258)
at akka.dispatch.OnComplete.internal(Future.scala:256)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:186)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:183)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:83)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef$.$anonfun$apply$1(AskSupport.scala:604)
at akka.actor.Scheduler$$anon$4.run(Scheduler.scala:126)
at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:870)
at scala.concurrent.BatchingExecutor.execute(BatchingExecutor.scala:109)
at scala.concurrent.BatchingExecutor.execute$(BatchingExecutor.scala:103)
at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:868)
at akka.actor.LightArrayRevolverScheduler$TaskHolder.executeTask(LightArrayRevolverScheduler.scala:329)
at akka.actor.LightArrayRevolverScheduler$$anon$4.executeBucket$1(LightArrayRevolverScheduler.scala:280)
at akka.actor.LightArrayRevolverScheduler$$anon$4.nextTick(LightArrayRevolverScheduler.scala:284)
at akka.actor.LightArrayRevolverScheduler$$anon$4.run(LightArrayRevolverScheduler.scala:236)
at java.lang.Thread.run(Thread.java:748)
Caused by: akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://flink/user/resourcemanager#702082692]] after [10000 ms]. Sender[null] sent message of type "org.apache.flink.runtime.rpc.messages.LocalFencedMessage".
重新配置如下参数
-yjm 2048 修改为 4096
-yD akka.ask.timeout=30 s
-yD web.timeout=10*60*1000
网友评论