Spark on YARN Startup Flow: A Source Code Analysis


Author: 分裂四人组 | Published 2017-05-06 13:39

    Startup flow in YARN mode

    1. YarnSchedulerBackend entry point

    In YARN mode, startup begins when SparkContext initializes its scheduler: the YARN-specific SchedulerBackend and TaskScheduler (YarnSchedulerBackend and YarnScheduler) are instantiated through the class loader.

        // Scheduler initialization: SparkContext delegates to createTaskScheduler()
        // Create and start the scheduler
        val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts
        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
        
        // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
        // constructor
        _taskScheduler.start()
        
        /**
       * Create a task scheduler based on a given master URL.
       * Return a 2-tuple of the scheduler backend and the task scheduler.
       */
       // This method pattern-matches on the master string: local and standalone masters map directly
       // to the corresponding SchedulerBackend and TaskScheduler, while "yarn" falls through to the default case
      private def createTaskScheduler(
          sc: SparkContext,
          master: String,
          deployMode: String): (SchedulerBackend, TaskScheduler) = {
        import SparkMasterRegex._
    
        // When running locally, don't try to re-execute tasks on failure.
        val MAX_LOCAL_TASK_FAILURES = 1
    
        master match {
          case "local" =>
            val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
            val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
            scheduler.initialize(backend)
            (backend, scheduler)
    
          case LOCAL_N_REGEX(threads) =>
           ...
          case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
            ...
          case SPARK_REGEX(sparkUrl) =>
            ...
          case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
           ...
          case masterUrl =>
          // getClusterManager() resolves YarnClusterManager purely through the class loader; see below
            val cm = getClusterManager(masterUrl) match {
              case Some(clusterMgr) => clusterMgr
              case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
            }
            try {
              val scheduler = cm.createTaskScheduler(sc, masterUrl)
              val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
              cm.initialize(scheduler, backend)
              (backend, scheduler)
            } catch {
              case se: SparkException => throw se
              case NonFatal(e) =>
                throw new SparkException("External scheduler cannot be instantiated", e)
            }
        }
      }
      
      // getClusterManager() uses the Java ServiceLoader to load every registered ExternalClusterManager
      // implementation, and keeps the one that can handle the given master URL
       private def getClusterManager(url: String): Option[ExternalClusterManager] = {
        val loader = Utils.getContextOrSparkClassLoader
        val serviceLoaders =
          ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
        if (serviceLoaders.size > 1) {
          throw new SparkException(
            s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
        }
        serviceLoaders.headOption
      }
      
      
      // The SchedulerBackend and TaskScheduler that createTaskScheduler() finally returns are built by this class:
      private[spark] class YarnClusterManager extends ExternalClusterManager {
      }
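    YarnClusterManager is not referenced by name anywhere in Spark core: it is discovered through Java's ServiceLoader mechanism, via a META-INF/services/org.apache.spark.scheduler.ExternalClusterManager entry in the YARN module, which is what lets getClusterManager() find it purely through the class loader. Abridged from the Spark source (details vary slightly between versions), the class looks like this:

        private[spark] class YarnClusterManager extends ExternalClusterManager {

          override def canCreate(masterURL: String): Boolean = masterURL == "yarn"

          override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
            sc.deployMode match {
              case "cluster" => new YarnClusterScheduler(sc)
              case "client" => new YarnScheduler(sc)
              case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
            }
          }

          override def createSchedulerBackend(
              sc: SparkContext,
              masterURL: String,
              scheduler: TaskScheduler): SchedulerBackend = {
            sc.deployMode match {
              case "cluster" =>
                new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
              case "client" =>
                new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
              case _ =>
                throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
            }
          }

          // Wire the two halves together, exactly as the local/standalone branches do by hand
          override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
            scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
          }
        }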
    

    Creating the ApplicationMaster

    During SparkContext initialization, the client submits an application, and with it the ApplicationMaster, to the YARN cluster. The flow in Client.submitApplication():

     /**
       * Submit an application running our ApplicationMaster to the ResourceManager.
       *
       * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
       * creating applications and setting up the application submission context. This was not
       * available in the alpha API.
       */
      def submitApplication(user: Option[String] = None): ApplicationId = {
        var appId: ApplicationId = null
        try {
          launcherBackend.connect()
          // Set up the credentials before doing anything else,
          // so we don't have issues at any point.
          setupCredentials(user)
          yarnClient.init(yarnConf)
          yarnClient.start()
          sparkUser = user
    
          logInfo(s"[DEVELOP] [sparkUser:${sparkUser}] Requesting a new application " +
            s"from cluster with %d NodeManagers"
            .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))
    
          // Get a new application from our RM
          val newApp = yarnClient.createApplication()
          val newAppResponse = newApp.getNewApplicationResponse()
          appId = newAppResponse.getApplicationId()
          reportLauncherState(SparkAppHandle.State.SUBMITTED)
          launcherBackend.setAppId(appId.toString)
    
          new CallerContext("CLIENT", Option(appId.toString)).setCurrentContext()
    
          // Verify whether the cluster has enough resources for our AM
          verifyClusterResources(newAppResponse)
    
          // Set up the appropriate contexts to launch our AM
          
          // The two key calls:
          // 1. createContainerLaunchContext(): builds the AM container's launch context -- the launch
          //    command, jars, JVM options and environment variables;
          // 2. createApplicationSubmissionContext(): builds the submission context handed to the YARN
          //    API, mostly read from configuration.
          
          val containerContext = createContainerLaunchContext(newAppResponse)
          val appContext = createApplicationSubmissionContext(newApp, containerContext)
    
          // Finally, submit and monitor the application
          logInfo(s"Submitting application $appId to ResourceManager")
          yarnClient.submitApplication(appContext)
          appId
        } catch {
          case e: Throwable =>
            if (appId != null) {
              cleanupStagingDir(appId)
            }
            throw e
        }
      }
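    The two context-builders called above do the heavy lifting. Below is a heavily abridged sketch of createContainerLaunchContext(); the real method runs to a few hundred lines, and helper names such as setupLaunchEnv, prepareLocalResources, javaOpts and amArgs are taken from the surrounding source, so treat this as an outline rather than exact code:

        private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
            : ContainerLaunchContext = {
          // Upload jars/files to the HDFS staging dir and build the environment map
          val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
          val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)

          val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
          amContainer.setLocalResources(localResources.asJava)
          amContainer.setEnvironment(launchEnv.asJava)

          // JVM options plus the AM class chosen below (ApplicationMaster or ExecutorLauncher),
          // redirecting stdout/stderr into the YARN container log dir
          val commands = prefixEnv ++
            Seq(YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java", "-server") ++
            javaOpts ++ amArgs ++
            Seq("1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
              "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
          amContainer.setCommands(commands.asJava)
          amContainer
        }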
    

    Which class is launched as the ApplicationMaster depends on the deploy mode, chosen inside createContainerLaunchContext():

        val amClass =
          if (isClusterMode) {
            Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
          } else {
            Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
          }
    

    Starting the ApplicationMaster

    We are launching in YARN-client mode, so we jump straight to org.apache.spark.deploy.yarn.ExecutorLauncher, which is itself just a wrapper defined alongside ApplicationMaster (see the snippet below). Following main() down, it calls ApplicationMaster.run(), which dispatches to runExecutorLauncher(securityMgr):
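    ExecutorLauncher itself is deliberately trivial; per the comment in the Spark source, it exists only so that the client-mode AM is easy to tell apart from the cluster-mode AM in tools like ps or jps:

        object ExecutorLauncher {
          def main(args: Array[String]): Unit = {
            ApplicationMaster.main(args)
          }
        }

    runExecutorLauncher() itself: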

      private def runExecutorLauncher(securityMgr: SecurityManager): Unit = {
        val port = sparkConf.getInt("spark.yarn.am.port", 0)
    
    // Create an RpcEnv for talking to the driver
        rpcEnv = RpcEnv.create("sparkYarnAM", Utils.localHostName, port, sparkConf, securityMgr,
          clientMode = true)
        val driverRef = waitForSparkDriver()
    // Install the YARN AmIpFilter on the Spark UI (see the sketch after this method)
    addAmIpFilter()
        
    // Key call: register the AM with the ResourceManager and start allocating
        registerAM(sparkConf, rpcEnv, driverRef, sparkConf.get("spark.driver.appUIAddress", ""),
          securityMgr)
    
        // In client mode the actor will stop the reporter thread.
        reporterThread.join()
      }
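    As for addAmIpFilter(): YARN serves AM web UIs through the RM proxy, so the Spark UI must install YARN's AmIpFilter. A sketch of what the method does, abridged from the 2.x source (in client mode the filter parameters have to travel to the remote driver over RPC):

        private def addAmIpFilter() = {
          val proxyBase = System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
          val amFilter = "org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter"
          val params = client.getAmIpFilterParams(yarnConf, proxyBase)
          if (isClusterMode) {
            // The driver runs in this JVM: install the filter via system properties
            System.setProperty("spark.ui.filters", amFilter)
            params.foreach { case (k, v) => System.setProperty(s"spark.$amFilter.param.$k", v) }
          } else {
            // Client mode: the driver is remote, so send it an AddWebUIFilter message
            // (this is the "Add WebUI Filter" line visible in the AM log further below)
            amEndpoint.send(AddWebUIFilter(amFilter, params.toMap, proxyBase))
          }
        }

    registerAM(), the key call above: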
      
      
      
       private def registerAM(
          _sparkConf: SparkConf,
          _rpcEnv: RpcEnv,
          driverRef: RpcEndpointRef,
          uiAddress: String,
          securityMgr: SecurityManager) = {
        val appId = client.getAttemptId().getApplicationId().toString()
        val attemptId = client.getAttemptId().getAttemptId().toString()
        val historyAddress =
          _sparkConf.get(HISTORY_SERVER_ADDRESS)
            .map { text => SparkHadoopUtil.get.substituteHadoopVariables(text, yarnConf) }
            .map { address => s"${address}${HistoryServer.UI_PATH_PREFIX}/${appId}/${attemptId}" }
            .getOrElse("")
    
        val driverUrl = RpcEndpointAddress(
          _sparkConf.get("spark.driver.host"),
          _sparkConf.get("spark.driver.port").toInt,
          CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    
        // Before we initialize the allocator, let's log the information about how executors will
        // be run up front, to avoid printing this out for every single executor being launched.
        // Use placeholders for information that changes such as executor IDs.
        logInfo {
          val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
          val executorCores = sparkConf.get(EXECUTOR_CORES)
    
      // Not an actual request: a dummy runner used only to log the executor launch context once
          val dummyRunner = new ExecutorRunnable(None, yarnConf, sparkConf, driverUrl, "<executorId>",
            "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
          dummyRunner.launchContextDebugInfo()
        }
    
    // Register the AM with the RM, handing over the driver's RPC address
        allocator = client.register(driverUrl,
          driverRef,
          yarnConf,
          _sparkConf,
          uiAddress,
          historyAddress,
          securityMgr,
          localResources)
    
    // Request executor containers
        allocator.allocateResources()
        reporterThread = launchReporterThread()
      }
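    launchReporterThread(), the last line above, starts the daemon thread that keeps this allocation loop alive. A minimal sketch, assuming a heartbeatInterval derived from spark.yarn.scheduler.heartbeat.interval-ms; the real thread also counts executor failures and decides the AM exit status:

        private def launchReporterThread(): Thread = {
          val t = new Thread {
            override def run(): Unit = {
              while (!finished) {
                // Each poll doubles as the AM -> RM heartbeat and picks up
                // newly granted / completed containers
                allocator.allocateResources()
                Thread.sleep(heartbeatInterval)
              }
            }
          }
          t.setDaemon(true)
          t.setName("Reporter")
          t.start()
          t
        }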
    

    allocateResources() goes through the YARN RM API to file the resource requests and to pick up containers granted for executors:

     /**
       * Request resources such that, if YARN gives us all we ask for, we'll have a number of containers
       * equal to maxExecutors.
       *
       * Deal with any containers YARN has granted to us by possibly launching executors in them.
       *
       * This must be synchronized because variables read in this method are mutated by other methods.
       */
      def allocateResources(): Unit = synchronized {
        updateResourceRequests()
    
        val progressIndicator = 0.1f
        // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
        // requests.
    // Call into YARN's AMRMClient; the response carries any newly granted containers
        val allocateResponse = amClient.allocate(progressIndicator)
        
     // Containers granted in this round
        val allocatedContainers = allocateResponse.getAllocatedContainers()
    
        if (allocatedContainers.size > 0) {
          logInfo("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
            .format(
              allocatedContainers.size,
              numExecutorsRunning,
              allocateResponse.getAvailableResources))
            
          // Once containers have been granted, this handler launches executors in them
          handleAllocatedContainers(allocatedContainers.asScala)
        }
    
        val completedContainers = allocateResponse.getCompletedContainersStatuses()
        if (completedContainers.size > 0) {
          logInfo("Completed %d containers".format(completedContainers.size))
          processCompletedContainers(completedContainers.asScala)
          logInfo("Finished processing %d completed containers. Current running executor count: %d."
            .format(completedContainers.size, numExecutorsRunning))
        }
      }
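    updateResourceRequests(), called at the top of allocateResources(), is what turns the executor target into outstanding YARN ContainerRequests. A simplified sketch (locality preferences, blacklisting and logging omitted):

        def updateResourceRequests(): Unit = {
          val pendingAllocate = getPendingAllocate
          val missing = targetNumExecutors - pendingAllocate.size - numExecutorsRunning

          if (missing > 0) {
            // Still short of the target: file one ContainerRequest per missing executor
            (1 to missing).foreach { _ =>
              amClient.addContainerRequest(
                new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY))
            }
          } else if (missing < 0) {
            // Target shrank (e.g. under dynamic allocation): cancel surplus pending requests
            pendingAllocate.take(-missing).foreach(amClient.removeContainerRequest)
          }
        }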
    

    Continuing on: once containers have been granted by the RM, handleAllocatedContainers() decides which of them to use and launches executors in them:

    /**
       * Handle containers granted by the RM by launching executors on them.
       *
       * Due to the way the YARN allocation protocol works, certain healthy race conditions can result
       * in YARN granting containers that we no longer need. In this case, we release them.
       *
       * Visible for testing.
       */
      def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
        val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
    
        // Match incoming requests by host
        val remainingAfterHostMatches = new ArrayBuffer[Container]
        for (allocatedContainer <- allocatedContainers) {
          matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
            containersToUse, remainingAfterHostMatches)
        }
    
        // Match remaining by rack
        val remainingAfterRackMatches = new ArrayBuffer[Container]
        for (allocatedContainer <- remainingAfterHostMatches) {
          val rack = RackResolver.resolve(conf, allocatedContainer.getNodeId.getHost).getNetworkLocation
          matchContainerToRequest(allocatedContainer, rack, containersToUse,
            remainingAfterRackMatches)
        }
    
        // Assign remaining that are neither node-local nor rack-local
        val remainingAfterOffRackMatches = new ArrayBuffer[Container]
        for (allocatedContainer <- remainingAfterRackMatches) {
          matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
            remainingAfterOffRackMatches)
        }
    
        if (!remainingAfterOffRackMatches.isEmpty) {
          logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
            s"allocated to us")
          for (container <- remainingAfterOffRackMatches) {
            internalReleaseContainer(container)
          }
        }
         
     // The rounds above have filtered out containers we no longer need; now launch executors in the usable ones
        runAllocatedContainers(containersToUse)
    
        logInfo("Received %d containers from YARN, launching executors on %d of them."
          .format(allocatedContainers.size, containersToUse.size))
      }
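    matchContainerToRequest(), used for all three rounds above, asks the AMRMClient whether we still have an outstanding request at the given location; if so, the container is kept and the request cancelled, otherwise the container falls through to the next round. Abridged from the source:

        private def matchContainerToRequest(
            allocatedContainer: Container,
            location: String,
            containersToUse: ArrayBuffer[Container],
            remaining: ArrayBuffer[Container]): Unit = {
          // Match on the memory YARN actually granted (it may round up), but on our requested cores
          val matchingResource = Resource.newInstance(
            allocatedContainer.getResource.getMemory, resource.getVirtualCores)
          val matchingRequests = amClient.getMatchingRequests(
            allocatedContainer.getPriority, location, matchingResource)

          if (!matchingRequests.isEmpty) {
            val containerRequest = matchingRequests.get(0).iterator.next
            amClient.removeContainerRequest(containerRequest)
            containersToUse += allocatedContainer
          } else {
            remaining += allocatedContainer
          }
        }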
      
      
      /**
       * Launches executors in the allocated containers.
       */
      private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
        for (container <- containersToUse) {
          executorIdCounter += 1
          val executorHostname = container.getNodeId.getHost
          val containerId = container.getId
          val executorId = executorIdCounter.toString
          
          assert(container.getResource.getMemory >= resource.getMemory)
          logInfo(s"Launching container $containerId on host $executorHostname")
    
          def updateInternalState(): Unit = synchronized {
            numExecutorsRunning += 1
            executorIdToContainer(executorId) = container
            containerIdToExecutorId(container.getId) = executorId
    
            val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
              new HashSet[ContainerId])
            containerSet += containerId
            allocatedContainerToHostMap.put(containerId, executorHostname)
          }
    
          if (numExecutorsRunning < targetNumExecutors) {
            if (launchContainers) {
                // Submit the executor-launch work to the launcher thread pool
              launcherPool.execute(new Runnable {
              
               // ExecutorRunnable is what actually brings the executor process up
                override def run(): Unit = {
                  try {
                    new ExecutorRunnable(
                      Some(container),
                      conf,
                      sparkConf,
                      driverUrl,
                      executorId,
                      executorHostname,
                      executorMemory,
                      executorCores,
                      appAttemptId.getApplicationId.toString,
                      securityMgr,
                      localResources
                    ).run()
                    updateInternalState()
                  } catch {
                    case NonFatal(e) =>
                      logError(s"Failed to launch executor $executorId on container $containerId", e)
                      // Assigned container should be released immediately to avoid unnecessary resource
                      // occupation.
                      amClient.releaseAssignedContainer(containerId)
                  }
                }
              })
            } else {
              // For test only
              updateInternalState()
            }
          } else {
            logInfo(("Skip launching executorRunnable as runnning Excecutors count: %d " +
              "reached target Executors count: %d.").format(numExecutorsRunning, targetNumExecutors))
          }
        }
      }
    

    Launching the executors

    The executor's launch command is issued from ExecutorRunnable.run(); the command line itself is built in prepareCommand():
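    run() itself is short: it opens an NMClient and asks the NodeManager to start the container with a launch context whose commands come from prepareCommand(). A sketch, abridged (security-token setup omitted; container and localResources are fields of ExecutorRunnable):

        def run(): Unit = {
          logDebug("Starting Executor Container")
          nmClient = NMClient.createNMClient()
          nmClient.init(conf)
          nmClient.start()
          startContainer()
        }

        def startContainer(): java.util.Map[String, ByteBuffer] = {
          val ctx = Records.newRecord(classOf[ContainerLaunchContext])
          ctx.setLocalResources(localResources.asJava)
          ctx.setEnvironment(prepareEnvironment().asJava)
          // prepareCommand() below builds the JVM command line for the executor
          ctx.setCommands(prepareCommand().asJava)

          // Ask the NodeManager hosting this container to start it
          nmClient.startContainer(container.get, ctx)
        }

    prepareCommand() itself: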

    private def prepareCommand(): List[String] = {
        // Extra options for the JVM
        val javaOpts = ListBuffer[String]()
    
    // Java/Spark runtime options (elided)
        ....
        
        YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
        
    // The executor's actual launch command; the main class is org.apache.spark.executor.CoarseGrainedExecutorBackend
        
        val commands = prefixEnv ++ Seq(
          YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
          "-server") ++
          javaOpts ++
          Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
            "--driver-url", masterAddress,
            "--executor-id", executorId,
            "--hostname", hostname,
            "--cores", executorCores.toString,
            "--app-id", appId) ++
          userClassPath ++
          Seq(
            s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
            s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
    
        // TODO: it would be nicer to just make sure there are no null commands here
        commands.map(s => if (s == null) "null" else s).toList
      }
    

    The logic of org.apache.spark.executor.CoarseGrainedExecutorBackend is straightforward: run() sets up an RpcEndpoint that waits for LaunchTask(data) messages. On receiving one, it calls executor.launchTask(), which wraps the task in a TaskRunner, adds it to runningTasks, and hands it to the executor's thread pool.
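    Abridged from the 2.1-era source (TaskDescription handling changed in later versions):

        // CoarseGrainedExecutorBackend
        override def receive: PartialFunction[Any, Unit] = {
          case LaunchTask(data) =>
            if (executor == null) {
              exitExecutor(1, "Received LaunchTask command but executor was null")
            } else {
              val taskDesc = ser.deserialize[TaskDescription](data.value)
              logInfo("Got assigned task " + taskDesc.taskId)
              executor.launchTask(this, taskId = taskDesc.taskId,
                attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)
            }
        }

        // Executor
        def launchTask(
            context: ExecutorBackend,
            taskId: Long,
            attemptNumber: Int,
            taskName: String,
            serializedTask: ByteBuffer): Unit = {
          val tr = new TaskRunner(context, taskId, attemptNumber, taskName, serializedTask)
          runningTasks.put(taskId, tr)  // track it, then hand it to the thread pool
          threadPool.execute(tr)
        }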

    Run results

    YARN cluster logs are scattered across many machines, which makes tracing the startup flow through them tedious; on a small cluster, though, it is still a decent way to verify the whole flow end to end.

    From the ApplicationMaster's log you can see that org.apache.spark.executor.CoarseGrainedExecutorBackend is ultimately invoked to start the executors:

    17/05/05 16:54:58 INFO ApplicationMaster: Preparing Local resources
    17/05/05 16:54:59 WARN DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
    17/05/05 16:54:59 WARN Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby
    17/05/05 16:54:59 INFO ApplicationMaster: ApplicationAttemptId: appattempt_1493803865684_0180_000002
    17/05/05 16:54:59 INFO SecurityManager: Changing view acls to: hzlishuming
    17/05/05 16:54:59 INFO SecurityManager: Changing modify acls to: hzlishuming
    17/05/05 16:54:59 INFO SecurityManager: Changing view acls groups to: 
    17/05/05 16:54:59 INFO SecurityManager: Changing modify acls groups to: 
    17/05/05 16:54:59 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(hzlishuming); groups with view permissions: Set(); users  with modify permissions: Set(hzlishuming); groups with modify permissions: Set()
    17/05/05 16:54:59 INFO AMCredentialRenewer: Scheduling login from keytab in 61745357 millis.
    17/05/05 16:54:59 INFO ApplicationMaster: Waiting for Spark driver to be reachable.
    17/05/05 16:54:59 INFO ApplicationMaster: Driver now available: xxxx:47065
    17/05/05 16:54:59 INFO TransportClientFactory: Successfully created connection to /xxxx:47065 after 110 ms (0 ms spent in bootstraps)
    17/05/05 16:54:59 INFO ApplicationMaster$AMEndpoint: Add WebUI Filter. AddWebUIFilter(org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,Map(PROXY_HOSTS -> ....)
    17/05/05 16:55:00 INFO ApplicationMaster: 
    ===============================================================================
    YARN executor launch context:
      env:
        CLASSPATH -> {{PWD}}<CPS>{{PWD}}/__spark_conf__<CPS>{{PWD}}/__spark_libs__/*<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/*<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/lib/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*
        SPARK_YARN_STAGING_DIR -> hdfs://hz-test01/user/hzlishuming/.sparkStaging/application_1493803865684_0180
        SPARK_USER -> hzlishuming
        SPARK_YARN_MODE -> true
    
      command:
        {{JAVA_HOME}}/bin/java \ 
          -server \ 
          -Xmx4096m \ 
          '-XX:PermSize=1024m' \ 
          '-XX:MaxPermSize=1024m' \ 
          '-verbose:gc' \ 
          '-XX:+PrintGCDetails' \ 
          '-XX:+PrintGCDateStamps' \ 
          '-XX:+PrintTenuringDistribution' \ 
          -Djava.io.tmpdir={{PWD}}/tmp \ 
          '-Dspark.driver.port=47065' \ 
          -Dspark.yarn.app.container.log.dir=<LOG_DIR> \ 
          -XX:OnOutOfMemoryError='kill %p' \ 
          org.apache.spark.executor.CoarseGrainedExecutorBackend \ 
          --driver-url \ 
          spark://CoarseGrainedScheduler@....:47065 \ 
          --executor-id \ 
          <executorId> \ 
          --hostname \ 
          <hostname> \ 
          --cores 
    

    On the driver side, the executor registrations leave log lines like these:

     17/05/05 16:04:59 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) () with ID 1
     17/05/05 16:04:59 INFO YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) () with ID 2
     17/05/05 16:04:59 INFO BlockManagerMasterEndpoint: Registering block manager xxxx with 2004.6 MB RAM, BlockManagerId(1, h, 54063, None)
     17/05/05 16:04:59 INFO BlockManagerMasterEndpoint: Registering block manager xxxx with 2004.6 MB RAM, BlockManagerId(2, xxx, 42904, None)
    

    Each executor's own startup log can be viewed through the Spark UI; as described above, what runs inside the container is org.apache.spark.executor.CoarseGrainedExecutorBackend:

    17/05/05 16:55:15 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
    17/05/05 16:55:16 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@xxx.35:47065
    17/05/05 16:55:16 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
    17/05/05 16:55:16 INFO Executor: Starting executor ID 4 on host hadoop694.lt.163.org
    17/05/05 16:55:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40418.
    17/05/05 16:55:16 INFO NettyBlockTransferService: Server created on xxx:40418
    
