美文网首页
原创-Spark源码分析六:Standalone模式下Drive

原创-Spark源码分析六:Standalone模式下Drive

作者: 无色的叶 | 来源:发表于2019-08-05 15:25 被阅读0次

作业提交流程图

image.png

作业执行流程描述:

  • 客户端提交作业给Master
  • Master让一个Worker启动Driver,即SchedulerBackend。Worker创建一个DriverRunner线程,DriverRunner启动SchedulerBackend进程。
  • 另外Master还会让其余Worker启动Exeuctor,即ExecutorBackend。Worker创建一个ExecutorRunner线程,ExecutorRunner会启动ExecutorBackend进程。
  • ExecutorBackend启动后会向Driver的SchedulerBackend注册。SchedulerBackend进程中包含DAGScheduler,它会根据用户程序,生成执行计划,并调度执行。对于每个stage的task,都会被存放到TaskScheduler中,ExecutorBackend向SchedulerBackend汇报的时候把TaskScheduler中的task调度到ExecutorBackend执行。
  • 所有stage都完成后作业结束。

driver启动流程

master接收到RequestSubmitDriver请求后会调用schedule()方法

/**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Drivers take strict precedence over executors
    // 对当前ALIVE状态worker进行随机洗牌(shuffle)打乱workers集合内元素,得到打乱后的worker集合
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    //  获取现在ALIVE的Worker数
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      //  遍历一个waitingDrivers集合的复制
      //  将worker分配给每个等待的driver
      //  对于每一个driver,从分配给driver的最后一个worker开始,然后继续向前遍历,直到探索到所有的Alive Worker
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        //  如果选出的worker空闲资源需求大于driver所需资源,则在该worker上启动driver
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          // 调用launchDriver方法
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    //  调用startExecutorsOnWorkers方法
    startExecutorsOnWorkers()
  }
  • 判断当前Master的状态
  • 对当前ALIVE状态worker进行shuffle打乱(随机洗牌)workers集合内元素,得到打乱后的worker集合
  • 遍历waitingDrivers,给每个driver在打乱后的Alive Workers集合中选出一个Worker,如果选出的worker空闲资源需求大于driver所需资源,则调用launchDriver方法,在该worker上启动driver,在drivers等待队列中移除该driver,并标记该Driver为已启动
  • 调用launchDriver(worker, driver)
  • 最后调用startExecutorsOnWorkers

launchDriver方法

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    worker.addDriver(driver)
    driver.worker = Some(worker)
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    driver.state = DriverState.RUNNING
  }
  • 在WorkerInfo中添加driver信息
  • 在DriverInfo中添加worker信息
  • 调用worker RpcEndPointRef向Worker发送LaunchDriver消息,告诉worker启动driver
  • 将driver状态更新为RUNNING

work接收到LaunchDriver消息后

case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      drivers(driverId) = driver
      driver.start()

      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
  • 将driver信息封装为DriverRunner对象
  • 将创建的DriverRunner对象添加到drivers HashMap中保存
  • 调用DriverRunner的start方法
  • 记录cores使用情况,和memory使用情况

查看Driver Runner的start方法,该方法中

 /** Starts a thread to run and manage the driver. */
  private[worker] def start() = {
    new Thread("DriverRunner for " + driverId) {
      override def run() {
        var shutdownHook: AnyRef = null
        try {
          shutdownHook = ShutdownHookManager.addShutdownHook { () =>
            logInfo(s"Worker shutting down, killing driver $driverId")
            kill()
          }

          // prepare driver jars and run driver
          val exitCode = prepareAndRunDriver()

          // set final state depending on if forcibly killed and process exit code
          finalState = if (exitCode == 0) {
            Some(DriverState.FINISHED)
          } else if (killed) {
            Some(DriverState.KILLED)
          } else {
            Some(DriverState.FAILED)
          }
        } catch {
          case e: Exception =>
            kill()
            finalState = Some(DriverState.ERROR)
            finalException = Some(e)
        } finally {
          if (shutdownHook != null) {
            ShutdownHookManager.removeShutdownHook(shutdownHook)
          }
        }

        // notify worker of final driver state, possible exception
        worker.send(DriverStateChanged(driverId, finalState.get, finalException))
      }
    }.start()
  }
  • 调用prepareAndRunDriver方法,用于准备driver jar包,并运行driver
  • 根据prepareAndRunDriver方法的退出码,设置最终状态
  • 根据模式匹配,向worker发送DriverStateChanged信息,通知worker driver状态改变

查看prepareAndRunDriver方法

 private[worker] def prepareAndRunDriver(): Int = {
    val driverDir = createWorkingDirectory()
    val localJarFilename = downloadUserJar(driverDir)

    def substituteVariables(argument: String): String = argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}" => localJarFilename
      case other => other
    }

    // TODO: If we add ability to submit multiple jars they should also be added here
    val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
      driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

    runDriver(builder, driverDir, driverDesc.supervise)
  }
  • 创建driver工作目录
  • 下载用户jar包到该工作目录
  • 匹配参数替代变量substituteVariables
  • 创建ProcessBuilder,传入参数为启动driver的执行命令,securityManager,启动driver所需内存,spark home的绝对路径,以及替代的变量
  • 调用runDriver方法

runDriver方法中,重定向将process的InputStream和ErrorStream重定向到指定的stdout、stderr文件中
这就是我们在Spark application web页面中查看Logs对应的两个日志文件


image.png

最后调用runCommandWithRetry方法

private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {

    builder.directory(baseDir)
    //  初始化
    def initialize(process: Process): Unit = {
      // Redirect stdout and stderr to files
      //  创建stdout文件
      val stdout = new File(baseDir, "stdout")
      //  将process的InputStream流重定向到stdout文件
      CommandUtils.redirectStream(process.getInputStream, stdout)
      //  创建stderr文件
      val stderr = new File(baseDir, "stderr")
      //  将builder命令格式化处理
      val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
      val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
      Files.append(header, stderr, StandardCharsets.UTF_8)
      //  将process的Error流信息重定向到stderr文件
      CommandUtils.redirectStream(process.getErrorStream, stderr)
    }
    //  调用runCommandWithRetry
    runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
  }

该方法中,有一个初始化方法initialize,在初始化方法中做了如下步骤:

  • 创建stdout、stderr文件
  • 将builder命令格式化处理
  • 将进程的Error流信息重定向到stderr文件

在runCommandWithRetry方法中最后调用runCommandWithRetry方法

private[worker] def runCommandWithRetry(
      command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
    //  默认exitCode为-1
    var exitCode = -1
    // Time to wait between submission retries.
    // 在提交重试时等待时间
    var waitSeconds = 1
    // A run of this many seconds resets the exponential back-off.
    val successfulRunDuration = 5
    var keepTrying = !killed

    while (keepTrying) {
      logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

      synchronized {
        //  如果被kill,返回exitCode
        if (killed) { return exitCode }
        //  执行命令command,启动
        process = Some(command.start())
        //  调用初始化方法
        initialize(process.get)
      }

      val processStart = clock.getTimeMillis()

      //  获取exitCode
      exitCode = process.get.waitFor()

      // check if attempting another run

      //  检查是否尝试另一次运行
      keepTrying = supervise && exitCode != 0 && !killed
      if (keepTrying) {
        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
          waitSeconds = 1
        }
        logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
        sleeper.sleep(waitSeconds)
        waitSeconds = waitSeconds * 2 // exponential back-off
      }
    }
    //  返回exitCode
    exitCode
  }
}

process = Some(command.start()) 真正执行启动Driver进程, exitCode = process.get.waitFor()获取执行的返回码

移除Driver操作

DriverRunner根据退出码进行匹配,通知worker driver状态发生改变,worker接受到该消息后,处理逻辑如下:

  • 向Master发送消息driverStateChanged
  • 在drivers集合中移除当前driver
  • 将当前driver添加到已完成driver集合中
  • 移除该driver分配的内存、cpu cores资源
 private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
    // driver id
    val driverId = driverStateChanged.driverId
    val exception = driverStateChanged.exception
    val state = driverStateChanged.state
    //  根据state进行匹配,并打印日志
    state match {
      case DriverState.ERROR =>
        logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
      case DriverState.FAILED =>
        logWarning(s"Driver $driverId exited with failure")
      case DriverState.FINISHED =>
        logInfo(s"Driver $driverId exited successfully")
      case DriverState.KILLED =>
        logInfo(s"Driver $driverId was killed by user")
      case _ =>
        logDebug(s"Driver $driverId changed state to $state")
    }
    //  向master发送消息driverStateChanged
    sendToMaster(driverStateChanged)
    //  在drivers集合中移除当前driver
    val driver = drivers.remove(driverId).get
    //  将当前driver添加到finishedDrivers
    finishedDrivers(driverId) = driver
    trimFinishedDriversIfNecessary()
    //  移除driver内存、cpu cores资源
    memoryUsed -= driver.driverDesc.mem
    coresUsed -= driver.driverDesc.cores
  }

Master在接受到消息后,根据匹配做出操作,如果为ERROR FINISHED KILLED FAILED状态,则调用removeDriver移除Driver

case DriverStateChanged(driverId, state, exception) =>
      state match {
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }

removeDriver方法中,做出一系列移除操作

  • 在drivers集合中移除driver
  • 将driver添加到completedDrivers集合中
  • 持久化引擎移除driver
  • 更新driver状态为最终状态时的状态
  • 移除worker上的driver
  • 调用schedule方法,重新调度资源
  private def removeDriver(
      driverId: String,
      finalState: DriverState,
      exception: Option[Exception]) {
    //  根据该driver id进行模式匹配
    drivers.find(d => d.id == driverId) match {
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        //  在drivers集合中移除driver
        drivers -= driver
        if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
        // 将driver添加到completedDrivers集合中
        completedDrivers += driver
        //  持久化引擎移除driver
        persistenceEngine.removeDriver(driver)
        //  更新driver状态为finalState时
        driver.state = finalState
        driver.exception = exception
        //  移除worker上的driver
        driver.worker.foreach(w => w.removeDriver(driver))
        //  调用schedule方法,做移除操作
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }

接下来分析:启动executor

参考:http://www.louisvv.com/archives/1366.html

相关文章

网友评论

      本文标题:原创-Spark源码分析六:Standalone模式下Drive

      本文链接:https://www.haomeiwen.com/subject/rolmdctx.html