Spark Source Code Analysis (5): Executor Startup


Author: stone_zhu | Published 2019-07-02 21:35

In the previous post, the application was registered with the master. After the master receives the registration message, it performs a series of operations and finally calls the schedule method.

The schedule method does two things: it allocates resources to drivers that are waiting to be scheduled, and it allocates resources to waiting applications so that Executors can be launched.
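
For orientation, here is an abbreviated sketch of what schedule looks like, paraphrased from the Spark 2.x Master source with the driver/worker-selection details elided; treat it as an outline rather than the exact implementation:

    // Abbreviated sketch of Master.schedule (Spark 2.x); selection details elided
    private def schedule(): Unit = {
      // Only the active (ALIVE) master schedules anything
      if (state != RecoveryState.ALIVE) {
        return
      }
      // 1. Drivers take strict precedence: try to place each waiting driver
      //    on an alive worker that has enough free memory and cores
      for (driver <- waitingDrivers.toList) {
        // launchDriver(worker, driver) once a suitable worker is found
      }
      // 2. Spread the remaining cores over waiting applications,
      //    which eventually calls launchExecutor for each allocation
      startExecutorsOnWorkers()
    }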

The code that allocates resources to an application and launches its Executors eventually calls one method: launchExecutor (in Master).

In launchExecutor, the master first sends a LaunchExecutor message to the worker, and then sends an ExecutorAdded message to the driver to tell it that an executor has been assigned.

    private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
        logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
        worker.addExecutor(exec)
        // Send the LaunchExecutor message to the worker
        worker.endpoint.send(LaunchExecutor(masterUrl,
          exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
        // Send the ExecutorAdded message to the driver
        exec.application.driver.send(
          ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
      }
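
Both messages are plain case classes defined in org.apache.spark.deploy.DeployMessages. Paraphrased from the Spark 2.x source (field names follow the call sites above), their shapes are roughly:

    // Sent from the master to a worker to start an executor
    case class LaunchExecutor(
        masterUrl: String,
        appId: String,
        execId: Int,
        appDesc: ApplicationDescription,
        cores: Int,
        memory: Int)
      extends DeployMessage

    // Sent from the master to the driver-side StandaloneAppClient
    // once an executor has been assigned to a worker
    case class ExecutorAdded(
        id: Int,
        workerId: String,
        hostPort: String,
        cores: Int,
        memory: Int)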
    

Next, let's look at how the worker handles the LaunchExecutor message.

As before, find the case for this message in the receive pattern match; you can see that it does the following:

1. Check that the message came from the active master; only then does it proceed;

2. Create the executor's working directory and local temporary directories;

3. Wrap the information sent by the master in an ExecutorRunner object and call its start method to start a thread;

4. Send a message back to the master reporting the executor's current state.

    // Pattern match on the LaunchExecutor message
    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
          // Ignore the request if it did not come from the active master
          if (masterUrl != activeMasterUrl) {
            logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
          } else {
            try {
              logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
    
              // Create the executor's working directory
              val executorDir = new File(workDir, appId + "/" + execId)
              if (!executorDir.mkdirs()) {
                throw new IOException("Failed to create directory " + executorDir)
              }
    
              // Create the executor's local dirs, passed to it via the SPARK_EXECUTOR_DIRS env variable
              // The worker deletes these dirs when the application finishes
              val appLocalDirs = appDirectories.getOrElse(appId,
                Utils.getOrCreateLocalRootDirs(conf).map { dir =>
                  val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                  Utils.chmod700(appDir)
                  appDir.getAbsolutePath()
                }.toSeq)
              appDirectories(appId) = appLocalDirs
              // Wrap the executor info received for this application in an ExecutorRunner
              val manager = new ExecutorRunner(
                appId,
                execId,
                appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
                cores_,
                memory_,
                self,
                workerId,
                host,
                webUi.boundPort,
                publicAddress,
                sparkHome,
                executorDir,
                workerUri,
                conf,
                appLocalDirs, ExecutorState.RUNNING)
              executors(appId + "/" + execId) = manager
              // Start the ExecutorRunner thread
              manager.start()
              // Update the worker's resource usage
              coresUsed += cores_
              memoryUsed += memory_
              // Reply to the master with the executor's state
              sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
            } catch {
              case e: Exception =>
                logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
                if (executors.contains(appId + "/" + execId)) {
                  executors(appId + "/" + execId).kill()
                  executors -= appId + "/" + execId
                }
                sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
                  Some(e.toString), None))
            }
          }
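
The reply sent here (and again later, when the executor process exits) is ExecutorStateChanged, which carries the executor's state plus an optional message and exit code. Paraphrased from DeployMessages, its shape is roughly:

    // Sent from a worker to the master whenever an executor's state changes.
    // ExecutorState is an enumeration: LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED
    case class ExecutorStateChanged(
        appId: String,
        execId: Int,
        state: ExecutorState,
        message: Option[String],
        exitStatus: Option[Int])
      extends DeployMessage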
    

The main thing to look at here is what happens after ExecutorRunner.start is called.

It actually creates a thread which, when it runs, executes the fetchAndRunExecutor method.

    private[worker] def start() {
        // Create the thread that runs fetchAndRunExecutor
        workerThread = new Thread("ExecutorRunner for " + fullId) {
          override def run() { fetchAndRunExecutor() }
        }
        // Start the thread
        workerThread.start()
        // Shutdown hook that kills actors on shutdown.
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
          // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
          if (state == ExecutorState.RUNNING) {
            state = ExecutorState.FAILED
          }
          killProcess(Some("Worker shutting down")) }
      }
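
The shutdown hook, like the error handling in the worker's catch block above, ends up in killProcess, which stops the log appenders, terminates the executor's process, and reports the final state back to the worker. A paraphrased sketch from the Spark 2.x ExecutorRunner (constant and helper names may differ slightly between versions):

    private def killProcess(message: Option[String]) {
      var exitCode: Option[Int] = None
      if (process != null) {
        logInfo("Killing process!")
        // Stop the stdout/stderr appenders before terminating the process
        if (stdoutAppender != null) {
          stdoutAppender.stop()
        }
        if (stderrAppender != null) {
          stderrAppender.stop()
        }
        // Terminate the executor JVM and wait (bounded) for its exit code
        exitCode = Utils.terminateProcess(process, EXECUTOR_TERMINATE_TIMEOUT_MS)
        if (exitCode.isEmpty) {
          logWarning("Failed to terminate process: " + process +
            ". This process will likely be orphaned.")
        }
      }
      // Report back to the worker, which forwards the state change to the master
      try {
        worker.send(ExecutorStateChanged(appId, execId, state, message, exitCode))
      } catch {
        case e: IllegalStateException => logWarning(e.getMessage(), e)
      }
    }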
    

The fetchAndRunExecutor method uses the received information to do the following:

1. Create a ProcessBuilder, ready to run a local command;

2. Set the ProcessBuilder's working directory and environment variables;

3. Start the ProcessBuilder, which spawns the Executor process, usually named CoarseGrainedExecutorBackend;

4. Redirect the process's standard output and standard error streams to files;

5. Wait for the executor's exit code and send it to the worker.

    private def fetchAndRunExecutor() {
        try {
          // Create the ProcessBuilder
          val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
            memory, sparkHome.getAbsolutePath, substituteVariables)
          // Format the launch command for logging
          val command = builder.command()
          val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
          logInfo(s"Launch command: $formattedCommand")
    
          builder.directory(executorDir)
          builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
          // In case we are running this from within the Spark Shell, avoid creating a "scala"
          // parent process for the executor command
          builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
    
          // Add webUI log urls
          val baseUrl =
            if (conf.getBoolean("spark.ui.reverseProxy", false)) {
              s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
            } else {
              s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
            }
          builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
          builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
    
          // Start the executor process
          process = builder.start()
          val header = "Spark Executor Command: %s\n%s\n\n".format(
            formattedCommand, "=" * 40)
    
          // Redirect its stdout to a file
          val stdout = new File(executorDir, "stdout")
          stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
    
          // Redirect its stderr to a file
          val stderr = new File(executorDir, "stderr")
          Files.write(header, stderr, StandardCharsets.UTF_8)
          stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
    
          // Wait for it to exit; the executor may exit with code 0 (when the driver
          // instructs it to shut down) or with a nonzero exit code
          val exitCode = process.waitFor()
          state = ExecutorState.EXITED
          val message = "Command exited with code " + exitCode
          // Send the exit code to the worker
          worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
        } catch {
          case interrupted: InterruptedException =>
            logInfo("Runner thread for executor " + fullId + " interrupted")
            state = ExecutorState.KILLED
            killProcess(None)
          case e: Exception =>
            logError("Error running executor", e)
            state = ExecutorState.FAILED
            killProcess(Some(e.toString))
        }
      }
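
The substituteVariables function passed to CommandUtils.buildProcessBuilder above replaces placeholders such as {{EXECUTOR_ID}} and {{CORES}} that the driver embedded in the command when the application was submitted. Paraphrased from ExecutorRunner in Spark 2.x, it is roughly:

    // Replace template variables in the executor command's arguments with this
    // worker's and executor's actual values
    private[worker] def substituteVariables(argument: String): String = argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{EXECUTOR_ID}}" => execId.toString
      case "{{HOSTNAME}}" => host
      case "{{CORES}}" => cores.toString
      case "{{APP_ID}}" => appId
      case other => other
    }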
    

At this point, the Executor has been launched.
