Spark on YARN 2.2.0: Resource Allocation

Author: 我猪妹打钱 | Published 2017-11-23 16:26
    Preface

    This article is adapted from an earlier article written for Spark 1.3.0:

    Parameters

    This article focuses on memory allocation for Spark on YARN, so only the following memory-related parameters matter (defaults taken from the Spark 2.2.0 documentation or the output of spark-shell --help):

    • spark.driver.memory: default 1024m
    • spark.executor.memory: default 1024m
    • spark.yarn.am.memory: default 512m
    • spark.yarn.executor.memoryOverhead: default executorMemory * 0.10, minimum 384m
    • spark.yarn.driver.memoryOverhead: default driverMemory * 0.10, minimum 384m
    • spark.yarn.am.memoryOverhead: default AM memory * 0.10, minimum 384m
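    The overhead defaults above all follow one rule, overhead = max(heap × 0.10, 384 MB), and the memory requested for a container is heap plus overhead. The Scala sketch below reproduces that arithmetic, assuming the 0.10 factor and the 384 MB floor listed above; the object and method names are illustrative only, not Spark API.

```scala
// Sketch of the default memoryOverhead rule: overhead = max(heap * 0.10, 384 MB).
// YarnMemoryOverhead and its methods are illustrative names, not part of Spark.
object YarnMemoryOverhead {
  val OverheadFactor: Double = 0.10 // default overhead factor
  val OverheadMinMb: Long = 384L    // default overhead floor in MB

  /** Default overhead (MB) for a heap of `heapMb` MB when no explicit
    * spark.yarn.*.memoryOverhead value is configured. */
  def defaultOverheadMb(heapMb: Long): Long =
    math.max((OverheadFactor * heapMb).toLong, OverheadMinMb)

  /** Memory (MB) requested from YARN for one container: heap + overhead. */
  def containerRequestMb(heapMb: Long, explicitOverheadMb: Option[Long] = None): Long =
    heapMb + explicitOverheadMb.getOrElse(defaultOverheadMb(heapMb))
}
```

    With these defaults, containerRequestMb(512) returns 896, matching the AM request in the log further down, and containerRequestMb(1024) returns 1408 for a 1g executor.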

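    In addition, because the job is submitted to YARN, a few key YARN parameters also matter:

    • yarn.app.mapreduce.am.resource.mb: memory requested by the MapReduce ApplicationMaster, default 1536MB (it applies to MapReduce jobs; Spark sizes its own AM with the spark.yarn.am.* settings)
    • yarn.nodemanager.resource.memory-mb: total memory a NodeManager can hand out to containers, default 8192MB
    • yarn.scheduler.minimum-allocation-mb: smallest amount of memory the scheduler allocates to one container, default 1024MB
    • yarn.scheduler.maximum-allocation-mb: largest amount of memory the scheduler allocates to one container, default 8192MB
      The original article changed these settings; I left them unset in my test, so the defaults apply.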
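    YARN does not grant a container of exactly the requested size. A minimal sketch of how a memory request is normalized, assuming the default values above and that requests are rounded up to a multiple of yarn.scheduler.minimum-allocation-mb (as the Capacity Scheduler does; the Fair Scheduler may round to yarn.scheduler.increment-allocation-mb instead). YarnNormalize and normalizeMb are hypothetical names, not YARN API.

```scala
// Sketch of YARN container-request normalization, assuming the rounding step
// equals yarn.scheduler.minimum-allocation-mb. Names are hypothetical.
object YarnNormalize {
  /** Round `value` up to the nearest multiple of `step` (step > 0). */
  def roundUp(value: Long, step: Long): Long = ((value + step - 1) / step) * step

  /** Container memory (MB) granted for a request of `requestMb`. */
  def normalizeMb(requestMb: Long, minMb: Long = 1024L, maxMb: Long = 8192L): Long = {
    // Requests above the maximum are rejected, not rounded down
    require(requestMb <= maxMb, s"request of $requestMb MB exceeds the $maxMb MB maximum")
    // Raise to the minimum, round up to a multiple of it, never exceed the maximum
    math.min(roundUp(math.max(requestMb, minMb), minMb), maxMb)
  }
}
```

    Under these defaults an 896 MB request is granted 1024 MB, and a 1408 MB request (1g heap plus 384 MB overhead) is granted 2048 MB.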

    Submission script

    spark-submit \
    --master yarn \
    --class org.apache.spark.examples.SparkPi \
    --executor-memory 1g \
    --executor-cores 1 \
    /Users/***/***/spark/examples/jars/spark-examples.jar \
    3
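    The --executor-memory 1g argument is a size string that Spark converts to megabytes before adding the overhead. The sketch below shows that conversion and the resulting YARN request for one executor. ExecutorRequest, toMb and executorRequestMb are hypothetical helpers, deliberately simpler than Spark's own parser (Utils.memoryStringToMb, which appears in the source code later in this article), and the overhead rule max(10% of heap, 384 MB) is taken from the parameter list above.

```scala
// Hypothetical helpers: parse a size such as "1g" (as passed to --executor-memory)
// into MB and estimate the executor's YARN request. Simplified; Spark's real
// parser accepts more formats than this one.
object ExecutorRequest {
  /** Parse "<number><unit>" with unit k/m/g/t (optionally followed by "b") into MB. */
  def toMb(size: String): Long = {
    val s = size.trim.toLowerCase
    val (digits, unit) = s.span(_.isDigit)
    require(digits.nonEmpty, s"no number in '$size'")
    val n = digits.toLong
    unit match {
      case "k" | "kb" => n / 1024
      case "m" | "mb" => n
      case "g" | "gb" => n * 1024
      case "t" | "tb" => n * 1024 * 1024
      case other      => throw new IllegalArgumentException(s"unsupported unit '$other' in '$size'")
    }
  }

  /** Executor heap plus default overhead max(10% of heap, 384 MB), in MB. */
  def executorRequestMb(executorMemory: String): Long = {
    val heapMb = toMb(executorMemory)
    heapMb + math.max((0.10 * heapMb).toLong, 384L)
  }
}
```

    For this submission, executorRequestMb("1g") returns 1408, i.e. each executor asks YARN for 1024 + 384 MB.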
    

    Log

    The logging configuration was also left unchanged. Part of the log follows:

    17/11/23 14:35:02 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
    17/11/23 14:35:02 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers
    17/11/23 14:35:02 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
    17/11/23 14:35:02 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
    17/11/23 14:35:02 INFO yarn.Client: Setting up container launch context for our AM
    17/11/23 14:35:02 INFO yarn.Client: Setting up the launch environment for our AM container
    17/11/23 14:35:02 INFO yarn.Client: Preparing resources for our AM container
    17/11/23 14:35:04 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
    17/11/23 14:35:08 INFO yarn.Client: Uploading resource file:/private/var/folders/wp/zf3snrvs4vdghkwl7ht5jzf00000gp/T/spark-19ca460e-f039-4a7f-acf8-19bb64d6a838/__spark_libs__6901922854754905952.zip -> hdfs://li-MacBook.local:8020/user/ayiya/.sparkStaging/application_1511418590330_0003/__spark_libs__6901922854754905952.zip
    17/11/23 14:35:10 INFO yarn.Client: Uploading resource file:/private/var/folders/wp/zf3snrvs4vdghkwl7ht5jzf00000gp/T/spark-19ca460e-f039-4a7f-acf8-19bb64d6a838/__spark_conf__5953037503894379138.zip -> hdfs://li-MacBook.local:8020/user/ayiya/.sparkStaging/application_1511418590330_0003/__spark_conf__.zip
    17/11/23 14:35:10 INFO spark.SecurityManager: Changing view acls to: ayiya
    17/11/23 14:35:10 INFO spark.SecurityManager: Changing modify acls to: ayiya
    17/11/23 14:35:10 INFO spark.SecurityManager: Changing view acls groups to: 
    17/11/23 14:35:10 INFO spark.SecurityManager: Changing modify acls groups to: 
    17/11/23 14:35:10 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(ayiya); groups with view permissions: Set(); users  with modify permissions: Set(ayiya); groups with modify permissions: Set()
    17/11/23 14:35:10 INFO yarn.Client: Submitting application application_1511418590330_0003 to ResourceManager
    17/11/23 14:35:10 INFO impl.YarnClientImpl: Submitted application application_1511418590330_0003
    17/11/23 14:35:10 INFO cluster.SchedulerExtensionServices: Starting Yarn extension services with app application_1511418590330_0003 and attemptId None
    17/11/23 14:35:11 INFO yarn.Client: Application report for application_1511418590330_0003 (state: ACCEPTED)
    17/11/23 14:35:11 INFO yarn.Client: 
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: N/A
         ApplicationMaster RPC port: -1
         queue: root.ayiya
         start time: 1511418910874
         final status: UNDEFINED
         tracking URL: http://li-MacBook.local:8088/proxy/application_1511418590330_0003/
         user: ayiya
    17/11/23 14:35:12 INFO yarn.Client: Application report for application_1511418590330_0003 (state: ACCEPTED)
    17/11/23 14:35:21 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(null)
    17/11/23 14:35:21 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> li-MacBook.local, PROXY_URI_BASES -> http://li-MacBook.local:8088/proxy/application_1511418590330_0003), /proxy/application_1511418590330_0003
    17/11/23 14:35:21 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
    17/11/23 14:35:22 INFO yarn.Client: Application report for application_1511418590330_0003 (state: RUNNING)
    17/11/23 14:35:22 INFO yarn.Client: 
         client token: N/A
         diagnostics: N/A
         ApplicationMaster host: 10.10.8.135
         ApplicationMaster RPC port: 0
         queue: root.ayiya
         start time: 1511418910874
         final status: UNDEFINED
         tracking URL: http://li-MacBook.local:8088/proxy/application_1511418590330_0003/
         user: ayiya
    17/11/23 14:35:22 INFO cluster.YarnClientSchedulerBackend: Application application_1511418590330_0003 has started running.
    17/11/23 14:35:22 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 61705.
    17/11/23 14:35:22 INFO netty.NettyBlockTransferService: Server created on 10.10.8.135:61705
    17/11/23 14:35:22 INFO storage.BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    17/11/23 14:35:22 INFO storage.BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 10.10.8.135, 61705, None)
    17/11/23 14:35:22 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.10.8.135:61705 with 366.3 MB RAM, BlockManagerId(driver, 10.10.8.135, 61705, None)
    17/11/23 14:35:22 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 10.10.8.135, 61705, None)
    17/11/23 14:35:22 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, 10.10.8.135, 61705, None)
    17/11/23 14:35:22 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@7675c171{/metrics/json,null,AVAILABLE}
    17/11/23 14:35:22 INFO scheduler.EventLoggingListener: Logging events to hdfs://li-MacBook.local:8020/spark_log/application_1511418590330_0003
    17/11/23 14:35:28 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.10.8.135:61711) with ID 1
    17/11/23 14:35:29 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.10.8.135:61715 with 366.3 MB RAM, BlockManagerId(1, 10.10.8.135, 61715, None)
    17/11/23 14:35:30 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.10.8.135:61717) with ID 2
    17/11/23 14:35:30 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
    17/11/23 14:35:30 INFO internal.SharedState: Warehouse path is 'file:/Users/ayiya/app/spark-2.1.0-bin-hadoop2.7/examples/jars/spark-warehouse/'.
    17/11/23 14:35:30 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@5fef0c19{/SQL,null,AVAILABLE}
    17/11/23 14:35:30 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@22a6d75c{/SQL/json,null,AVAILABLE}
    17/11/23 14:35:30 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@34a20f16{/SQL/execution,null,AVAILABLE}
    17/11/23 14:35:30 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@1a1c21b4{/SQL/execution/json,null,AVAILABLE}
    17/11/23 14:35:30 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@3b0d3a63{/static/sql,null,AVAILABLE}
    17/11/23 14:35:30 INFO storage.BlockManagerMasterEndpoint: Registering block manager 10.10.8.135:61718 with 366.3 MB RAM, BlockManagerId(2, 10.10.8.135, 61718, None)
    17/11/23 14:35:31 INFO spark.SparkContext: Starting job: reduce at SparkPi.scala:38
    17/11/23 14:35:31 INFO scheduler.DAGScheduler: Got job 0 (reduce at SparkPi.scala:38) with 3 output partitions
    17/11/23 14:35:31 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:38)
    17/11/23 14:35:31 INFO scheduler.DAGScheduler: Parents of final stage: List()
    17/11/23 14:35:31 INFO scheduler.DAGScheduler: Missing parents: List()
    17/11/23 14:35:31 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34), which has no missing parents
    17/11/23 14:35:31 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1832.0 B, free 366.3 MB)
    17/11/23 14:35:31 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1172.0 B, free 366.3 MB)
    17/11/23 14:35:31 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.10.8.135:61705 (size: 1172.0 B, free: 366.3 MB)
    17/11/23 14:35:31 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:996
    17/11/23 14:35:31 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:34)
    17/11/23 14:35:31 INFO cluster.YarnScheduler: Adding task set 0.0 with 3 tasks
    17/11/23 14:35:31 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.10.8.135, executor 1, partition 0, PROCESS_LOCAL, 6034 bytes)
    17/11/23 14:35:31 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.10.8.135, executor 2, partition 1, PROCESS_LOCAL, 6034 bytes)
    17/11/23 14:35:32 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.10.8.135:61715 (size: 1172.0 B, free: 366.3 MB)
    17/11/23 14:35:32 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.10.8.135:61718 (size: 1172.0 B, free: 366.3 MB)
    17/11/23 14:35:33 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 10.10.8.135, executor 2, partition 2, PROCESS_LOCAL, 6034 bytes)
    17/11/23 14:35:33 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1559 ms on 10.10.8.135 (executor 1) (1/3)
    17/11/23 14:35:33 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 1481 ms on 10.10.8.135 (executor 2) (2/3)
    17/11/23 14:35:33 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 127 ms on 10.10.8.135 (executor 2) (3/3)
    17/11/23 14:35:33 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:38) finished in 1.678 s
    17/11/23 14:35:33 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    17/11/23 14:35:33 INFO scheduler.DAGScheduler: Job 0 finished: reduce at SparkPi.scala:38, took 2.182344 s
    Pi is roughly 3.1482771609238696
    17/11/23 14:35:33 INFO server.ServerConnector: Stopped ServerConnector@6bc248ed{HTTP/1.1}{0.0.0.0:4040}
    17/11/23 14:35:33 INFO ui.SparkUI: Stopped Spark web UI at http://10.10.8.135:4040
    17/11/23 14:35:33 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread
    17/11/23 14:35:33 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
    17/11/23 14:35:33 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
    17/11/23 14:35:33 INFO cluster.SchedulerExtensionServices: Stopping SchedulerExtensionServices
    (serviceOption=None,
     services=List(),
     started=false)
    17/11/23 14:35:33 INFO cluster.YarnClientSchedulerBackend: Stopped
    17/11/23 14:35:33 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    17/11/23 14:35:33 INFO memory.MemoryStore: MemoryStore cleared
    17/11/23 14:35:33 INFO storage.BlockManager: BlockManager stopped
    17/11/23 14:35:33 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
    17/11/23 14:35:33 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    17/11/23 14:35:33 INFO spark.SparkContext: Successfully stopped SparkContext
    17/11/23 14:35:33 INFO util.ShutdownHookManager: Shutdown hook called
    17/11/23 14:35:33 INFO util.ShutdownHookManager: Deleting directory /private/var/folders/wp/zf3snrvs4vdghkwl7ht5jzf00000gp/T/spark-19ca460e-f039-4a7f-acf8-19bb64d6a838
    

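    Analysis

    Look at this line: Will allocate AM container, with 896 MB memory including 384 MB overhead. It tells us the AM's actual memory is 896 - 384 = 512 MB, i.e. in this job the AM used the default value.
    Why is the default used here? The relevant source code is: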

    private def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = {
      // Largest container the ResourceManager will grant (yarn.scheduler.maximum-allocation-mb)
      val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
      logInfo("Verifying our application has not requested more than the maximum " +
        s"memory capability of the cluster ($maxMem MB per container)")
      // Executor container = executor heap + executor memoryOverhead
      val executorMem = args.executorMemory + executorMemoryOverhead
      if (executorMem > maxMem) {
        throw new IllegalArgumentException(s"Required executor memory (${args.executorMemory}" +
          s"+$executorMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
      }
      // AM container = AM heap + AM memoryOverhead
      val amMem = args.amMemory + amMemoryOverhead
      if (amMem > maxMem) {
        throw new IllegalArgumentException(s"Required AM memory (${args.amMemory}" +
          s"+$amMemoryOverhead MB) is above the max threshold ($maxMem MB) of this cluster!")
      }
      logInfo("Will allocate AM container, with %d MB memory including %d MB overhead".format(
        amMem,
        amMemoryOverhead))
    }

    private def validateArgs(): Unit = {
      if (numExecutors <= 0) {
        throw new IllegalArgumentException(
          "You must specify at least 1 executor!\n" + getUsageMessage())
      }
      if (executorCores < sparkConf.getInt("spark.task.cpus", 1)) {
        throw new SparkException("Executor cores must not be less than " +
          "spark.task.cpus.")
      }
      if (isClusterMode) {
        // Cluster mode: the driver runs inside the AM, so spark.yarn.am.* settings are ignored
        for (key <- Seq(amMemKey, amMemOverheadKey, amCoresKey)) {
          if (sparkConf.contains(key)) {
            println(s"$key is set but does not apply in cluster mode.")
          }
        }
        amMemory = driverMemory
        amCores = driverCores
      } else {
        // Client mode: the driver overhead and cores settings are ignored
        for (key <- Seq(driverMemOverheadKey, driverCoresKey)) {
          if (sparkConf.contains(key)) {
            println(s"$key is set but does not apply in client mode.")
          }
        }
        // spark.yarn.am.memory, if set, overrides the default amMemory (512m)
        sparkConf.getOption(amMemKey)
          .map(Utils.memoryStringToMb)
          .foreach { mem => amMemory = mem }
        sparkConf.getOption(amCoresKey)
          .map(_.toInt)
          .foreach { cores => amCores = cores }
      }
    }
    

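    The code shows that when isClusterMode is true, args.amMemory takes the value of driverMemory; otherwise it is read from spark.yarn.am.memory, and if that property is not set, the default of 512m is used.
    So the 896 MB and 384 MB in that log line come from:
    • 384 MB is the default amMemoryOverhead: max(512 × 0.10, 384) = 384.
    • 896 MB is that default overhead (384) plus the default spark.yarn.am.memory (512).
    To customize the AM memory in client mode, pass --conf spark.yarn.am.memory=<size>; in cluster mode the AM uses the driver memory instead.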
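    The branching in validateArgs can be restated as a standalone function. This is a sketch under simplifying assumptions: settings arrive as a plain Map already converted to MB (a hypothetical representation, not SparkConf), and the default overhead is max(10% of heap, 384 MB). Following the warnings printed in validateArgs, cluster mode reads spark.yarn.driver.memoryOverhead while client mode reads spark.yarn.am.memoryOverhead.

```scala
// Sketch of AM container sizing modeled on the validateArgs logic.
// AmMemory, amHeapAndOverhead and the Map-based config are hypothetical.
object AmMemory {
  val DefaultAmMemoryMb: Long = 512L // default of spark.yarn.am.memory
  val OverheadFactor: Double = 0.10
  val OverheadMinMb: Long = 384L

  /** Returns (heap MB, overhead MB) for the AM container. */
  def amHeapAndOverhead(isClusterMode: Boolean, confMb: Map[String, Long], driverMemoryMb: Long): (Long, Long) = {
    // Cluster mode: the driver runs inside the AM, so the driver memory is used
    val heapMb =
      if (isClusterMode) driverMemoryMb
      else confMb.getOrElse("spark.yarn.am.memory", DefaultAmMemoryMb)
    val overheadKey =
      if (isClusterMode) "spark.yarn.driver.memoryOverhead"
      else "spark.yarn.am.memoryOverhead"
    // An explicit overhead wins; otherwise max(10% of heap, 384 MB)
    val overheadMb = confMb.getOrElse(overheadKey, math.max((OverheadFactor * heapMb).toLong, OverheadMinMb))
    (heapMb, overheadMb)
  }
}
```

    In client mode with nothing configured, amHeapAndOverhead(false, Map.empty, 1024L) yields (512, 384), i.e. the 896 MB request in the log; in cluster mode spark.yarn.am.memory is ignored and the driver memory is used instead.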

    YARN UI:

    [Figure: YARN UI]

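    Summary

    In client mode, the AM container size is spark.yarn.am.memory plus spark.yarn.am.memoryOverhead, and each executor container size is spark.executor.memory plus spark.yarn.executor.memoryOverhead. The overhead is off-heap and is not part of storage memory. The memory each BlockManager reports (366.3 MB for the driver and each executor in the log above) comes from Spark 2.x's unified memory manager: roughly (JVM heap - 300 MB reserved) × spark.memory.fraction (default 0.6), shared between execution and storage. The 0.54 factor (spark.storage.memoryFraction 0.6 × spark.storage.safetyFraction 0.9) belongs to the legacy Spark 1.x memory manager, and even there it was applied to the heap alone, not to heap plus overhead. Finally, YARN rounds every container request up to an integer multiple of yarn.scheduler.minimum-allocation-mb, so the 896 MB AM request here is granted a 1024 MB container.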
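    The 366.3 MB that each BlockManager registers in the log can be checked against Spark 2.x's unified memory manager, where execution and storage share one pool of (JVM max heap - 300 MB reserved) × spark.memory.fraction. A minimal sketch, assuming the 300 MB reservation and the default fraction of 0.6; UnifiedPool and unifiedPoolMb are illustrative names. The heap figure is what Runtime.maxMemory reports, which for a 1 GB heap is somewhat less than 1024 MB; back-solving from the log (366.3 / 0.6 + 300) gives about 910.5 MB.

```scala
// Sketch of Spark 2.x unified memory sizing (execution + storage share one pool):
// pool = (JVM max heap - 300 MB reserved) * spark.memory.fraction (default 0.6).
// UnifiedPool is an illustrative name, not Spark API.
object UnifiedPool {
  val ReservedSystemBytes: Long = 300L * 1024 * 1024 // memory held back from the pool
  val BytesPerMb: Double = 1024.0 * 1024.0

  /** Unified pool size in MB for a JVM whose Runtime.maxMemory is `systemBytes`. */
  def unifiedPoolMb(systemBytes: Long, memoryFraction: Double = 0.6): Double =
    ((systemBytes - ReservedSystemBytes) * memoryFraction).toLong / BytesPerMb
}
```

    unifiedPoolMb(Runtime.getRuntime.maxMemory) estimates the pool for the current JVM; feeding in the back-solved 910.5 MB heap reproduces the 366.3 MB seen in the log.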
