Reading the Spark Source: The Executor Module, Part 2

Author: invincine | Published 2019-01-25 17:54

    This article is based on the Spark 1.6.3 source and works through it step by step. I wrote it to record my train of thought while reading: a single pass through the source is easy to forget, and writing it up makes it stick.

    In part 1 of this series (spark源码阅读之executor模块①), the AppClient was instantiated and registered a communication endpoint named ClientEndpoint; its onStart method then called registerWithMaster to register the app with the Master. This article looks in detail at how the app is registered, and at how executors and their resources are allocated and launched afterwards.

    Registering the app with the Master

    registerWithMaster(1) is called with the integer 1, marking the first registration attempt. The client keeps retrying periodically until the Master confirms the registration, or gives up after reaching the maximum number of attempts. Here is the implementation:

    /**
     * Register with all masters asynchronously. It will call `registerWithMaster` every
     * REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
     * Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
     *
     * nthRetry means this is the nth attempt to register with master.
     */
    private def registerWithMaster(nthRetry: Int) {
      registerMasterFutures.set(tryRegisterAllMasters())
      registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = {
          if (registered.get) {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerMasterThreadPool.shutdownNow()
          } else if (nthRetry >= REGISTRATION_RETRIES) {
            markDead("All masters are unresponsive! Giving up.")
          } else {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerWithMaster(nthRetry + 1)
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
    }
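
    As an aside: in the 1.6 AppClient both constants mentioned in the comment above are hard-coded, so an app effectively gets three 20-second windows to reach a Master before markDead is called. The two lines below are quoted from my memory of the source rather than from the excerpt above, so treat them as worth double-checking:

    private val REGISTRATION_TIMEOUT_SECONDS = 20  // retry interval, seconds
    private val REGISTRATION_RETRIES = 3           // maximum number of attempts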
    

    Next, the asynchronous part, tryRegisterAllMasters. It returns an array of Futures, so it never blocks: whichever attempt gets a registration response from a Master first wins, and the remaining Future handles are cancelled. Inside tryRegisterAllMasters, the Master's address is derived from the spark URL, an EndpointRef to the Master is set up, and a RegisterApplication message wrapping the AppDescription is sent:

    /**
     *  Register with all masters asynchronously and returns an array `Future`s for cancellation.
     */
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef =
              rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
            masterRef.send(RegisterApplication(appDescription, self))   // ask the Master to register the Application
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }
    
    The Master's response

    When the Master receives the AppClient's RegisterApplication request, a standby master ignores it entirely. An active master wraps the incoming app information in an ApplicationInfo instance, registers the app, persists it, replies to the AppClient so that it stops retrying, and finally schedules resources for the new app:

    case RegisterApplication(description, driver) => {    // an app registration request arrived from the AppClient
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)    // create the ApplicationInfo
        registerApplication(app)    // register the app
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)   // persist the app's metadata
        driver.send(RegisteredApplication(app.id, self))    // reply to the AppClient that registration succeeded
        schedule()    // reschedule resources: called whenever a new app joins or new resources are added
      }
    }
    

    Next, registerApplication shows what the Master actually does while registering an app:

    private def registerApplication(app: ApplicationInfo): Unit = {
      val appAddress = app.driver.address
      if (addressToApp.contains(appAddress)) {  // a known ClientEndpoint address means this app is already registered
        logInfo("Attempted to re-register application at same address: " + appAddress)
        return
      }
      // the Master adds the app to the data structures it maintains
      applicationMetricsSystem.registerSource(app.appSource)
      apps += app
      idToApp(app.id) = app
      endpointToApp(app.driver) = app
      addressToApp(appAddress) = app
      waitingApps += app
    }
    

    So on the Master side, registering an app really just means recording it in the Master's own member variables.

    Next, the Master persists the registered app's metadata. Two persistence engines ship with Spark: one backed by ZooKeeper and one that writes straight to a file system; a custom persistence implementation can also be plugged in. Production deployments usually hand this off to ZooKeeper.
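
    As a hedged illustration of the pluggable part: a custom engine only has to implement the small PersistenceEngine contract (persist/unpersist/read, as I read the 1.6 source). The class below is a hypothetical in-memory engine, useless for real HA but enough to show the shape; it would be wired in via spark.deploy.recoveryMode=CUSTOM together with a StandaloneRecoveryModeFactory subclass:

    import scala.collection.mutable
    import scala.reflect.ClassTag
    import org.apache.spark.deploy.master.PersistenceEngine

    // Hypothetical sketch, not production code: keeps metadata in a local map,
    // so it provides no durability across Master restarts.
    class InMemoryPersistenceEngine extends PersistenceEngine {
      private val store = mutable.Map.empty[String, AnyRef]
      override def persist(name: String, obj: Object): Unit = store(name) = obj
      override def unpersist(name: String): Unit = store -= name
      override def read[T: ClassTag](prefix: String): Seq[T] =
        store.collect { case (k, v) if k.startsWith(prefix) => v.asInstanceOf[T] }.toSeq
    }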

    After persisting, the Master responds to the AppClient with RegisteredApplication to confirm the registration. On receiving it, the AppClient updates its own bookkeeping and cancels all outstanding registration attempts:

    case RegisteredApplication(appId_, masterRef) =>
      // FIXME How to handle the following cases?
      // 1. A master receives multiple registrations and sends back multiple
      // RegisteredApplications due to an unstable network.
      // 2. Receive multiple RegisteredApplication from different masters because the master is
      // changing.
      appId.set(appId_)
      registered.set(true)
      master = Some(masterRef)
      listener.connected(appId.get)
    

    The last thing the Master does for the newly registered app is reschedule resources. Besides a new app joining, schedule is also called whenever the resources themselves change, for instance when a new Worker is added:

    /**
     * Schedule the currently available resources among waiting apps. This method will be called
     * every time a new app joins or resource availability changes.
     */
    private def schedule(): Unit = {
      if (state != RecoveryState.ALIVE) {
        return    // ALIVE means this is the active Master and it has completed recovery (COMPLETING_RECOVERY is done)
      }
      // Drivers take strict precedence over executors
      // filter out the ALIVE workers and shuffle them
      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
      val numWorkersAlive = shuffledAliveWorkers.size
      var curPos = 0
      for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
        // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
        // start from the last worker that was assigned a driver, and continue onwards until we have
        // explored all alive workers.
        var launched = false
        var numWorkersVisited = 0
        while (numWorkersVisited < numWorkersAlive && !launched) {
          val worker = shuffledAliveWorkers(curPos)
          numWorkersVisited += 1    // count another visited worker
          // if this worker's free memory and cores satisfy the driver's requirements
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            launchDriver(worker, driver)  // launch the driver on this worker
            waitingDrivers -= driver    // remove it from the waiting list
            launched = true   // set the flag so we exit the loop
          }
          curPos = (curPos + 1) % numWorkersAlive   // advance the pointer so placement stays spread out
        }
      }
      startExecutorsOnWorkers()   // start launching executors on the workers
    }
    

    As you can see, schedule first filters out suitable workers, launches the driver on one of them via launchDriver, and finally calls startExecutorsOnWorkers to allocate executors. We have at last reached the main topic, allocating resources for and launching the driver and executors; reading source code really does take patience.

    Resource allocation for the driver and executors

    First, the launchDriver method:

    private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
      logInfo("Launching driver " + driver.id + " on worker " + worker.id)
      worker.addDriver(driver)
      driver.worker = Some(worker)    // record the worker-driver relationship
      worker.endpoint.send(LaunchDriver(driver.id, driver.desc))  // send a LaunchDriver request to the worker
      driver.state = DriverState.RUNNING  // mark the driver as RUNNING
    }
    

    launchDriver records the worker-driver relationship and sends the worker a LaunchDriver request. Over on the worker side, this is what happens when the request arrives:

    case LaunchDriver(driverId, driverDesc) => {
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(    // create a DriverRunner
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      drivers(driverId) = driver
      driver.start()   // start the driver via its start method
      coresUsed += driverDesc.cores   // account for the cpu and memory the driver consumes
      memoryUsed += driverDesc.mem
    }
    

    So the worker creates a DriverRunner and calls its start method. start downloads and stages the extra jars, wraps the driver command up, runs it on a new thread, and afterwards sends the worker a DriverStateChanged message.

    Next, startExecutorsOnWorkers, which schedules and launches executors on the workers:

    /**
     * Schedule and launch executors on workers
     */
    private def startExecutorsOnWorkers(): Unit = {
      // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
      // in the queue, then the second app, etc.
      for (app <- waitingApps if app.coresLeft > 0) {   // for each app in the queue
        val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
        // Filter out workers that don't have enough resources to launch an executor
        // eligible workers: state is ALIVE, with enough free memory and cores for one of the app's executors
        val usableWorkers: Array[WorkerInfo] = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
            worker.coresFree >= coresPerExecutor.getOrElse(1))
          .sortBy(_.coresFree).reverse
        // assignedCores records how many cores can be allocated on each worker in usableWorkers
        val assignedCores: Array[Int] = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
        // Now that we've decided how many cores to allocate on each worker, let's allocate them
        // now allocate the workers' resources to executors
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          allocateWorkerResourceToExecutors(
            app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
        }
      }
    }
    

    The method walks a simple FIFO queue of apps; each app records the memory and cores every one of its executors needs. Eligible workers are then filtered out: a worker must be alive, and its free memory and cores must cover one executor's requirements. The eligible workers are sorted by free cores in descending order.

    Next, scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) distributes executors across the filtered workers.

    The spreadOutApps argument is the interesting one: it selects between two allocation strategies (it comes from the spark.deploy.spreadOut config, which defaults to true). Literally it means spreading the apps out, so (see the sketch after this list):

    1. When spreadOutApps is true (the default), executors are spread across as many workers as possible.
    2. When spreadOutApps is false, executors are packed onto as few workers as possible, which suits cpu-intensive jobs with modest memory needs.
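
    The difference is easiest to see with a toy simulation of the loop we are about to read. This is a self-contained sketch, not Spark code; the worker capacities and core demands below are invented:

    // Toy model of the two strategies: 3 workers with 8 free cores each,
    // an app that still needs 6 cores, 2 cores per executor.
    def assign(freeCores: Array[Int], needed: Int, perExec: Int, spreadOut: Boolean): Array[Int] = {
      val assigned = Array.fill(freeCores.length)(0)
      var toAssign = needed
      var pos = 0
      while (toAssign >= perExec &&
             freeCores.indices.exists(i => freeCores(i) - assigned(i) >= perExec)) {
        if (freeCores(pos) - assigned(pos) >= perExec) {
          if (spreadOut) {
            // place one executor here, then move on to the next worker
            assigned(pos) += perExec; toAssign -= perExec
          } else {
            // fill this worker up before touching the next one
            while (toAssign >= perExec && freeCores(pos) - assigned(pos) >= perExec) {
              assigned(pos) += perExec; toAssign -= perExec
            }
          }
        }
        pos = (pos + 1) % freeCores.length
      }
      assigned
    }

    assign(Array(8, 8, 8), 6, 2, spreadOut = true)   // Array(2, 2, 2): spread across workers
    assign(Array(8, 8, 8), 6, 2, spreadOut = false)  // Array(6, 0, 0): packed onto one worker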

    Stepping into scheduleExecutorsOnWorkers:

    private def scheduleExecutorsOnWorkers(
        app: ApplicationInfo,
        usableWorkers: Array[WorkerInfo],
        spreadOutApps: Boolean): Array[Int] = {
      val coresPerExecutor = app.desc.coresPerExecutor  // the app's requested cores per executor
      val minCoresPerExecutor = coresPerExecutor.getOrElse(1) // minimum cores per executor; 1 if unset
      val oneExecutorPerWorker: Boolean = coresPerExecutor.isEmpty  // an empty coresPerExecutor makes oneExecutorPerWorker true
      val memoryPerExecutor = app.desc.memoryPerExecutorMB  // the app's requested memory per executor
      val numUsable = usableWorkers.length  // number of workers available for allocation
      val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
      val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
      var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)   // cores to assign: the smaller of what the app still needs and what the workers can offer
      /** Return whether the specified worker can launch an executor for this app. */
      def canLaunchExecutor(pos: Int): Boolean = {
        // condition 1: the cores still to be assigned must cover at least one executor's requirement (1 core minimum)
        val keepScheduling = coresToAssign >= minCoresPerExecutor
        // condition 2: this worker's remaining cores must be at least minCoresPerExecutor
        val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
        // If we allow multiple executors per worker, then we can always launch new executors.
        // Otherwise, if there is already an executor on this worker, just give it more cores.
        val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
        // either coresPerExecutor is non-empty (the user set it explicitly), or no executor has been assigned on this worker yet
        if (launchingNewExecutor) {
          val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
          // condition 3: this worker's remaining memory must be at least one executor's memory requirement
          val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
          // condition 4: the executors assigned so far plus the app's existing executors must stay under the app's executor limit
          val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
          // return true only if conditions 1-4 all hold
          keepScheduling && enoughCores && enoughMemory && underLimit
        } else {    // otherwise we are just adding cores to an existing executor
          // We're adding cores to an existing executor, so no need
          // to check memory and executor limits
          keepScheduling && enoughCores // only conditions 1 and 2 need to hold
        }
      }
      // Keep launching executors until no more workers can accommodate any
      // more executors, or if we have reached this application's limits
      var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
      while (freeWorkers.nonEmpty) {
        freeWorkers.foreach { pos =>
          var keepScheduling = true
          while (keepScheduling && canLaunchExecutor(pos)) {
            coresToAssign -= minCoresPerExecutor
            assignedCores(pos) += minCoresPerExecutor
            // If we are launching one executor per worker, then every iteration assigns 1 core
            // to the executor. Otherwise, every iteration assigns cores to a new executor.
            if (oneExecutorPerWorker) {
              assignedExecutors(pos) = 1
            } else {
              assignedExecutors(pos) += 1
            }
            // Spreading out an application means spreading out its executors across as
            // many workers as possible. If we are not spreading out, then we should keep
            // scheduling executors on this worker until we use all of its resources.
            // Otherwise, just move on to the next worker.
            if (spreadOutApps) {
              keepScheduling = false  // leave this loop and move on to the next worker
            }
          }
        }
        freeWorkers = freeWorkers.filter(canLaunchExecutor)
      }
      assignedCores
    }
    

    The block is long, and the inline comments cover the details, so here is just a summary of when a worker can take on more executors:
    Condition 1 (keepScheduling): the number of cores still to assign (the minimum of what the app still needs and what the workers can still offer) is at least the per-executor core requirement;
    Condition 2 (enoughCores): the worker under consideration still has at least the per-executor core requirement free;
    Condition 3 (enoughMemory): the worker under consideration still has at least the per-executor memory requirement free;
    Condition 4 (underLimit): the executors assigned in this round plus those the app already has stay below the app's executor limit. By default there is no limit at the initial allocation; the limit can change dynamically during later allocations.

    All four conditions must hold to launch a new executor; adding cores to an existing executor only requires conditions 1 and 2.
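
    To make the predicate concrete, here is a self-contained check with invented numbers (a worker with 8 free cores and 8192 MB free memory; an app asking for 2 cores and 4096 MB per executor):

    // Toy re-evaluation of conditions 1-3 at one position; not Spark code.
    val minCoresPerExecutor = 2
    val memoryPerExecutor = 4096                     // MB per executor
    val coresToAssign = 6                            // cores the app still wants overall
    val (coresFree, memoryFree) = (8, 8192)          // this worker's free resources
    val (assignedCores, assignedExecutors) = (4, 2)  // already placed here in this round

    val keepScheduling = coresToAssign >= minCoresPerExecutor          // true
    val enoughCores = coresFree - assignedCores >= minCoresPerExecutor // 8 - 4 >= 2: true
    val enoughMemory =
      memoryFree - assignedExecutors * memoryPerExecutor >= memoryPerExecutor
    // 8192 - 2 * 4096 = 0 >= 4096: false, so a third executor cannot land on this worker
    val canLaunch = keepScheduling && enoughCores && enoughMemory      // false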

    Launching the executors

    With resources allocated, the executors can be launched. allocateWorkerResourceToExecutors wraps each worker together with its executor and the associated resources into an ExecutorDesc object, and calls launchExecutor to launch it:

    private def allocateWorkerResourceToExecutors(
        app: ApplicationInfo,
        assignedCores: Int,
        coresPerExecutor: Option[Int],
        worker: WorkerInfo): Unit = {
      // If the number of cores per executor is specified, we divide the cores assigned
      // to this worker evenly among the executors with no remainder.
      // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
      val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
      val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
      for (i <- 1 to numExecutors) {
        val exec: ExecutorDesc = app.addExecutor(worker, coresToAssign)
        // the Master calls launchExecutor to send the request to the worker, updating its own record of the Worker at the same time
        // these resource figures are not reported by the Worker: the Master maintains them itself, without waiting for the Executor to actually start on the Worker
        // if the Worker fails to start the Executor, it sends a FAILED message back to the Master
        launchExecutor(worker, exec)
        app.state = ApplicationState.RUNNING
      }
    }
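
    A quick worked example of the division in the comment above (numbers invented): with 6 cores assigned on a worker and coresPerExecutor = Some(2), three 2-core executors are launched; with coresPerExecutor = None, one executor takes all 6 cores:

    // Mirrors the two lines above with concrete values; not Spark code.
    val assignedCores = 6
    val coresPerExecutor: Option[Int] = Some(2)
    val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1) // 3
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)           // 2
    // with coresPerExecutor = None: numExecutors = 1, coresToAssign = 6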
    

    In launchExecutor, the Master sends the Worker a LaunchExecutor message; once it arrives, the Worker launches executors according to the Master's allocation:

    private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
      logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
      worker.addExecutor(exec)    // update the Master's record of the Worker
      worker.endpoint.send(LaunchExecutor(masterUrl,    // send a LaunchExecutor request; the Worker starts launching executors on receipt
        exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
      exec.application.driver.send(   // notify the AppClient that an executor has been added
        ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
    }
    

    Note that the method above also sends the AppClient an ExecutorAdded message.

    First, what the Worker does when the LaunchExecutor message arrives:

    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>    // the Master asked us to launch executors
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
          // Create the executor's working directory
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }
          // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
          val appLocalDirs = appDirectories.get(appId).getOrElse {
            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
              val appDir = Utils.createDirectory(dir, namePrefix = "executor")
              Utils.chmod700(appDir)
              appDir.getAbsolutePath()
            }.toSeq
          }
          appDirectories(appId) = appLocalDirs
          val manager = new ExecutorRunner(   // create an ExecutorRunner
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          manager.start()   // start its thread
          coresUsed += cores_
          memoryUsed += memory_
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))    // tell the Master the executor's state has changed
        } catch {
          case e: Exception => {
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
          }
        }
      }
    

    The key step is creating the ExecutorRunner, which packages up everything needed to launch the executor. Its start method spawns a thread that calls fetchAndRunExecutor:

    private[worker] def start() {
      workerThread = new Thread("ExecutorRunner for " + fullId) {
        override def run() { fetchAndRunExecutor() }    // fetchAndRunExecutor starts the executor backend, which registers itself with the driver
      }
      workerThread.start()
      // Shutdown hook that kills actors on shutdown.
      shutdownHook = ShutdownHookManager.addShutdownHook { () =>
        // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
        // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
        if (state == ExecutorState.RUNNING) {
          state = ExecutorState.FAILED
        }
        killProcess(Some("Worker shutting down")) }
    }
    

    fetchAndRunExecutor creates a ProcessBuilder, packages up the launch parameters, and finally starts CoarseGrainedExecutorBackend by executing a command. The source first:

    private def fetchAndRunExecutor() {
      try {
        // Launch the process
        // assemble the command line that starts CoarseGrainedExecutorBackend
        val builder: ProcessBuilder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
          memory, sparkHome.getAbsolutePath, substituteVariables)
        val command = builder.command()
        val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
        logInfo(s"Launch command: $formattedCommand")
        builder.directory(executorDir)
        builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
        // In case we are running this from within the Spark Shell, avoid creating a "scala"
        // parent process for the executor command
        builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
        // Add webUI log urls
        val baseUrl =
          s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
        builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
        builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
        process = builder.start()   // run the command via ProcessBuilder, starting CoarseGrainedExecutorBackend
        val header = "Spark Executor Command: %s\n%s\n\n".format(
          formattedCommand, "=" * 40)
        // Redirect its stdout and stderr to files
        // redirect stdout and stderr to files
        val stdout = new File(executorDir, "stdout")
        stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
        val stderr = new File(executorDir, "stderr")
        Files.write(header, stderr, UTF_8)
        stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
        // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
        // or with nonzero exit code
        // wait for the process to exit
        val exitCode = process.waitFor()
        state = ExecutorState.EXITED
        val message = "Command exited with code " + exitCode
        // send the worker an ExecutorStateChanged message
        worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
      } catch {
        case interrupted: InterruptedException => {
          logInfo("Runner thread for executor " + fullId + " interrupted")
          state = ExecutorState.KILLED
          killProcess(None)
        }
        case e: Exception => {
          logError("Error running executor", e)
          state = ExecutorState.FAILED
          killProcess(Some(e.toString))
        }
      }
    }
    

    Here is something that took me a long time to figure out: why does the executed command start a CoarseGrainedExecutorBackend process? Nothing in this code gives it away. The answer traces back to when SparkDeploySchedulerBackend created the AppClient: the ApplicationDescription it built contains a command parameter, and that command names CoarseGrainedExecutorBackend as the class to launch. The AppClient creation was analyzed in part 1; the key code is:

    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
    client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    

    The command starts the process through CoarseGrainedExecutorBackend's main method, which parses the launch arguments passed on the command line and then calls run:

    def main(args: Array[String]) {
      var driverUrl: String = null
      var executorId: String = null
      var hostname: String = null
      var cores: Int = 0
      var appId: String = null
      var workerUrl: Option[String] = None
      val userClassPath = new mutable.ListBuffer[URL]()
      var argv = args.toList
      while (!argv.isEmpty) {
        argv match {
          case ("--driver-url") :: value :: tail =>
            driverUrl = value
            argv = tail
          case ("--executor-id") :: value :: tail =>
            executorId = value
            argv = tail
          case ("--hostname") :: value :: tail =>
            hostname = value
            argv = tail
          case ("--cores") :: value :: tail =>
            cores = value.toInt
            argv = tail
          case ("--app-id") :: value :: tail =>
            appId = value
            argv = tail
          case ("--worker-url") :: value :: tail =>
            // Worker url is used in spark standalone mode to enforce fate-sharing with worker
            workerUrl = Some(value)
            argv = tail
          case ("--user-class-path") :: value :: tail =>
            userClassPath += new URL(value)
            argv = tail
          case Nil =>
          case tail =>
            // scalastyle:off println
            System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
            // scalastyle:on println
            printUsageAndExit()
        }
      }
      if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
        appId == null) {
        printUsageAndExit()
      }
      run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
    }
    

    The key step in run is registering a communication endpoint named "Executor", which triggers the endpoint lifecycle's onStart method. The method is long, so only the key lines are excerpted:

    env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
            env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
    

    In onStart, the backend sends the DriverEndpoint a RegisterExecutor request to register the executor (the DriverEndpoint's creation was described in part 1). Here is the onStart source:

    override def onStart() {
      logInfo("Connecting to driver: " + driverUrl)
      rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        driver = Some(ref)  // a reference to the DriverEndpoint
        ref.ask[RegisterExecutorResponse](
          RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))  // register the executor with the driver
      }(ThreadUtils.sameThread).onComplete {
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        case Success(msg) => Utils.tryLogNonFatalError {
          Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
        }
        case Failure(e) => {
          logError(s"Cannot register with driver: $driverUrl", e)
          System.exit(1)
        }
      }(ThreadUtils.sameThread)
    }
    

    When the DriverEndpoint receives the executor's registration request, it wraps the executor's information in an ExecutorData object, records the executor in its data structures, and finally calls makeOffers() to assign Tasks to the newly registered executor. This links up with the last section of part 1; Task assignment itself is covered in the next article. Here is the RegisterExecutor handler:

      case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>   // the driver received a RegisterExecutor request
        if (executorDataMap.contains(executorId)) { // this executorId already exists
          context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
        } else {
          // If the executor's rpc env is not listening for incoming connections, `hostPort`
          // will be null, and the client connection should be used to contact the executor.
          val executorAddress = if (executorRef.address != null) {
              executorRef.address
            } else {
              context.senderAddress
            }
          logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
          // update the data structures the driver maintains
          addressToExecutorId(executorAddress) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
            cores, cores, logUrls)    // create an ExecutorData wrapping the executor's information
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          CoarseGrainedSchedulerBackend.this.synchronized {
            executorDataMap.put(executorId, data)
            if (numPendingExecutors > 0) {
              numPendingExecutors -= 1
              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
            }
          }
          // Note: some tests expect the reply to come after we put the executor in the map
          // after putting the registration into executorDataMap, reply to the executor that it is registered
          context.reply(RegisteredExecutor(executorAddress.host))
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          makeOffers()    // start assigning Tasks to the registered executors
        }
    

    Finally, here is what the executor side does when the registration confirmation arrives:

    case RegisteredExecutor(hostname) =>
      logInfo("Successfully registered with driver")
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    

    When CoarseGrainedExecutorBackend receives the RegisteredExecutor message from the DriverEndpoint, it creates the Executor instance, and executor creation is complete. The next questions, how Tasks are assigned to the executor and executed, are taken up in the next article.
