Original - Spark Source Code Analysis 6: Standalone Mode Drive

Author: 无色的叶 | Published: 2019-08-05 15:25

    [Figure: job submission flow diagram]

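    Job execution flow:

    • The client submits the job to the Master.
    • The Master tells one Worker to launch the Driver, i.e. the SchedulerBackend. The Worker creates a DriverRunner thread, and the DriverRunner starts the SchedulerBackend process.
    • The Master also tells the remaining Workers to launch Executors, i.e. ExecutorBackends. Each Worker creates an ExecutorRunner thread, and the ExecutorRunner starts the ExecutorBackend process.
    • Once started, each ExecutorBackend registers with the Driver's SchedulerBackend. The SchedulerBackend process contains the DAGScheduler, which builds an execution plan from the user program and schedules it. The tasks of each stage are handed to the TaskScheduler; when an ExecutorBackend reports to the SchedulerBackend, tasks held by the TaskScheduler are dispatched to that ExecutorBackend for execution.
    • The job ends once all stages have completed.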

    Driver startup flow

    When the Master receives a RequestSubmitDriver request, it calls the schedule() method:

    /**
       * Schedule the currently available resources among waiting apps. This method will be called
       * every time a new app joins or resource availability changes.
       */
      private def schedule(): Unit = {
        if (state != RecoveryState.ALIVE) {
          return
        }
        // Drivers take strict precedence over executors
        // Randomly shuffle the workers that are currently ALIVE
        val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
        // Number of workers currently ALIVE
        val numWorkersAlive = shuffledAliveWorkers.size
        var curPos = 0
        for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
          // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
          // start from the last worker that was assigned a driver, and continue onwards until we have
          // explored all alive workers.
          var launched = false
          var numWorkersVisited = 0
          while (numWorkersVisited < numWorkersAlive && !launched) {
            val worker = shuffledAliveWorkers(curPos)
            numWorkersVisited += 1
            // If the selected worker's free memory and cores cover what the driver needs, launch the driver on it
            if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
              // Call launchDriver
              launchDriver(worker, driver)
              waitingDrivers -= driver
              launched = true
            }
            curPos = (curPos + 1) % numWorkersAlive
          }
        }
        // Then call startExecutorsOnWorkers
        startExecutorsOnWorkers()
      }
    
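    • Check the Master's state; return immediately unless it is ALIVE.
    • Randomly shuffle the workers that are currently ALIVE to get a shuffled worker collection.
    • Iterate over waitingDrivers and, for each driver, walk the shuffled alive workers looking for one whose free memory and cores are at least what the driver requires. If one is found, call launchDriver(worker, driver) to start the driver on that worker, remove the driver from the waiting queue, and mark it as launched.
    • Finally, call startExecutorsOnWorkers.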
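The round-robin placement in schedule() can be tried out in isolation. The following is a minimal sketch, not Spark code: SimWorker, SimDriver and DriverPlacementSketch are hypothetical stand-ins for WorkerInfo, DriverInfo and the Master, and the sketch assumes that launching a driver reserves its memory and cores on the chosen worker.

```scala
import scala.util.Random

// Hypothetical, simplified stand-ins for Spark's WorkerInfo and DriverInfo.
final case class SimWorker(id: String, var memoryFree: Int, var coresFree: Int)
final case class SimDriver(id: String, mem: Int, cores: Int)

object DriverPlacementSketch {
  /** Returns driverId -> workerId for every waiting driver that could be placed. */
  def place(
      workers: Seq[SimWorker],
      waiting: Seq[SimDriver],
      rng: Random = new Random()): Map[String, String] = {
    val shuffled = rng.shuffle(workers)
    val numAlive = shuffled.size
    var curPos = 0 // shared across drivers, so each search resumes where the last one stopped
    var placed = Map.empty[String, String]
    for (driver <- waiting) {
      var launched = false
      var visited = 0
      while (visited < numAlive && !launched) {
        val w = shuffled(curPos)
        visited += 1
        if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
          // Assumption: reserve the resources on the chosen worker.
          w.memoryFree -= driver.mem
          w.coresFree -= driver.cores
          placed += (driver.id -> w.id)
          launched = true
        }
        curPos = (curPos + 1) % numAlive
      }
    }
    placed
  }
}
```

A driver that fits on no worker is simply left out of the result, which mirrors how the real loop leaves it in waitingDrivers for the next call to schedule().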

    The launchDriver method

    private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
        logInfo("Launching driver " + driver.id + " on worker " + worker.id)
        worker.addDriver(driver)
        driver.worker = Some(worker)
        worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
        driver.state = DriverState.RUNNING
      }
    
    • Add the driver to the WorkerInfo.
    • Record the worker in the DriverInfo.
    • Send a LaunchDriver message through the worker's RpcEndpointRef, telling the Worker to start the driver.
    • Update the driver's state to RUNNING.

    When the Worker receives the LaunchDriver message:

    case LaunchDriver(driverId, driverDesc) =>
          logInfo(s"Asked to launch driver $driverId")
          val driver = new DriverRunner(
            conf,
            driverId,
            workDir,
            sparkHome,
            driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
            self,
            workerUri,
            securityMgr)
          drivers(driverId) = driver
          driver.start()
    
          coresUsed += driverDesc.cores
          memoryUsed += driverDesc.mem
    
    • Wrap the driver information in a DriverRunner object.
    • Store the new DriverRunner in the drivers HashMap.
    • Call the DriverRunner's start method.
    • Record the cores and memory now in use.

    The start method of DriverRunner looks like this:

     /** Starts a thread to run and manage the driver. */
      private[worker] def start() = {
        new Thread("DriverRunner for " + driverId) {
          override def run() {
            var shutdownHook: AnyRef = null
            try {
              shutdownHook = ShutdownHookManager.addShutdownHook { () =>
                logInfo(s"Worker shutting down, killing driver $driverId")
                kill()
              }
    
              // prepare driver jars and run driver
              val exitCode = prepareAndRunDriver()
    
              // set final state depending on if forcibly killed and process exit code
              finalState = if (exitCode == 0) {
                Some(DriverState.FINISHED)
              } else if (killed) {
                Some(DriverState.KILLED)
              } else {
                Some(DriverState.FAILED)
              }
            } catch {
              case e: Exception =>
                kill()
                finalState = Some(DriverState.ERROR)
                finalException = Some(e)
            } finally {
              if (shutdownHook != null) {
                ShutdownHookManager.removeShutdownHook(shutdownHook)
              }
            }
    
            // notify worker of final driver state, possible exception
            worker.send(DriverStateChanged(driverId, finalState.get, finalException))
          }
        }.start()
      }
    
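    • Call prepareAndRunDriver, which prepares the driver jar and runs the driver.
    • Set the final state from the exit code of prepareAndRunDriver: FINISHED if it is 0, otherwise KILLED if the driver was killed, otherwise FAILED. If an exception is thrown, the driver is killed and the state becomes ERROR.
    • Send a DriverStateChanged message to the Worker to report the final state and any exception.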
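The order of the checks matters: an exit code of 0 wins even if the driver was killed in the meantime. A minimal sketch of that decision, using hypothetical names (FinalStateSketch, SketchState, runAndClassify) rather than Spark's DriverState:

```scala
object FinalStateSketch {
  // Hypothetical stand-ins for DriverState.FINISHED / KILLED / FAILED / ERROR.
  sealed trait SketchState
  case object Finished extends SketchState
  case object Killed extends SketchState
  case object Failed extends SketchState
  case object Errored extends SketchState

  /**
   * Runs `run` (which returns a process exit code) and classifies the outcome
   * in the same order as DriverRunner.start: exit code first, then the killed flag.
   */
  def runAndClassify(run: () => Int, killed: () => Boolean): (SketchState, Option[Exception]) =
    try {
      val exitCode = run()
      val state: SketchState =
        if (exitCode == 0) Finished
        else if (killed()) Killed
        else Failed
      (state, None)
    } catch {
      // Any exception while preparing or running maps to the error state.
      case e: Exception => (Errored, Some(e))
    }
}
```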

    The prepareAndRunDriver method:

     private[worker] def prepareAndRunDriver(): Int = {
        val driverDir = createWorkingDirectory()
        val localJarFilename = downloadUserJar(driverDir)
    
        def substituteVariables(argument: String): String = argument match {
          case "{{WORKER_URL}}" => workerUrl
          case "{{USER_JAR}}" => localJarFilename
          case other => other
        }
    
        // TODO: If we add ability to submit multiple jars they should also be added here
        val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
          driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
    
        runDriver(builder, driverDir, driverDesc.supervise)
      }
    
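    • Create the driver's working directory.
    • Download the user jar into that directory.
    • Define substituteVariables, which replaces the {{WORKER_URL}} and {{USER_JAR}} placeholders in the command arguments.
    • Build a ProcessBuilder from the driver's launch command, the securityManager, the memory the driver needs, the absolute path of the Spark home, and substituteVariables.
    • Call runDriver.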
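Note that substituteVariables matches a whole argument, not a substring. The sketch below reproduces that match as a standalone function so the behaviour can be checked; SubstituteSketch and substituter are hypothetical names, and the URL and jar path in the usage note are made-up values.

```scala
object SubstituteSketch {
  /** Builds a function that replaces the two placeholders, matching whole arguments only. */
  def substituter(workerUrl: String, localJarFilename: String): String => String = {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other // anything else, including partial matches, passes through unchanged
  }
}
```

For example, with `val sub = SubstituteSketch.substituter("spark://worker-host:7078", "/tmp/driver-1/app.jar")`, mapping `sub` over `Seq("{{WORKER_URL}}", "{{USER_JAR}}", "--jar={{USER_JAR}}")` replaces the first two arguments and leaves the third untouched, because the placeholder there is only part of the argument.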

    runDriver redirects the process's InputStream and ErrorStream to the stdout and stderr files.
    These are the two log files shown under Logs on the Spark application web page.



    Finally, runCommandWithRetry is called.

    private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
    
        builder.directory(baseDir)
        // Initialization callback
        def initialize(process: Process): Unit = {
          // Redirect stdout and stderr to files
          // Create the stdout file
          val stdout = new File(baseDir, "stdout")
          // Redirect the process's InputStream to the stdout file
          CommandUtils.redirectStream(process.getInputStream, stdout)
          // Create the stderr file
          val stderr = new File(baseDir, "stderr")
          // Format the builder's command and write it as a header to stderr
          val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
          val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
          Files.append(header, stderr, StandardCharsets.UTF_8)
          // Redirect the process's error stream to the stderr file
          CommandUtils.redirectStream(process.getErrorStream, stderr)
        }
        // Call runCommandWithRetry
        runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
      }
    

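    This method defines an initialize function that does the following:

    • Creates the stdout file and redirects the process's InputStream to it.
    • Creates the stderr file, formats the builder's command, and writes it to stderr as a "Launch Command" header.
    • Redirects the process's error stream to the stderr file.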
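The article does not show how CommandUtils.redirectStream is implemented. As a rough illustration of the idea only, the sketch below copies a stream into a file on a background thread, opening the file in append mode so that a header written beforehand (such as the "Launch Command" line) is preserved. RedirectSketch and redirect are hypothetical names, not Spark APIs.

```scala
import java.io.{File, FileOutputStream, InputStream}

object RedirectSketch {
  /**
   * Copies everything readable from `in` to the end of `file` on a daemon thread.
   * Returns the thread so that a caller can join it if it needs to wait.
   */
  def redirect(in: InputStream, file: File): Thread = {
    val copier = new Thread("redirect to " + file.getName) {
      override def run(): Unit = {
        // Append mode: existing content (for example a header) is kept.
        val out = new FileOutputStream(file, true)
        try {
          val buf = new Array[Byte](8192)
          var n = in.read(buf)
          while (n != -1) {
            out.write(buf, 0, n)
            n = in.read(buf)
          }
        } finally {
          out.close()
        }
      }
    }
    copier.setDaemon(true)
    copier.start()
    copier
  }
}
```

Copying on a separate thread keeps the caller free to wait on the process itself, which is what runCommandWithRetry does with waitFor().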

    At the end of runDriver, runCommandWithRetry is called:

    private[worker] def runCommandWithRetry(
          command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
        // exitCode defaults to -1
        var exitCode = -1
        // Time to wait between submission retries.
        var waitSeconds = 1
        // A run of this many seconds resets the exponential back-off.
        val successfulRunDuration = 5
        var keepTrying = !killed
    
        while (keepTrying) {
          logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))
    
          synchronized {
            // If the driver has been killed, return exitCode
            if (killed) { return exitCode }
            // Run the command to start the process
            process = Some(command.start())
            // Call the initialization function
            initialize(process.get)
          }
    
          val processStart = clock.getTimeMillis()
    
          // Wait for the process to exit and get its exit code
          exitCode = process.get.waitFor()
    
          // check if attempting another run
    
          keepTrying = supervise && exitCode != 0 && !killed
          if (keepTrying) {
            if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
              waitSeconds = 1
            }
            logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
            sleeper.sleep(waitSeconds)
            waitSeconds = waitSeconds * 2 // exponential back-off
          }
        }
        // Return exitCode
        exitCode
      }
    

    process = Some(command.start()) is what actually starts the Driver process, and exitCode = process.get.waitFor() blocks until the process exits and returns its exit code.
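The retry loop doubles the wait after each failed run and resets it to one second whenever a run lasted longer than successfulRunDuration. A minimal sketch of that back-off logic, with the process, clock and sleeper replaced by plain functions so it can be exercised without launching anything; RetrySketch, runOnce and maxAttempts are hypothetical (maxAttempts stands in for the killed flag as a way to stop the loop):

```scala
object RetrySketch {
  /**
   * @param runOnce     runs the command once; returns (exitCode, run duration in ms)
   * @param sleep       called with the number of seconds to wait before a retry
   * @param supervise   retry on non-zero exit only when true
   * @param maxAttempts hypothetical upper bound on runs (not present in Spark's loop)
   */
  def runWithRetry(
      runOnce: () => (Int, Long),
      sleep: Int => Unit,
      supervise: Boolean,
      maxAttempts: Int): Int = {
    var exitCode = -1
    var waitSeconds = 1
    val successfulRunDurationMs = 5000L
    var attempts = 0
    var keepTrying = attempts < maxAttempts
    while (keepTrying) {
      val (code, durationMs) = runOnce()
      exitCode = code
      attempts += 1
      keepTrying = supervise && exitCode != 0 && attempts < maxAttempts
      if (keepTrying) {
        // A sufficiently long run resets the exponential back-off.
        if (durationMs > successfulRunDurationMs) waitSeconds = 1
        sleep(waitSeconds)
        waitSeconds *= 2
      }
    }
    exitCode
  }
}
```

With supervise set to false the loop runs the command exactly once and returns its exit code, whatever it is.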

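    Removing the Driver

    The DriverRunner derives the final state from the exit code and notifies the Worker that the driver's state has changed. On receiving that message, the Worker does the following: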

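    • Send the driverStateChanged message on to the Master.
    • Remove the driver from the drivers collection.
    • Add the driver to the finished drivers collection.
    • Release the memory and CPU cores allocated to the driver.

     private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {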
        // driver id
        val driverId = driverStateChanged.driverId
        val exception = driverStateChanged.exception
        val state = driverStateChanged.state
        // Match on the state and log accordingly
        state match {
          case DriverState.ERROR =>
            logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
          case DriverState.FAILED =>
            logWarning(s"Driver $driverId exited with failure")
          case DriverState.FINISHED =>
            logInfo(s"Driver $driverId exited successfully")
          case DriverState.KILLED =>
            logInfo(s"Driver $driverId was killed by user")
          case _ =>
            logDebug(s"Driver $driverId changed state to $state")
        }
        // Forward driverStateChanged to the Master
        sendToMaster(driverStateChanged)
        // Remove the driver from the drivers collection
        val driver = drivers.remove(driverId).get
        // Add the driver to finishedDrivers
        finishedDrivers(driverId) = driver
        trimFinishedDriversIfNecessary()
        // Release the driver's memory and CPU cores
        memoryUsed -= driver.driverDesc.mem
        coresUsed -= driver.driverDesc.cores
      }
    

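    When the Master receives the message, it matches on the state: for ERROR, FINISHED, KILLED or FAILED it calls removeDriver to remove the Driver; any other state raises an exception.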

    case DriverStateChanged(driverId, state, exception) =>
          state match {
            case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
              removeDriver(driverId, state, exception)
            case _ =>
              throw new Exception(s"Received unexpected state update for driver $driverId: $state")
          }
    

    removeDriver performs the following cleanup:

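    • Remove the driver from the drivers collection.
    • Add the driver to the completedDrivers collection, trimming the oldest entries once RETAINED_DRIVERS is reached.
    • Remove the driver from the persistence engine.
    • Set the driver's state to its final state.
    • Remove the driver from its worker.
    • Call schedule() to reschedule resources.

      private def removeDriver(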
          driverId: String,
          finalState: DriverState,
          exception: Option[Exception]) {
        //  Look the driver up by its id
        drivers.find(d => d.id == driverId) match {
          case Some(driver) =>
            logInfo(s"Removing driver: $driverId")
            // Remove the driver from the drivers collection
            drivers -= driver
            if (completedDrivers.size >= RETAINED_DRIVERS) {
              val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
              completedDrivers.trimStart(toRemove)
            }
            // Add the driver to completedDrivers
            completedDrivers += driver
            // Remove the driver from the persistence engine
            persistenceEngine.removeDriver(driver)
            // Record the driver's final state
            driver.state = finalState
            driver.exception = exception
            // Remove the driver from its worker
            driver.worker.foreach(w => w.removeDriver(driver))
            // Call schedule() to reassign the freed resources
            schedule()
          case None =>
            logWarning(s"Asked to remove unknown driver: $driverId")
        }
      }
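The retention logic in removeDriver drops the oldest tenth of completedDrivers (at least one entry) whenever the buffer has reached RETAINED_DRIVERS, then appends the new driver. A minimal sketch with driver ids as plain strings; RetentionSketch and addCompleted are hypothetical names, and remove(0, n) is used here in place of trimStart(n) to drop the first n elements.

```scala
import scala.collection.mutable.ArrayBuffer

object RetentionSketch {
  /** Appends `driverId`, first dropping the oldest entries if `retained` has been reached. */
  def addCompleted(completed: ArrayBuffer[String], driverId: String, retained: Int): Unit = {
    if (completed.size >= retained) {
      val toRemove = math.max(retained / 10, 1)
      completed.remove(0, toRemove) // drop the oldest entries from the front
    }
    completed += driverId
  }
}
```

Trimming in batches rather than one entry at a time means the buffer is not shifted on every completed driver once the limit has been reached.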
    

    Next: launching executors.

    Reference: http://www.louisvv.com/archives/1366.html
