Spark Source Code: Starting the TaskScheduler


Author: Jorvi | Published 2019-12-17 15:38

    Source code series index


    The TaskScheduler is created while SparkContext is being initialized; now let's look at how the TaskScheduler gets started.

    1 Starting the TaskScheduler

    The TaskScheduler is started by calling _taskScheduler.start().

    • Enter org.apache.spark.scheduler.TaskSchedulerImpl.scala
      private val speculationScheduler =
        ThreadUtils.newDaemonSingleThreadScheduledExecutor("task-scheduler-speculation")
    
      override def start() {
        backend.start()
    
        if (!isLocal && conf.getBoolean("spark.speculation", false)) {
          logInfo("Starting speculative execution thread")
          speculationScheduler.scheduleWithFixedDelay(new Runnable {
            override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
              checkSpeculatableTasks()
            }
          }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
        }
      }
    
    1. backend.start() starts the SchedulerBackend;
    2. In non-local mode with spark.speculation = true (i.e. speculative execution enabled), a new thread is scheduled to run checkSpeculatableTasks periodically and look for tasks worth launching speculatively (a configuration sketch follows this list).
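
    As a quick reference, speculation is driven purely by configuration. Below is a minimal sketch (the app name, master URL, and interval value are illustrative, not from the article) of a SparkConf that makes TaskSchedulerImpl.start schedule the speculation check:

    import org.apache.spark.{SparkConf, SparkContext}

    object SpeculationConfSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("speculation-demo")
          .setMaster("spark://master-host:7077")       // hypothetical standalone master URL
          .set("spark.speculation", "true")             // enables the speculation check above
          .set("spark.speculation.interval", "100ms")   // how often checkSpeculatableTasks runs
        val sc = new SparkContext(conf)
        // ... run jobs; straggler tasks may now be re-launched speculatively ...
        sc.stop()
      }
    }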

    2 Starting the SchedulerBackend

    • Enter org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend.scala
    private[spark] class StandaloneSchedulerBackend(
        scheduler: TaskSchedulerImpl,
        sc: SparkContext,
        masters: Array[String])
      extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv)
      with StandaloneAppClientListener
      with Logging {
    
      override def start() {
        super.start()
    
        // SPARK-21159. The scheduler backend should only try to connect to the launcher when in client
        // mode. In cluster mode, the code that submits the application to the Master needs to connect
        // to the launcher instead.
        if (sc.deployMode == "client") {
          launcherBackend.connect()
        }
    
        // The endpoint for executors to talk to us
        val driverUrl = RpcEndpointAddress(
          sc.conf.get("spark.driver.host"),
          sc.conf.get("spark.driver.port").toInt,
          CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
        val args = Seq(
          "--driver-url", driverUrl,
          "--executor-id", "{{EXECUTOR_ID}}",
          "--hostname", "{{HOSTNAME}}",
          "--cores", "{{CORES}}",
          "--app-id", "{{APP_ID}}",
          "--worker-url", "{{WORKER_URL}}")
        val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
          .map(Utils.splitCommandString).getOrElse(Seq.empty)
        val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
          .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
        val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
          .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    
        // When testing, expose the parent class path to the child. This is processed by
        // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
        // when the assembly is built with the "*-provided" profiles enabled.
        val testingClassPath =
          if (sys.props.contains("spark.testing")) {
            sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
          } else {
            Nil
          }
    
        // Start executors with a few necessary configs for registering with the scheduler
        val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
        val javaOpts = sparkJavaOpts ++ extraJavaOpts
        val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
          args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
        val webUrl = sc.ui.map(_.webUrl).getOrElse("")
        val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
        // If we're using dynamic allocation, set our initial executor limit to 0 for now.
        // ExecutorAllocationManager will send the real initial limit to the Master later.
        val initialExecutorLimit =
          if (Utils.isDynamicAllocationEnabled(conf)) {
            Some(0)
          } else {
            None
          }
        val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
          webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
        client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
        client.start()
        launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
        waitForRegistration()
        launcherBackend.setState(SparkAppHandle.State.RUNNING)
      }
    
    }
    

    StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend.

    1. super.start() invokes the start method of the parent class CoarseGrainedSchedulerBackend;
    2. Various parameters are assembled (driverUrl, args, extraJavaOpts, classPathEntries, libraryPathEntries, javaOpts, etc.) to build Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", ......) (see the driverUrl sketch after this list);
    3. An ApplicationDescription is created with the Command embedded in it, for later use;
    4. A StandaloneAppClient is created and started;
    5. The app state is set to SUBMITTED;
    6. The backend waits for the app to register and start;
    7. The app state is set to RUNNING.
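
    The driverUrl assembled in step 2 is just the string form of an RpcEndpointAddress. The sketch below uses made-up host and port values, and RpcEndpointAddress is private[spark], so treat it as an illustration rather than user-facing API; it shows the URL format executors later use to connect back to the DriverEndpoint:

    import org.apache.spark.rpc.RpcEndpointAddress

    object DriverUrlSketch {
      def main(args: Array[String]): Unit = {
        // RpcEndpointAddress(host, port, name).toString renders as
        // spark://<endpoint-name>@<host>:<port>
        val driverUrl = RpcEndpointAddress("192.168.1.10", 45678, "CoarseGrainedScheduler").toString
        println(driverUrl)  // spark://CoarseGrainedScheduler@192.168.1.10:45678
      }
    }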

    Note: steps 2 and 3 here closely mirror how the Driver is launched when an Application is submitted:

    • When launching the Driver, parameters are assembled to build Command("org.apache.spark.deploy.worker.DriverWrapper", ......), which is then wrapped in a DriverDescription;

    • Here, parameters are assembled to build Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", ......), which is then wrapped in an ApplicationDescription.

    See Spark Source Code: Submitting an Application to the Spark Cluster for details.

    2.1 Starting CoarseGrainedSchedulerBackend

    • Enter org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.scala
      override def start() {
        val properties = new ArrayBuffer[(String, String)]
        for ((key, value) <- scheduler.sc.conf.getAll) {
          if (key.startsWith("spark.")) {
            properties += ((key, value))
          }
        }
    
        // TODO (prashant) send conf instead of properties
        driverEndpoint = createDriverEndpointRef(properties)
      }
    
    
      protected def createDriverEndpointRef(
          properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
        rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
      }
    
    
      protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
        new DriverEndpoint(rpcEnv, properties)
      }
    

    A DriverEndpoint is created and registered on SparkContext.DriverSparkEnv.RpcEnv under the name "CoarseGrainedScheduler" (see Spark Source Code: Initializing SparkContext for details).

    Note: whenever an RpcEndpoint is registered with an RpcEnv, an OnStart message is enqueued into the endpoint's Inbox, so RpcEndpoint.onStart() is guaranteed to run (a schematic sketch follows).
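
    To make that lifecycle concrete, here is a schematic endpoint; RpcEndpoint and RpcEnv are private[spark], so this sketch only compiles inside Spark's own packages and is meant purely as an illustration of the registration-then-onStart order:

    import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

    class DemoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
      // Driven by the OnStart message placed in the Inbox at registration time.
      override def onStart(): Unit = println("DemoEndpoint started")

      // Ordinary messages are dispatched here only after onStart has run.
      override def receive: PartialFunction[Any, Unit] = {
        case msg => println(s"got message: $msg")
      }
    }

    // Registering the endpoint is what triggers onStart():
    //   val ref = rpcEnv.setupEndpoint("demo", new DemoEndpoint(rpcEnv))
    //   ref.send("hello")   // handled by receive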

    Now let's look at the DriverEndpoint.onStart method.

    • Enter org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint
      class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
        extends ThreadSafeRpcEndpoint with Logging {
    
        override def onStart() {
          // Periodically revive offers to allow delay scheduling to work
          val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
    
          reviveThread.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              Option(self).foreach(_.send(ReviveOffers))
            }
          }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
        }
    
      }
    

    This method starts a thread that periodically sends a ReviveOffers message to the endpoint itself.

    • Enter org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint
        override def receive: PartialFunction[Any, Unit] = {
    
          case ReviveOffers =>
            makeOffers()
        }
    
    
        // Make fake resource offers on all executors
        private def makeOffers() {
          // Make sure no executor is killed while some task is launching on it
          val taskDescs = withLock {
            // Filter out executors under killing
            val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
            val workOffers = activeExecutors.map {
              case (id, executorData) =>
                new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
                  Some(executorData.executorAddress.hostPort))
            }.toIndexedSeq
            scheduler.resourceOffers(workOffers)
          }
          if (!taskDescs.isEmpty) {
            launchTasks(taskDescs)
          }
        }
    

    This method iterates over CoarseGrainedSchedulerBackend.executorDataMap, but at this point executorDataMap is still empty, so the call effectively does nothing; we will revisit it later. The toy model below shows what it does once executors have registered.
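
    The toy model uses simplified types (not Spark's ExecutorData/WorkerOffer) to capture the gist of makeOffers: drop executors that are being killed, turn each remaining executor's free cores into an offer, and hand the batch to the scheduler; with an empty map, as here during startup, the result is empty and nothing is launched:

    object MakeOffersSketch {
      case class ExecutorData(host: String, freeCores: Int)
      case class Offer(executorId: String, host: String, cores: Int)

      def makeOffers(executorDataMap: Map[String, ExecutorData],
                     isAlive: String => Boolean): List[Offer] =
        executorDataMap
          .filter { case (id, _) => isAlive(id) }                   // drop executors under killing
          .map { case (id, d) => Offer(id, d.host, d.freeCores) }   // one offer per alive executor
          .toList

      def main(args: Array[String]): Unit = {
        println(makeOffers(Map.empty, _ => true))                             // List() -> nothing to launch
        println(makeOffers(Map("0" -> ExecutorData("node-1", 4)), _ => true)) // List(Offer(0,node-1,4))
      }
    }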

    2.2 Creating and starting the StandaloneAppClient

    • Enter org.apache.spark.deploy.client.StandaloneAppClient.scala
    private[spark] class StandaloneAppClient(
        rpcEnv: RpcEnv,
        masterUrls: Array[String],
        appDescription: ApplicationDescription,
        listener: StandaloneAppClientListener,
        conf: SparkConf)
      extends Logging {
    
      def start() {
        // Just launch an rpcEndpoint; it will call back into the listener.
        endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
      }
    
      // remaining members omitted
    }
    

    A ClientEndpoint is created and registered on SparkContext.DriverSparkEnv.RpcEnv under the name "AppClient" (see Spark Source Code: Initializing SparkContext for details).

    • Enter org.apache.spark.deploy.client.StandaloneAppClient.ClientEndpoint
        override def onStart(): Unit = {
          try {
            registerWithMaster(1)
          } catch {
            case e: Exception =>
              logWarning("Failed to connect to master", e)
              markDisconnected()
              stop()
          }
        }
    
    
        private def registerWithMaster(nthRetry: Int) {
          registerMasterFutures.set(tryRegisterAllMasters())
          registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
            override def run(): Unit = {
              if (registered.get) {
                registerMasterFutures.get.foreach(_.cancel(true))
                registerMasterThreadPool.shutdownNow()
              } else if (nthRetry >= REGISTRATION_RETRIES) {
                markDead("All masters are unresponsive! Giving up.")
              } else {
                registerMasterFutures.get.foreach(_.cancel(true))
                registerWithMaster(nthRetry + 1)
              }
            }
          }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
        }
    
    
        private def tryRegisterAllMasters(): Array[JFuture[_]] = {
          for (masterAddress <- masterRpcAddresses) yield {
            registerMasterThreadPool.submit(new Runnable {
              override def run(): Unit = try {
                if (registered.get) {
                  return
                }
                logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
                val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
                masterRef.send(RegisterApplication(appDescription, self))
              } catch {
                case ie: InterruptedException => // Cancelled
                case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
              }
            })
          }
        }
    
    1. Iterate over all masterRpcAddresses;
    2. For each masterAddress plus the master endpoint name, look up the masterRpcEndpointRef;
    3. Use the masterRpcEndpointRef to send a RegisterApplication(ApplicationDescription, DriverRpcEndpointRef) message.

    Creating and starting the StandaloneAppClient is, in essence, just a way to message the Master and get the Application registered (a simplified model of the retry logic follows).
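
    The retry logic above can be summarized by a simplified, synchronous model (toy code, not Spark's AppClient; in the real implementation the re-check runs on a timer thread after REGISTRATION_TIMEOUT_SECONDS rather than inline):

    import java.util.concurrent.atomic.AtomicBoolean

    object RegistrationRetrySketch {
      val REGISTRATION_RETRIES = 3
      val registered = new AtomicBoolean(false)   // flipped when RegisteredApplication arrives

      // Stand-in for tryRegisterAllMasters: would send RegisterApplication to every master.
      def tryRegisterAllMasters(masters: Seq[String]): Unit =
        masters.foreach(m => println(s"Connecting to master $m ..."))

      def registerWithMaster(masters: Seq[String], nthRetry: Int = 1): Unit = {
        tryRegisterAllMasters(masters)
        if (registered.get) {
          println("registered, cancel outstanding attempts")
        } else if (nthRetry >= REGISTRATION_RETRIES) {
          println("All masters are unresponsive! Giving up.")
        } else {
          registerWithMaster(masters, nthRetry + 1)
        }
      }

      def main(args: Array[String]): Unit =
        registerWithMaster(Seq("spark://m1:7077", "spark://m2:7077"))
    }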

    2.3 Registering the Application

    • Enter org.apache.spark.deploy.master.Master.scala
      override def receive: PartialFunction[Any, Unit] = {
    
        case RegisterApplication(description, driver) =>
          // TODO Prevent repeated registrations from some driver
          if (state == RecoveryState.STANDBY) {
            // ignore, don't send response
          } else {
            logInfo("Registering app " + description.name)
            val app = createApplication(description, driver)
            registerApplication(app)
            logInfo("Registered app " + description.name + " with ID " + app.id)
            persistenceEngine.addApplication(app)
            driver.send(RegisteredApplication(app.id, self))
            schedule()
          }
    
      }
    
    1. If this Master is in STANDBY state, the message is ignored and no response is sent;
    2. createApplication(appDescription, driverRpcEndpointRef) is called to create an ApplicationInfo;
    3. registerApplication registers the app, i.e. adds the ApplicationInfo created above to Master.waitingApps;
    4. The driverRpcEndpointRef is used to send a RegisteredApplication message back to the Driver, telling it the application has been registered;
    5. schedule() is called.

    2.4 Starting the Application

    • Enter org.apache.spark.deploy.master.Master.scala
      private def schedule(): Unit = {
        if (state != RecoveryState.ALIVE) {
          return
        }
        // Drivers take strict precedence over executors
        val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
        val numWorkersAlive = shuffledAliveWorkers.size
        var curPos = 0
        for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
          // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
          // start from the last worker that was assigned a driver, and continue onwards until we have
          // explored all alive workers.
          var launched = false
          var numWorkersVisited = 0
          while (numWorkersVisited < numWorkersAlive && !launched) {
            val worker = shuffledAliveWorkers(curPos)
            numWorkersVisited += 1
            if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
              launchDriver(worker, driver)
              waitingDrivers -= driver
              launched = true
            }
            curPos = (curPos + 1) % numWorkersAlive
          }
        }
        startExecutorsOnWorkers()
      }
    

    In Spark Source Code: Submitting an Application to the Spark Cluster, this same method is called to launch the Driver after the Driver has been registered.

    There, no app had been added to Master.waitingApps yet, so the startExecutorsOnWorkers call did nothing. Here, an app has already been added to Master.waitingApps, so startExecutorsOnWorkers will actually launch Executors for the app.

    A few points worth noting:

    1. The ApplicationInfo created when registering the Application here is added to Master.waitingApps, whereas in Spark Source Code: Submitting an Application to the Spark Cluster the DriverInfo created when registering the Driver was added to Master.waitingDrivers;

    2. The schedule() method does two things (a toy model of the driver round-robin follows this list):
      1) iterate over Master.waitingDrivers and launch each Driver;
      2) iterate over Master.waitingApps and launch Executors for each App.
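
    Here is a simplified, self-contained model of the driver-placement part of schedule() (toy types, not Spark's WorkerInfo/DriverInfo, and it skips the resource bookkeeping the real launchDriver performs): walk the shuffled alive workers, launch each waiting driver on the first worker with enough free memory and cores, and keep curPos so the next driver continues from where the previous one stopped:

    object RoundRobinSketch {
      case class Worker(id: String, memFree: Int, coresFree: Int)
      case class Driver(name: String, mem: Int, cores: Int)

      // Returns driver -> worker assignments; does not deduct resources after each launch.
      def place(workers: IndexedSeq[Worker], drivers: Seq[Driver]): Map[String, String] = {
        var curPos = 0
        var placements = Map.empty[String, String]
        for (driver <- drivers) {
          var launched = false
          var visited = 0
          while (visited < workers.size && !launched) {
            val w = workers(curPos)
            visited += 1
            if (w.memFree >= driver.mem && w.coresFree >= driver.cores) {
              placements += (driver.name -> w.id)   // stands in for launchDriver(worker, driver)
              launched = true
            }
            curPos = (curPos + 1) % workers.size    // round-robin cursor
          }
        }
        placements
      }

      def main(args: Array[String]): Unit = {
        val workers = IndexedSeq(Worker("w1", 4096, 2), Worker("w2", 8192, 4))
        val drivers = Seq(Driver("d1", 2048, 1), Driver("d2", 6144, 2))
        println(place(workers, drivers))   // Map(d1 -> w1, d2 -> w2)
      }
    }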

    The details of how startExecutorsOnWorkers launches Executors for the app will be covered in a later article.

    3 Summary

    1. Calling TaskSchedulerImpl.start to start the TaskScheduler in turn calls SchedulerBackend.start to start the SchedulerBackend;
    2. The SchedulerBackend acts as the TaskScheduler's backend, receiving and handling messages on the TaskScheduler's behalf;
    3. When StandaloneSchedulerBackend starts, it calls the start method of its parent class CoarseGrainedSchedulerBackend to start the CoarseGrainedSchedulerBackend;
    4. Starting CoarseGrainedSchedulerBackend creates a DriverEndpoint and registers it on SparkContext.DriverSparkEnv.RpcEnv (see Spark Source Code: Initializing SparkContext for details);
    5. DriverEndpoint.onStart is then invoked; it starts a thread that periodically sends ReviveOffers messages to the endpoint itself, and handling ReviveOffers calls makeOffers, which is essentially the periodic scheduling that submits Tasks;
    6. A StandaloneAppClient is created and its start method called, sending a RegisterApplication message to every Master to register the Application;
    7. On receiving RegisterApplication, the Master creates an ApplicationInfo and puts it into Master.waitingApps, marking the Application as registered, then responds to the Driver;
    8. The Master calls schedule() to start the Application; schedule() does two things:
      1) iterate over Master.waitingDrivers and launch each Driver;
      2) iterate over Master.waitingApps and launch Executors for each App.
    9. How Executors are launched for the App will be analyzed in a later article.
