
Spark-Core Source Code Deep Dive (10): Registering the Application

Author: sun4lower | Published on 2017-03-10 17:16

    Continuing from the previous article, we keep analyzing how Executors are launched. This article has two main parts:

    • Sending the Worker a message to launch the Executor
    • Sending the driver an ExecutorAdded message once the launch has been kicked off; the driver here is the ClientEndpoint
    private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
      logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
      worker.addExecutor(exec)
      worker.endpoint.send(LaunchExecutor(masterUrl,
        exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
      exec.application.driver.send(
        ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
    }
    

    Launching the Executor

    First, let's look at what the Worker does when it receives the LaunchExecutor message:

    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      // First make sure this message really came from the active Master
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
          // Create the executor's working directory
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }
          // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
          val appLocalDirs = appDirectories.get(appId).getOrElse {
            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
              val appDir = Utils.createDirectory(dir, namePrefix = "executor")
              Utils.chmod700(appDir)
              appDir.getAbsolutePath()
            }.toSeq
          }
          appDirectories(appId) = appLocalDirs
          // Instantiate an ExecutorRunner to manage this executor process
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)
          // Record the runner in the executors map
          executors(appId + "/" + execId) = manager
          // Call ExecutorRunner's start method
          manager.start()
          // Update the Worker's resource usage
          coresUsed += cores_
          memoryUsed += memory_
          // Send an ExecutorStateChanged message to the Master
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          case e: Exception => {
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
          }
        }
      }
    

    The Worker first instantiates an ExecutorRunner, which in Standalone mode manages the lifecycle of a single executor process, and then calls its start() method:

    private[worker] def start() {
      workerThread = new Thread("ExecutorRunner for " + fullId) {
        override def run() { fetchAndRunExecutor() }
      }
      workerThread.start()
      // Shutdown hook that kills actors on shutdown.
      shutdownHook = ShutdownHookManager.addShutdownHook { () =>
        // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
        // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
        if (state == ExecutorState.RUNNING) {
          state = ExecutorState.FAILED
        }
        killProcess(Some("Worker shutting down")) }
    }
    

    As you can see, start() creates a thread whose run() simply calls fetchAndRunExecutor(); once the thread is started, fetchAndRunExecutor() begins to execute:

    private def fetchAndRunExecutor() {
      try {
        // Launch the process
        // First build the launch command
        val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
          memory, sparkHome.getAbsolutePath, substituteVariables)
        val command = builder.command()
        val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
        logInfo(s"Launch command: $formattedCommand")
        // Set the executor's working directory and pass it some environment variables
        builder.directory(executorDir)
        builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
        // In case we are running this from within the Spark Shell, avoid creating a "scala"
        // parent process for the executor command
        builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
        // Add webUI log urls
        val baseUrl =
          s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
        builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
        builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
        // Start a new process to run the command
        process = builder.start()
        val header = "Spark Executor Command: %s\n%s\n\n".format(
          formattedCommand, "=" * 40)
        // Redirect its stdout and stderr to files
        val stdout = new File(executorDir, "stdout")
        stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
        val stderr = new File(executorDir, "stderr")
        Files.write(header, stderr, UTF_8)
        stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
        // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
        // or with nonzero exit code
        val exitCode = process.waitFor()
        state = ExecutorState.EXITED
        val message = "Command exited with code " + exitCode
        worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
      } catch {
        case interrupted: InterruptedException => {
          logInfo("Runner thread for executor " + fullId + " interrupted")
          state = ExecutorState.KILLED
          killProcess(None)
        }
        case e: Exception => {
          logError("Error running executor", e)
          state = ExecutorState.FAILED
          killProcess(Some(e.toString))
        }
      }
    }
    

    The most important line here is process = builder.start(): it spawns a new process (a separate JVM) to run the command we just built, that is, to run the main method of the class "org.apache.spark.executor.CoarseGrainedExecutorBackend". Remember where that class was set? Right, in the start() method of SparkDeploySchedulerBackend.
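
    For reference, here is a simplified sketch of how SparkDeploySchedulerBackend.start() builds that command in the 1.6.x source (surrounding variable definitions such as args, maxCores and appUIAddress are omitted, so treat this as an excerpt rather than the exact code):

    // The executor-side main class; ExecutorRunner will eventually turn this Command into the process above
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    // The ApplicationDescription carrying this command is what gets registered with the Master
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)

    It is this command, handed from the Master to the Worker and finally to ExecutorRunner, that becomes the executor process. So let's now step into the main method of CoarseGrainedExecutorBackend: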

    def main(args: Array[String]) {
      var driverUrl: String = null
      var executorId: String = null
      var hostname: String = null
      var cores: Int = 0
      var appId: String = null
      var workerUrl: Option[String] = None
      val userClassPath = new mutable.ListBuffer[URL]()
      var argv = args.toList
      // Initialize the variables above from the arguments passed in when the command was built
      while (!argv.isEmpty) {
        argv match {
          case ("--driver-url") :: value :: tail =>
            driverUrl = value
            argv = tail
          case ("--executor-id") :: value :: tail =>
            executorId = value
            argv = tail
          case ("--hostname") :: value :: tail =>
            hostname = value
            argv = tail
          case ("--cores") :: value :: tail =>
            cores = value.toInt
            argv = tail
          case ("--app-id") :: value :: tail =>
            appId = value
            argv = tail
          case ("--worker-url") :: value :: tail =>
            // Worker url is used in spark standalone mode to enforce fate-sharing with worker
            workerUrl = Some(value)
            argv = tail
          case ("--user-class-path") :: value :: tail =>
            userClassPath += new URL(value)
            argv = tail
          case Nil =>
          case tail =>
            // scalastyle:off println
            System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
            // scalastyle:on println
            printUsageAndExit()
        }
      }
      if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
        appId == null) {
        printUsageAndExit()
      }
      // If the arguments are valid, call run()
      run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
    }
    

    A quick note first: CoarseGrainedExecutorBackend is an implementation of ExecutorBackend. Depending on the cluster mode, ExecutorBackend has three implementations: CoarseGrainedExecutorBackend, LocalBackend and MesosExecutorBackend. CoarseGrainedExecutorBackend is the one used in Standalone mode, where, as we just saw, ExecutorRunner launches a process that runs its main method.
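
    For context, ExecutorBackend itself is a tiny trait; in the 1.6.x source it looks roughly like this (sketch, imports omitted):

    // A pluggable interface the Executor uses to send task status updates to the cluster scheduler
    private[spark] trait ExecutorBackend {
      def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
    }

    Every backend implements this single statusUpdate hook and forwards the updates to its own scheduler.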

    Next comes the call to run():

    private def run(
        driverUrl: String,
        executorId: String,
        hostname: String,
        cores: Int,
        appId: String,
        workerUrl: Option[String],
        userClassPath: Seq[URL]) {
      SignalLogger.register(log)
      SparkHadoopUtil.get.runAsSparkUser { () =>
        // Debug code
        Utils.checkHost(hostname)
        // Bootstrap to fetch the driver's Spark properties.
        val executorConf = new SparkConf
        val port = executorConf.getInt("spark.executor.port", 0)
        val fetcher = RpcEnv.create(
          "driverPropsFetcher",
          hostname,
          port,
          executorConf,
          new SecurityManager(executorConf),
          clientMode = true)
        val driver = fetcher.setupEndpointRefByURI(driverUrl)
        val props = driver.askWithRetry[Seq[(String, String)]](RetrieveSparkProps) ++
          Seq[(String, String)](("spark.app.id", appId))
        fetcher.shutdown()
        // Create SparkEnv using properties we fetched from the driver.
        val driverConf = new SparkConf()
        for ((key, value) <- props) {
          // this is required for SSL in standalone mode
          if (SparkConf.isExecutorStartupConf(key)) {
            driverConf.setIfMissing(key, value)
          } else {
            driverConf.set(key, value)
          }
        }
        if (driverConf.contains("spark.yarn.credentials.file")) {
          logInfo("Will periodically update credentials from: " +
            driverConf.get("spark.yarn.credentials.file"))
          SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
        }
        val env = SparkEnv.createExecutorEnv(
          driverConf, executorId, hostname, port, cores, isLocal = false)
        // SparkEnv will set spark.executor.port if the rpc env is listening for incoming
        // connections (e.g., if it's using akka). Otherwise, the executor is running in
        // client mode only, and does not accept incoming connections.
        val sparkHostPort = env.conf.getOption("spark.executor.port").map { port =>
            hostname + ":" + port
          }.orNull
        env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
          env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
        workerUrl.foreach { url =>
          env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
        }
        env.rpcEnv.awaitTermination()
        SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
      }
    }
    
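
    On the driver side, the askWithRetry[Seq[(String, String)]](RetrieveSparkProps) call above is answered by DriverEndpoint's receiveAndReply. In the 1.6.x source the relevant case is roughly the following sketch, where sparkProperties is the subset of the driver's SparkConf that CoarseGrainedSchedulerBackend keeps for its executors:

    case RetrieveSparkProps =>
      // Ship the driver's Spark properties back to the freshly started executor backend
      context.reply(sparkProperties)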

    The run() method above mainly breaks down into the following steps:

    • Fetch a set of Spark properties from the Driver
    • Use them to create the ExecutorEnv, i.e. the Executor's runtime environment
    • Instantiate CoarseGrainedExecutorBackend and register it with the RpcEnv
    • Registration triggers CoarseGrainedExecutorBackend's onStart method

    We won't cover the WorkerWatcher part here; let's look at CoarseGrainedExecutorBackend's onStart method:

    override def onStart() {
      logInfo("Connecting to driver: " + driverUrl)
      rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        driver = Some(ref)
        // Send a RegisterExecutor message to the Driver
        ref.ask[RegisterExecutorResponse](
          RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
      }(ThreadUtils.sameThread).onComplete {
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        case Success(msg) => Utils.tryLogNonFatalError {
          Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
        }
        case Failure(e) => {
          logError(s"Cannot register with driver: $driverUrl", e)
          System.exit(1)
        }
      }(ThreadUtils.sameThread)
    }
    

    The question we care about here is: who exactly is this driver, i.e. what is driverUrl?

    Let's trace it back: driverUrl is passed in when CoarseGrainedExecutorBackend is instantiated; that value comes from run(), which receives it from main(); main() in turn parses it from the arguments the new process was started with, i.e. from the command that was executed; the command is built from appDesc.command; and appDesc is constructed in the start() method of SparkDeploySchedulerBackend, as shown below:

    // The endpoint for executors to talk to us
    val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
      RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    
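
    As an aside, the {{EXECUTOR_ID}}, {{HOSTNAME}}, {{CORES}}, {{APP_ID}} and {{WORKER_URL}} placeholders above are filled in on the Worker side: ExecutorRunner passes a substituteVariables function to CommandUtils.buildProcessBuilder (we saw that call in fetchAndRunExecutor), which in the 1.6.x source looks roughly like this (sketch):

    /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */
    private[worker] def substituteVariables(argument: String): String = argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{EXECUTOR_ID}}" => execId.toString
      case "{{HOSTNAME}}" => host
      case "{{CORES}}" => cores.toString
      case "{{APP_ID}}" => appId
      case other => other
    }

    Note that --driver-url is not a placeholder: the concrete URL is baked into the arguments on the driver side, which is exactly why tracing it leads straight back to SparkDeploySchedulerBackend.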

    The CoarseGrainedSchedulerBackend.ENDPOINT_NAME used here is "CoarseGrainedScheduler":

    private[spark] object CoarseGrainedSchedulerBackend {
      val ENDPOINT_NAME = "CoarseGrainedScheduler"
    }
    

    And DriverEndpoint is registered under exactly this ENDPOINT_NAME:

    driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
    

    So driverUrl points to the DriverEndpoint. When the DriverEndpoint receives the RegisterExecutor message, it does the following:

    case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) =>
      if (executorDataMap.contains(executorId)) {
        context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
      } else {
        // If the executor's rpc env is not listening for incoming connections, `hostPort`
        // will be null, and the client connection should be used to contact the executor.
        val executorAddress = if (executorRef.address != null) {
            executorRef.address
          } else {
            context.senderAddress
          }
        logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
        addressToExecutorId(executorAddress) = executorId
        totalCoreCount.addAndGet(cores)
        totalRegisteredExecutors.addAndGet(1)
        val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
          cores, cores, logUrls)
        // This must be synchronized because variables mutated
        // in this block are read when requesting executors
        CoarseGrainedSchedulerBackend.this.synchronized {
          executorDataMap.put(executorId, data)
          if (numPendingExecutors > 0) {
            numPendingExecutors -= 1
            logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
          }
        }
        // Note: some tests expect the reply to come after we put the executor in the map
        context.reply(RegisteredExecutor(executorAddress.host))
        listenerBus.post(
          SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
        makeOffers()
      }
    

    If everything goes well, the DriverEndpoint replies to CoarseGrainedExecutorBackend with a RegisteredExecutor message, and on receiving it CoarseGrainedExecutorBackend instantiates an Executor.
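
    The receive handler that does this looks roughly as follows in the 1.6.x source (sketch, remaining cases omitted):

    override def receive: PartialFunction[Any, Unit] = {
      case RegisteredExecutor(hostname) =>
        logInfo("Successfully registered with driver")
        // Only now is the actual Executor created
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)

      case RegisterExecutorFailed(message) =>
        logError("Slave registration failed: " + message)
        System.exit(1)

      // ... other cases (LaunchTask, KillTask, StopExecutor, ...) omitted
    }

    Two parts of the Executor's construction are worth highlighting: the thread pool that runs the submitted tasks, and the heartbeat the executor sends to the driver. Partial source: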

    ...
    // Start the worker thread pool used to run the submitted tasks
    private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
    private val executorSource = new ExecutorSource(threadPool, executorId)
    ...
    // Executor for the heartbeat task; a dedicated single thread is used to send the heartbeats
    private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
    // Heartbeats are received by the driver's HeartbeatReceiver, which was created when the SparkContext was instantiated
    // must be initialized before running startDriverHeartbeat()
    private val heartbeatReceiverRef =
      RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
    /**
     * When an executor is unable to send heartbeats to the driver more than `HEARTBEAT_MAX_FAILURES`
     * times, it should kill itself. The default value is 60. It means we will retry to send
     * heartbeats about 10 minutes because the heartbeat interval is 10s.
     */
    // As the comment above explains: at most 60 failures, one heartbeat every 10s, so roughly 10 minutes of retries
    private val HEARTBEAT_MAX_FAILURES = conf.getInt("spark.executor.heartbeat.maxFailures", 60)
    /**
     * Count the failure times of heartbeat. It should only be accessed in the heartbeat thread. Each
     * successful heartbeat will reset it to 0.
     */
    private var heartbeatFailures = 0
    // Start sending heartbeats to the driver
    startDriverHeartbeater()
    

    We won't trace the implementation of startDriverHeartbeater() any further here, nor the various replies sent back to the Master, Worker and Driver in the code above; feel free to read them yourself, the principle is always the same. It is just like everyday work: when a new colleague joins the company and feels ready to work, they report to the boss, and the boss then assigns them concrete tasks.

    A diagram is attached to make this easier to follow (note that it only shows the main flow; for readability, the RPC communication is simply drawn as "A sends a message to B").

    Sending a message to the driver

    Below is the part that sends a message to the driver; note that the driver here is the ClientEndpoint:

    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
    

    When the ClientEndpoint receives this message it does the following:

    case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
      val fullId = appId + "/" + id
      logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort,
        cores))
      listener.executorAdded(fullId, workerId, hostPort, cores, memory)
    

    This is mostly logging-related work, so we won't elaborate further.

    With that, we have walked through the overall flow of registering the Application and launching and registering Executors; next up is the submission and execution of tasks.

    This article follows the Spark 1.6.3 source code; a link to the Spark 2.1.0 source is also provided:

    Spark 1.6.3 source code

    Spark 2.1.0 source code

    This is an original article. Reposting is welcome; please credit the source and the author. Thanks!
