Spark submit flow - source code analysis


Author: 专职掏大粪 | Published 2021-09-18 10:48

    org.apache.spark.deploy.SparkSubmit
    - main
      - submit.doSubmit(args)
        - SparkSubmitArguments.parseArguments(args)
        - SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
          - submit(args: SparkSubmitArguments, uninitLog: Boolean)
            - runMain(args: SparkSubmitArguments, uninitLog: Boolean)
              - new JavaMainApplication(mainClass)
                - app.start(childArgs.toArray, sparkConf)
    
    // Following constants are visible for testing.
    private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
      "org.apache.spark.deploy.yarn.YarnClusterApplication"
    
      // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
      if (isYarnCluster) {
        childMainClass = YARN_CLUSTER_SUBMIT_CLASS
    
       // If mainClass is a SparkApplication, construct it via reflection
       val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
          mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
        } else {
          new JavaMainApplication(mainClass)
        }
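
    To make this dispatch concrete, here is a minimal, self-contained sketch of the same "is it a SparkApplication?" reflection pattern; the trait and class names below are illustrative stand-ins, not the real Spark types:

    // Simplified model of the SparkApplication dispatch in runMain.
    trait FakeSparkApplication {
      def start(args: Array[String]): Unit
    }

    class MyYarnApp extends FakeSparkApplication {
      override def start(args: Array[String]): Unit =
        println(s"started with ${args.mkString(", ")}")
    }

    object ReflectDispatchDemo {
      def main(sysArgs: Array[String]): Unit = {
        val mainClass: Class[_] = Class.forName("MyYarnApp")
        // Same check as SparkSubmit: does mainClass implement the application trait?
        val app: FakeSparkApplication =
          if (classOf[FakeSparkApplication].isAssignableFrom(mainClass)) {
            mainClass.getConstructor().newInstance().asInstanceOf[FakeSparkApplication]
          } else {
            // In Spark this branch wraps a plain main() class in JavaMainApplication instead.
            sys.error(s"$mainClass is not a SparkApplication-style class")
          }
        app.start(Array("--verbose"))
      }
    }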
    

    -YarnClusterApplication.start
    // ClientArguments carries the arguments passed by spark-submit
    - new Client(new ClientArguments(args), conf, null).run()
    -Client.submitApplication()
    private val yarnClient = YarnClient.createYarnClient
    // Used to communicate with the ResourceManager (RM)
    protected ApplicationClientProtocol rmClient;

    // Ask the RM for a new application and obtain the appid
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    // Create the container launch context
    val containerContext = createContainerLaunchContext(newAppResponse)
    containerContext contains the launch parameters for the container, such as the class name of the ApplicationMaster to run

     val amClass =
          if (isClusterMode) {
            Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
          } else {
            Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
          }
    

    // Create the application submission context
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    -yarnClient.submitApplication(appContext)
    -rmClient.submitApplication(request);
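
    Taken together, Client.submitApplication is essentially a short sequence of Hadoop YarnClient calls. A hedged outline of that sequence (the YarnClient and ApplicationSubmissionContext APIs are real; resource setup, tokens and error handling are omitted):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.yarn.client.api.YarnClient
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    // Simplified outline of Client.submitApplication (no AM resources, no ACLs, no tokens).
    object SubmitSketch {
      def main(args: Array[String]): Unit = {
        val yarnConf = new YarnConfiguration(new Configuration())
        val yarnClient = YarnClient.createYarnClient()
        yarnClient.init(yarnConf)
        yarnClient.start()

        // 1. Ask the RM for a new application; the response carries the ApplicationId.
        val newApp = yarnClient.createApplication()
        val newAppResponse = newApp.getNewApplicationResponse()
        val appId = newAppResponse.getApplicationId()

        // 2. Fill in the submission context (in Spark this is where the
        //    ContainerLaunchContext that starts the ApplicationMaster is attached).
        val appContext = newApp.getApplicationSubmissionContext
        appContext.setApplicationName("submit-sketch")

        // 3. Hand everything to the ResourceManager.
        yarnClient.submitApplication(appContext)
        println(s"Submitted application $appId")
      }
    }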

    Search the source for "org.apache.spark.deploy.yarn.ApplicationMaster"
    -org.apache.spark.deploy.yarn.ApplicationMaster.main
    -master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
    Members of ApplicationMaster:
    private val client = new YarnRMClient()
    // Used for communication between the AM and the RM
    -amClient = AMRMClient.createAMRMClient()
    -master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
    -master.run()
    In cluster mode, run the Driver:

     if (isClusterMode) {
            runDriver()
          } else {
            runExecutorLauncher()
          }
    
     private def runDriver(): Unit = {
       addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
       userClassThread = startUserApplication()
      ... ...
       try {
        // Wait until the driver thread started by startUserApplication has created the user code's SparkContext; block here until then
         val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
           Duration(totalWaitTime, TimeUnit.MILLISECONDS))
         // sc is not null
         if (sc != null) {
           val rpcEnv = sc.env.rpcEnv
    
           val userConf = sc.getConf
           val host = userConf.get(DRIVER_HOST_ADDRESS)
           val port = userConf.get(DRIVER_PORT)
          // Register the AM with the RM and request resources
           registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
    
           val driverRef = rpcEnv.setupEndpointRef(
             RpcAddress(host, port),
             YarnSchedulerBackend.ENDPOINT_NAME)
         // Create the allocator, which obtains the list of allocatable resources
           createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
         } else {
           // Sanity check; should never happen in normal operation, since sc should only be null
           // if the user app did not create a SparkContext.
           throw new IllegalStateException("User did not initialize spark context!")
         }
         // Once the resources are ready, resumeDriver changes state so the driver thread continues executing the user code
         resumeDriver()
         // runDriver continues past this point only after userClassThread has finished
         userClassThread.join()
       } catch {
        ... ...
       } finally {
         resumeDriver()
       }
     }
    
    

    userClassThread = startUserApplication()

     private def startUserApplication(): Thread = {
        logInfo("Starting the user application in a separate Thread")
    
     ... ...
    
        val mainMethod = userClassLoader.loadClass(args.userClass)
          .getMethod("main", classOf[Array[String]])
    
        val userThread = new Thread {
          override def run(): Unit = {
            try {
              if (!Modifier.isStatic(mainMethod.getModifiers)) {
                logError(s"Could not find static main method in object ${args.userClass}")
                finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
              } else {
            // The static main method of the user's job object; invoke the user code
                mainMethod.invoke(null, userArgs.toArray)
                finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
                logDebug("Done running user class")
              }
            } catch {
              ......
            } ... ...
          }
        }
        userThread.setContextClassLoader(userClassLoader)
       // Name the driver thread
        userThread.setName("Driver")
       // Start the driver thread
        userThread.start()
        userThread
      }
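
    In short, startUserApplication loads the user class and invokes its static main method on a dedicated thread named "Driver". A small stand-alone sketch of that pattern (DemoUserJob is a placeholder, not Spark code):

    import java.lang.reflect.Modifier

    // The "user job": a Scala object whose main becomes a static method on the class.
    object DemoUserJob {
      def main(args: Array[String]): Unit = println(s"user main ran with ${args.toList}")
    }

    object DriverThreadSketch {
      def main(args: Array[String]): Unit = {
        val mainMethod = Class.forName("DemoUserJob")
          .getMethod("main", classOf[Array[String]])

        val userThread = new Thread {
          override def run(): Unit = {
            if (!Modifier.isStatic(mainMethod.getModifiers)) {
              sys.error("Could not find static main method on DemoUserJob")
            } else {
              // Invoke the user code, exactly like ApplicationMaster does.
              mainMethod.invoke(null, Array("--demo"))
            }
          }
        }
        userThread.setName("Driver")   // same thread name the ApplicationMaster uses
        userThread.start()
        userThread.join()
      }
    }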
    
    

    Creating the SparkContext:
    val sc = new SparkContext(sparkConf)
    During SparkContext initialization, there is a piece of code like this:

    // Post init
       _taskScheduler.postStartHook()
    
    private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
    ...
      override def postStartHook(): Unit = {
    // sparkContextInitialized does two things:
    // 1. after the SparkContext is initialized, it wakes up runDriver so it can continue;
    // 2. it pauses the driver (user) thread so that initialization can happen in runDriver
        ApplicationMaster.sparkContextInitialized(sc)
    
        super.postStartHook()
        logInfo("YarnClusterScheduler.postStartHook done")
      }
    
    }
    
    private def sparkContextInitialized(sc: SparkContext) = {
       sparkContextPromise.synchronized {
         // Notify the runDriver function that the SparkContext is available
         sparkContextPromise.success(sc)
         // Pause the user class thread in order to make proper initialization in the runDriver function.
         sparkContextPromise.wait()
       }
     }
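
    The handshake above is just a Promise plus wait/notify on the promise object. A simplified, self-contained model of how runDriver and the "Driver" (user code) thread take turns; the names here are mine, not Spark's:

    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._

    // Simplified model of the sparkContextPromise handshake between the AM thread
    // (runDriver) and the "Driver" (user code) thread.
    object HandshakeSketch {
      private val ctxPromise = Promise[String]()   // stands in for Promise[SparkContext]

      // Called from the user thread once the "SparkContext" exists
      // (like ApplicationMaster.sparkContextInitialized).
      def contextInitialized(ctx: String): Unit = ctxPromise.synchronized {
        ctxPromise.success(ctx)   // wake up the AM thread waiting on the future
        ctxPromise.wait()         // park the user thread until resumeDriver
      }

      // Called from the AM thread once registration/allocation is done
      // (like ApplicationMaster.resumeDriver).
      def resumeDriver(): Unit = ctxPromise.synchronized { ctxPromise.notify() }

      def main(args: Array[String]): Unit = {
        val userThread = new Thread(new Runnable {
          override def run(): Unit = {
            println("driver thread: creating the context")
            contextInitialized("sc")
            println("driver thread: resumed, running the rest of the user code")
          }
        }, "Driver")
        userThread.start()

        // runDriver side: block until the context shows up, then do the AM work.
        val sc = Await.result(ctxPromise.future, 10.seconds)
        println(s"AM thread: got $sc, registering AM and creating the allocator")
        resumeDriver()
        userThread.join()
      }
    }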
    

    -TaskSchedulerImpl.postStartHook

    TaskSchedulerImpl.waitBackendReady
    private def waitBackendReady(): Unit = {
        if (backend.isReady) {
          return
        }
       // Loop and wait until the resources are ready; the user code does not run further in the meantime.
       // When does the driver thread continue?
       // When runDriver calls resumeDriver and the backend state changes, indicating that the resources are ready.
        while (!backend.isReady) {
          // Might take a while for backend to be ready if it is waiting on resources.
          if (sc.stopped.get) {
            // For example: the master removes the application for some reason
            throw new IllegalStateException("Spark context stopped while waiting for backend")
          }
          synchronized {
            this.wait(100)
          }
        }
      }
    

    registerAM called from runDriver:
    private val client = new YarnRMClient()
    client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)

    amClient = AMRMClient.createAMRMClient()
    Register with the RM through the AMRMClient:
    amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
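
    The registration itself is plain Hadoop AMRMClient API. A hedged sketch of just this step (the AMRMClient calls are real; host, port and tracking URL are placeholders):

    import org.apache.hadoop.yarn.client.api.AMRMClient
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
    import org.apache.hadoop.yarn.conf.YarnConfiguration

    // Minimal AM registration, roughly what happens inside YarnRMClient.register.
    object RegisterAmSketch {
      def main(args: Array[String]): Unit = {
        val conf = new YarnConfiguration()
        val amClient = AMRMClient.createAMRMClient[ContainerRequest]()
        amClient.init(conf)
        amClient.start()

        // Tell the RM where the driver (AM) lives and where its tracking UI is.
        amClient.registerApplicationMaster("driver-host.example.com", 7077,
          "http://driver-host.example.com:4040")

        // ... request and allocate containers, launch executors ...
      }
    }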

      private def createAllocator(
          driverRef: RpcEndpointRef,
          _sparkConf: SparkConf,
          rpcEnv: RpcEnv,
          appAttemptId: ApplicationAttemptId,
          distCacheConf: SparkConf): Unit = {
        ... ...
    
        val appId = appAttemptId.getApplicationId().toString()
        val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
          CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
        val localResources = prepareLocalResources(distCacheConf)
          ... ...
        // Create the allocator
        allocator = client.createAllocator(
          yarnConf,
          _sparkConf,
          appAttemptId,
          driverUrl,
          driverRef,
          securityMgr,
          localResources)
    
       ...  ...
        // Obtain the list of allocatable resources
        allocator.allocateResources()
        val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER,
          sparkConf, securityMgr)
        val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
        ms.registerSource(new ApplicationMasterSource(prefix, allocator))
        // do not register static sources in this case as per SPARK-25277
        ms.start(false)
        metricsSystem = Some(ms)
        reporterThread = launchReporterThread()
      }
    

    // Handle the containers that YARN has allocated
    handleAllocatedContainers(allocatedContainers.asScala.toSeq)

     def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
        val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
    
        // Match incoming requests by host
        val remainingAfterHostMatches = new ArrayBuffer[Container]
       // Sort the allocated containers by host and by rack (honoring preferred locations)
        for (allocatedContainer <- allocatedContainers) {
          matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
            containersToUse, remainingAfterHostMatches)
        }
    ... ...
        val remainingAfterRackMatches = new ArrayBuffer[Container]
        if (remainingAfterHostMatches.nonEmpty) {
          var exception: Option[Throwable] = None
          val thread = new Thread("spark-rack-resolver") {
            override def run(): Unit = {
              try {
                for (allocatedContainer <- remainingAfterHostMatches) {
                  val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
                  matchContainerToRequest(allocatedContainer, rack, containersToUse,
                    remainingAfterRackMatches)
                }
              } catch {
                case e: Throwable =>
             ... ...
        }
        // Launch executors on the allocated containers
        runAllocatedContainers(containersToUse)
    
        logInfo("Received %d containers from YARN, launching executors on %d of them."
          .format(allocatedContainers.size, containersToUse.size))
      }
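
    The host/rack matching above can be reduced to a toy model: containers on a preferred host are used first, then containers on a preferred rack, and whatever is left is still used, just off-preference. A simplified illustration (not Spark code; the host-to-rack map stands in for the RackResolver):

    import scala.collection.mutable.ArrayBuffer

    object ContainerMatchSketch {
      case class Container(id: Int, host: String)

      // Stand-in for the rack resolver.
      val hostToRack = Map("node1" -> "rack1", "node2" -> "rack1", "node3" -> "rack2")

      def main(args: Array[String]): Unit = {
        val allocated = Seq(Container(1, "node1"), Container(2, "node3"), Container(3, "node9"))
        val preferredHosts = Set("node1")
        val preferredRacks = Set("rack2")

        val containersToUse = ArrayBuffer[Container]()
        val remainingAfterHostMatches = ArrayBuffer[Container]()
        for (c <- allocated) {
          if (preferredHosts(c.host)) containersToUse += c else remainingAfterHostMatches += c
        }

        val remainingAfterRackMatches = ArrayBuffer[Container]()
        for (c <- remainingAfterHostMatches) {
          if (hostToRack.get(c.host).exists(preferredRacks)) containersToUse += c
          else remainingAfterRackMatches += c
        }

        // Off-preference containers are still launched on; they are just matched last.
        containersToUse ++= remainingAfterRackMatches
        println(s"launching executors on containers ${containersToUse.map(_.id)}")
      }
    }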
    
    

    runAllocatedContainers launches the containers:

     private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = synchronized {
        for (container <- containersToUse) {
          val rpId = getResourceProfileIdFromPriority(container.getPriority)
          executorIdCounter += 1
          val executorHostname = container.getNodeId.getHost
          val containerId = container.getId
          val executorId = executorIdCounter.toString
          val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
          assert(container.getResource.getMemory >= yarnResourceForRpId.getMemory)
          logInfo(s"Launching container $containerId on host $executorHostname " +
            s"for executor with ID $executorId for ResourceProfile Id $rpId")
          ... ...
          // If the target number of executors for this ResourceProfile exceeds the number running, more containers must be started
          if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) {
            getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet()
            if (launchContainers) {
            // Launch the container on the launcher thread pool
              launcherPool.execute(() => {
                try {
                  new ExecutorRunnable(
                    Some(container),
                    conf,
                    sparkConf,
                    driverUrl,
                    executorId,
                    executorHostname,
                    containerMem,
                    containerCores,
                    appAttemptId.getApplicationId.toString,
                    securityMgr,
                    localResources,
                    rp.id
                  ).run()
                  updateInternalState()
                } catch {
                  ... ...
                }
              })
            } else {
              // For test only
              updateInternalState()
            }
          ... ...
        }
      }
    
    

    ExecutorRunnable.run

    // Create an NMClient for talking to the NodeManager
    nmClient = NMClient.createNMClient()
        nmClient.init(conf)
        nmClient.start()
       // Use nmClient to tell the target NodeManager to start the container
        startContainer()
    
    

    startContainer

    def startContainer(): java.util.Map[String, ByteBuffer] = {
       ... ...
       // prepareCommand builds the command line that launches the container process,
       // i.e. the org.apache.spark.executor.YarnCoarseGrainedExecutorBackend process (the executor's communication backend)
        val commands = prepareCommand()
    
        ctx.setCommands(commands.asJava)
        ctx.setApplicationACLs(
          YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)
    
         ...  ....
    
        // Send the start request to the ContainerManager
        try {
          // Start the container, passing the launch context ctx
          nmClient.startContainer(container.get, ctx)
        } catch {
          ... ...
        }
      }
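
    Stripped of the Spark specifics, ExecutorRunnable builds a ContainerLaunchContext whose command starts the executor backend JVM, and hands it to the NodeManager. A hedged outline using the Hadoop NMClient API; the command line is heavily shortened compared to what prepareCommand() really produces:

    import java.util.Collections

    import org.apache.hadoop.yarn.api.records.{Container, ContainerLaunchContext}
    import org.apache.hadoop.yarn.client.api.NMClient
    import org.apache.hadoop.yarn.util.Records

    object StartContainerSketch {
      def startExecutor(nmClient: NMClient, container: Container, driverUrl: String): Unit = {
        val ctx = Records.newRecord(classOf[ContainerLaunchContext])

        // In Spark, prepareCommand() assembles this java command line
        // (class path, memory flags, executor id, driver URL, ...).
        val command =
          "$JAVA_HOME/bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend " +
            s"--driver-url $driverUrl " +
            "1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"
        ctx.setCommands(Collections.singletonList(command))

        // Ask the NodeManager that owns this container to launch it.
        nmClient.startContainer(container, ctx)
      }
    }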
    

    org.apache.spark.executor.YarnCoarseGrainedExecutorBackend

    -CoarseGrainedExecutorBackend.run

    def run(
         arguments: Arguments,
         backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
           CoarseGrainedExecutorBackend): Unit = {
    
       ... ...
    // Create an RpcEnv that can locate the driver and communicate with it
         val fetcher = RpcEnv.create(
           "driverPropsFetcher",
           arguments.bindAddress,
           arguments.hostname,
           -1,
           executorConf,
           new SecurityManager(executorConf),
           numUsableCores = 0,
           clientMode = true)
    
         var driver: RpcEndpointRef = null
         val nTries = 3
         for (i <- 0 until nTries if driver == null) {
           try {
             driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
           } catch {
             case e: Throwable => if (i == nTries - 1) {
               throw e
             }
           }
         }
         // Create the executor's SparkEnv
         val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
           arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
        // Register the communication endpoint on the rpcEnv;
        // backendCreateFn here constructs the YarnCoarseGrainedExecutorBackend
         env.rpcEnv.setupEndpoint("Executor",
           backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
         arguments.workerUrl.foreach { url =>
           env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
         }
         env.rpcEnv.awaitTermination()
       }
     }
    
    
    // The message loop
    var messageLoop: MessageLoop = null
          try {
            messageLoop = endpoint match {
              case e: IsolatedRpcEndpoint =>
             // Choose the message loop type for this endpoint
                new DedicatedMessageLoop(name, e, this)
              case _ =>
                sharedLoop.register(name, endpoint)
                sharedLoop
            }
            endpoints.put(name, messageLoop)
          } catch {
            case NonFatal(e) =>
              endpointRefs.remove(endpoint)
              throw e
          }
    

    -DedicatedMessageLoop
    // The inbox
    private val inbox = new Inbox(name, endpoint)

    -Inbox
    protected val messages = new java.util.LinkedList[InboxMessage]()
    // OnStart should be the first message to process
    // Enqueue an OnStart message first
    inbox.synchronized {
    messages.add(OnStart)
    }

    // The RpcEndpoint lifecycle:
     * {@code constructor -> onStart -> receive* -> onStop}
    
    private[spark] trait RpcEndpoint {
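
    That lifecycle comment is the key: because the inbox is seeded with OnStart, onStart always runs before any other message, and OnStop ends the loop. A toy, single-threaded model of an endpoint driven by such an inbox (simplified; Spark's DedicatedMessageLoop runs this on its own thread pool):

    import java.util.concurrent.LinkedBlockingQueue

    object EndpointLifecycleSketch {
      sealed trait InboxMessage
      case object OnStart extends InboxMessage
      case object OnStop extends InboxMessage
      case class Rpc(body: Any) extends InboxMessage

      trait Endpoint {
        def onStart(): Unit = {}
        def receive: PartialFunction[Any, Unit]
        def onStop(): Unit = {}
      }

      def main(args: Array[String]): Unit = {
        val inbox = new LinkedBlockingQueue[InboxMessage]()
        inbox.add(OnStart)                       // first message, like Spark's Inbox
        inbox.add(Rpc("RegisteredExecutor"))
        inbox.add(OnStop)

        val endpoint = new Endpoint {
          override def onStart(): Unit = println("onStart: connect to the driver")
          override def receive = { case msg => println(s"receive: $msg") }
          override def onStop(): Unit = println("onStop")
        }

        // Message loop: constructor -> onStart -> receive* -> onStop.
        var running = true
        while (running) {
          inbox.take() match {
            case OnStart => endpoint.onStart()
            case OnStop  => endpoint.onStop(); running = false
            case Rpc(b)  => endpoint.receive(b)
          }
        }
      }
    }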
    

    CoarseGrainedExecutorBackend.onStart

    override def onStart(): Unit = {
      ... ...
    
        logInfo("Connecting to driver: " + driverUrl)
        try {
          _resources = parseOrFindResources(resourcesFileOpt)
        } catch {
          case NonFatal(e) =>
            exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
        }
        rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
          // This is a very fast action so we can use "ThreadUtils.sameThread"
          // Obtain the driver reference
          driver = Some(ref)
         // Send a RegisterExecutor message to the driver
          ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
            extractAttributes, _resources, resourceProfile.id))
        }(ThreadUtils.sameThread).onComplete {
          case Success(_) =>
            self.send(RegisteredExecutor)
          case Failure(e) =>
            exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
        }(ThreadUtils.sameThread)
      }
    

    The driver is a thread, so on the driver side it is the SparkContext (through its scheduler backend) that receives the message.
    _schedulerBackend is the driver's communication backend:
    private var _schedulerBackend: SchedulerBackend = _
    // The SchedulerBackend used in cluster mode
    CoarseGrainedSchedulerBackend
    // Receive-and-reply message handling
    CoarseGrainedSchedulerBackend.receiveAndReply

    Match the RegisterExecutor message:
         case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
              attributes, resources, resourceProfileId) =>
    // Increase the total core count and the count of registered executors
        totalCoreCount.addAndGet(cores)
              totalRegisteredExecutors.addAndGet(1)
       // Finally reply true to indicate that registration succeeded
       context.reply(true)
    

    CoarseGrainedExecutorBackend receives the reply:

    case Success(_) =>
       // Send itself a message indicating that registration is complete
           self.send(RegisteredExecutor)
    

    CoarseGrainedExecutorBackend receives the RegisteredExecutor message it sent to itself:

    override def receive: PartialFunction[Any, Unit] = {
      case RegisteredExecutor =>
        logInfo("Successfully registered with driver")
        try {
        // Create the Executor compute object, as opposed to CoarseGrainedExecutorBackend (the communication object)
          executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
            resources = _resources)
       // Send a LaunchedExecutor message to the driver
          driver.get.send(LaunchedExecutor(executorId))
    

    The driver-side CoarseGrainedSchedulerBackend receives the LaunchedExecutor message:

      case LaunchedExecutor(executorId) =>
          // Mark all of this executor's cores as free
            executorDataMap.get(executorId).foreach { data =>
              data.freeCores = data.totalCores
            }
       // Make resource offers, i.e. try to schedule pending tasks on this executor
            makeOffers(executorId)
          case e =>
            logError(s"Received unexpected message. ${e}")
    
