美文网首页Flink实战
Flink实战之入库任务调优

Flink实战之入库任务调优

作者: 〇白衣卿相〇 | 来源:发表于2020-12-26 00:58 被阅读0次

    背景

    在调试flink写hdfs和hive时,任务总是报各种各样的异常,其中255问题最多,异常信息如下:

    java.lang.Exception: Exception from container-launch.
    Container id: container_1597847003686_5818_01_000002
    Exit code: 255
    Stack trace: ExitCodeException exitCode=255: 
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:604)
        at org.apache.hadoop.util.Shell.run(Shell.java:507)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:789)
        at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:213)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
        at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
    
    
    Container exited with a non-zero exit code 255
    
        at org.apache.flink.yarn.YarnResourceManager.lambda$onContainersCompleted$0(YarnResourceManager.java:385)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    

    这段异常是yarn报出来的,根本原因是Direct Memory OOM了。那么该如何调优呢,容我慢慢道来。
    我们先看下Flink的内存模型。

    Flink内存模型

    JVM Heap内存

    堆内存包括:

    1. Framework Heap内存:flink框架使用的堆内存
    2. Task Heap内存:任务使用堆内存(java对象,基于内存的backend存储的state对象)

    配置参数:

    taskmanager.memory.framework.heap.size

    taskmanager.memory.task.heap.size

    JVM Off-Heap内存

    对外内存:

    1. Framework Off-Heap内存:flink框架使用的对外内存
    2. Task Off-Heap内存:任务使用的对外内存

    配置参数:

    taskmanager.memory.framework.off-heap.size

    taskmanager.memory.task.off-heap.size

    Framework vs Task

    区分:是否计入Slot资源

    Framework:flink框架运行使用的内存

    Task:任务运行使用的内存,包括heap、off-heap、managed、direct

    Heap vs Off-Heap

    区分:jvm堆内存和对外内存

    Heap:jvm堆

    Off-Heap:包括Direct、Native

    Framework Heap+Task Heap = -Xmx

    Framework off-heap +task off-heap + network = -XX:MaxDirectMemorySize

    Network Memory(网络buffer)

    属于Directory Memory

    用途:

    用于task之间缓冲数据,input buffer pool / output buffer pool

    配置参数:

    taskmanager.memory.network.min
    taskmanager.memory.network.max
    taskmanager.memory.network.fraction

    Managed Memory(托管内存)

    属于Native Memory

    用途:

    1. streaming任务RocksDB Backend
    2. batch任务的sort、hash table、中间结果缓存
    3. python任务的UDF使用

    配置参数:

    设置大小:taskmanager.memory.managed.size

    设置比率:taskmanager.memory.managed.fraction

    JVM Metaspace & Overhead

    都是jvm本身的开销

    JVM Metaspace

    用途:存放JVM加载的类的元数据,加载的类越多需要空间越大

    所以如果任务需要加载大量第三方库时,可以调大Metaspace内存

    配置参数:

    taskmanager.memory.jvm-metaspace.size

    JVM Overhead

    属于Native Memory

    用途:用于其他JVM开销,比如Code Cache、Thread Stack、garbage collection space 等。

    配置参数:

    taskmanager.memory.jvm-overhead.min
    taskmanager.memory.jvm-overhead.max
    taskmanager.memory.jvm-overhead.fraction

    看完上面的总结,想必大家已经有了大概了解,回到我们的入库任务,理解入库任务主要会使用哪一块的内存,那么如何调优也就一目了然了。

    入库任务使用内存

    入库任务底层原理都是基于StreamingFileSink写Hdfs文件。借助BulkWriter进行写入,数据是先写到Direct Memory当中,然后在文件滚动时flush到hdfs。所以主要使用的Direct Memory,其属于task off-heap内存。
    同时我们任务使用了RocksDB的状态后端,但是状态不是很大,也就1M左右。所以可以适当减少Managed Memory的大小。最终效果是调大了task off-heap的内存,调小了Managed Memory的内存,然后任务就不再报255了。
    配置参数taskmanager.memory.task.off-heap.size和taskmanager.memory.managed.fraction,具体配置多大,需要根据你的数据量、单条数据大小、Checkpoint间隔时长来计算出大概会在Direct Memory中存多少数据。

    后续

    最近发现有两个任务也报255,但是并不是Direct Memory超用,而是堆内存超用,所以调大了TM的内存。
    使用堆内存主要是Bucket对象,如果分区时间选择不合理,会导致分区很多,分配了大量Bucket,导致堆内存OOM。

    相关文章

      网友评论

        本文标题:Flink实战之入库任务调优

        本文链接:https://www.haomeiwen.com/subject/aiswnktx.html