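Job submission flow diagram
Job execution flow:
- The client submits the job to the Master.
- The Master has one Worker launch the Driver, i.e. the SchedulerBackend. The Worker creates a DriverRunner thread, and the DriverRunner starts the SchedulerBackend process.
- The Master also has the remaining Workers launch Executors, i.e. ExecutorBackend. Each Worker creates an ExecutorRunner thread, and the ExecutorRunner starts the ExecutorBackend process.
- Once started, the ExecutorBackend registers with the Driver's SchedulerBackend. The SchedulerBackend process contains the DAGScheduler, which builds an execution plan from the user program and schedules it. The tasks of each stage are placed in the TaskScheduler; when an ExecutorBackend reports to the SchedulerBackend, tasks from the TaskScheduler are dispatched to that ExecutorBackend for execution.
- The job ends once all stages have completed.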
Driver launch flow
After receiving a RequestSubmitDriver request, the Master calls the schedule() method:
/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  // Keep only the ALIVE workers and shuffle them randomly
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  // Number of workers that are currently ALIVE
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      // If the worker's free memory and cores cover what the driver needs, launch the driver on it
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // Then schedule executors on the workers
  startExecutorsOnWorkers()
}
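- Check the Master's current state and return immediately unless it is ALIVE
- Filter the workers down to those in the ALIVE state and shuffle them randomly
- Iterate over waitingDrivers and, for each driver, walk the shuffled ALIVE workers round-robin. If a worker's free memory and cores are at least what the driver requires, call launchDriver(worker, driver) to start the driver on that worker, remove the driver from the waiting queue, and mark it as launched
- Finally, call startExecutorsOnWorkers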
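The round-robin placement described above can be tried in isolation. The following is a minimal sketch, not Spark's code: WorkerSlot, DriverRequest and DriverPlacementSketch are hypothetical stand-ins for WorkerInfo, DriverInfo and the Master, and the free-resource bookkeeping is done inline here instead of through launchDriver.

```scala
import scala.util.Random

// Hypothetical, simplified stand-ins for Spark's WorkerInfo and DriverInfo.
final case class WorkerSlot(id: String, var memoryFree: Int, var coresFree: Int)
final case class DriverRequest(id: String, mem: Int, cores: Int)

object DriverPlacementSketch {
  /** Returns driverId -> workerId for every driver that could be placed. */
  def place(
      workers: Seq[WorkerSlot],
      waiting: Seq[DriverRequest],
      rng: Random = new Random()): Map[String, String] = {
    // Shuffle once so that no worker is systematically preferred.
    val shuffled = rng.shuffle(workers.toVector)
    val numAlive = shuffled.size
    var curPos = 0
    var assignments = Map.empty[String, String]
    for (driver <- waiting) {
      var launched = false
      var visited = 0
      // Visit each worker at most once per driver, starting where the last search stopped.
      while (visited < numAlive && !launched) {
        val worker = shuffled(curPos)
        visited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          // Assumption: reserve the resources right away (simplified bookkeeping).
          worker.memoryFree -= driver.mem
          worker.coresFree -= driver.cores
          assignments += (driver.id -> worker.id)
          launched = true
        }
        curPos = (curPos + 1) % numAlive
      }
    }
    assignments
  }
}
```

Because curPos is shared across drivers, consecutive drivers tend to land on different workers when several of them have room. A driver that fits nowhere is simply left out of the result, just as the original leaves it in waitingDrivers for the next schedule() call.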
The launchDriver method
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
- Record the driver in the WorkerInfo
- Record the worker in the DriverInfo
- Send a LaunchDriver message through the worker's RpcEndpointRef, telling the worker to start the driver
- Set the driver's state to RUNNING
When the Worker receives the LaunchDriver message:
case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  driver.start()
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
- Wrap the driver information in a DriverRunner object
- Store the new DriverRunner in the drivers HashMap
- Call the DriverRunner's start method
- Record the cores and memory now in use
Next, look at DriverRunner's start method:
/** Starts a thread to run and manage the driver. */
private[worker] def start() = {
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      var shutdownHook: AnyRef = null
      try {
        shutdownHook = ShutdownHookManager.addShutdownHook { () =>
          logInfo(s"Worker shutting down, killing driver $driverId")
          kill()
        }
        // prepare driver jars and run driver
        val exitCode = prepareAndRunDriver()
        // set final state depending on if forcibly killed and process exit code
        finalState = if (exitCode == 0) {
          Some(DriverState.FINISHED)
        } else if (killed) {
          Some(DriverState.KILLED)
        } else {
          Some(DriverState.FAILED)
        }
      } catch {
        case e: Exception =>
          kill()
          finalState = Some(DriverState.ERROR)
          finalException = Some(e)
      } finally {
        if (shutdownHook != null) {
          ShutdownHookManager.removeShutdownHook(shutdownHook)
        }
      }
      // notify worker of final driver state, possible exception
      worker.send(DriverStateChanged(driverId, finalState.get, finalException))
    }
  }.start()
}
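- Call prepareAndRunDriver, which prepares the driver jar and runs the driver
- Set the final state from prepareAndRunDriver's exit code: FINISHED for exit code 0, otherwise KILLED if the driver was killed, otherwise FAILED; an exception yields ERROR
- Send a DriverStateChanged message to the worker to report the driver's final state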
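The order of the checks in start() matters, and a small model makes it easier to see. The sketch below is an illustration only: FinalStateSketch and its State objects are hypothetical names, not Spark's DriverState, and the run and killed parameters stand in for prepareAndRunDriver() and the killed flag.

```scala
object FinalStateSketch {
  // Hypothetical stand-ins for the four terminal driver states.
  sealed trait State
  case object Finished extends State
  case object Killed extends State
  case object Failed extends State
  case object Error extends State

  /** Mirrors the decision order: exit code first, then the killed flag, then failure. */
  def finalState(run: () => Int, killed: () => Boolean): State =
    try {
      val exitCode = run()
      if (exitCode == 0) Finished
      else if (killed()) Killed
      else Failed
    } catch {
      // Any exception while preparing or running maps to Error.
      case _: Exception => Error
    }
}
```

One consequence of checking the exit code first is that a process which exits with 0 is reported as finished even if the killed flag happens to be set.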
Next, the prepareAndRunDriver method:
private[worker] def prepareAndRunDriver(): Int = {
  val driverDir = createWorkingDirectory()
  val localJarFilename = downloadUserJar(driverDir)
  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }
  // TODO: If we add ability to submit multiple jars they should also be added here
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
  runDriver(builder, driverDir, driverDesc.supervise)
}
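- Create the driver's working directory
- Download the user jar into that directory
- Define substituteVariables, which replaces the {{WORKER_URL}} and {{USER_JAR}} placeholders in the command arguments
- Create a ProcessBuilder from the driver launch command, the securityManager, the memory the driver needs, the absolute path of Spark home, and the substitution function
- Call runDriver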
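The placeholder substitution is easy to experiment with on its own. This is a minimal sketch: SubstituteSketch is a hypothetical wrapper, and the worker URL, jar path and argument list in the usage are made-up values; only the match itself follows the method shown above.

```scala
object SubstituteSketch {
  /** Replaces an argument only when the whole argument is a known placeholder. */
  def substitute(workerUrl: String, localJar: String)(argument: String): String =
    argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}" => localJar
      case other => other
    }

  /** Applies the substitution to every argument of a command line. */
  def substituteAll(workerUrl: String, localJar: String, args: Seq[String]): Seq[String] =
    args.map(arg => substitute(workerUrl, localJar)(arg))
}
```

For example, `SubstituteSketch.substituteAll("spark://worker:7078", "/work/driver-1/app.jar", Seq("{{WORKER_URL}}", "{{USER_JAR}}", "--verbose"))` swaps the first two arguments and leaves the third alone. Since the match is on the entire string, an argument such as `--jar={{USER_JAR}}` would pass through unchanged.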
runDriver redirects the process's InputStream and ErrorStream to the stdout and stderr files in the driver's working directory.
These are the two log files shown under Logs on the Spark application web page.
Its last step is a call to runCommandWithRetry:
private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
  builder.directory(baseDir)
  // Initialization callback, run once the process has started
  def initialize(process: Process): Unit = {
    // Redirect stdout and stderr to files
    // stdout file in the working directory
    val stdout = new File(baseDir, "stdout")
    // Redirect the process's InputStream to the stdout file
    CommandUtils.redirectStream(process.getInputStream, stdout)
    // stderr file in the working directory
    val stderr = new File(baseDir, "stderr")
    // Format the launch command and write it to stderr as a header
    val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
    val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
    Files.append(header, stderr, StandardCharsets.UTF_8)
    // Redirect the process's error stream to the stderr file
    CommandUtils.redirectStream(process.getErrorStream, stderr)
  }
  // Run the command, retrying if supervise is set
  runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}
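runDriver defines an initialization function, initialize, which does the following:
- Set up the stdout and stderr files in the working directory and redirect the process's InputStream to stdout
- Format the builder command and write it to stderr as a header
- Redirect the process's error stream to the stderr file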
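The same log layout can be approximated with the JDK alone. The sketch below swaps in a different technique from the one in the source: instead of copying the streams with CommandUtils.redirectStream, it lets java.lang.ProcessBuilder redirect them directly. RedirectSketch and its method names are hypothetical; the header format is copied from runDriver.

```scala
import java.io.File
import java.lang.ProcessBuilder.Redirect
import java.nio.charset.StandardCharsets
import java.nio.file.Files

object RedirectSketch {
  /** Builds the same "Launch Command" header that runDriver writes to stderr. */
  def header(command: Seq[String]): String = {
    val formatted = command.mkString("\"", "\" \"", "\"")
    "Launch Command: %s\n%s\n\n".format(formatted, "=" * 40)
  }

  /** Configures, but does not start, a process that logs to baseDir/stdout and baseDir/stderr. */
  def configure(command: Seq[String], baseDir: File): ProcessBuilder = {
    val stdout = new File(baseDir, "stdout")
    val stderr = new File(baseDir, "stderr")
    // Write the header first, then append the process's error output after it.
    Files.write(stderr.toPath, header(command).getBytes(StandardCharsets.UTF_8))
    new ProcessBuilder(command: _*)
      .directory(baseDir)
      .redirectOutput(Redirect.to(stdout))
      .redirectError(Redirect.appendTo(stderr))
  }
}
```

Calling `start()` on the returned builder would launch the process. Appending to stderr rather than truncating it is what keeps the header at the top of the file.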
runDriver finishes by calling runCommandWithRetry, shown here:
private[worker] def runCommandWithRetry(
    command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
  // exitCode defaults to -1
  var exitCode = -1
  // Time to wait between submission retries.
  var waitSeconds = 1
  // A run of this many seconds resets the exponential back-off.
  val successfulRunDuration = 5
  var keepTrying = !killed
  while (keepTrying) {
    logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))
    synchronized {
      // If the driver has been killed, return exitCode
      if (killed) { return exitCode }
      // Run the command to start the process
      process = Some(command.start())
      // Call the initialization function
      initialize(process.get)
    }
    val processStart = clock.getTimeMillis()
    // Block until the process exits and take its exit code
    exitCode = process.get.waitFor()
    // check if attempting another run
    keepTrying = supervise && exitCode != 0 && !killed
    if (keepTrying) {
      if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
        waitSeconds = 1
      }
      logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
      sleeper.sleep(waitSeconds)
      waitSeconds = waitSeconds * 2 // exponential back-off
    }
  }
  exitCode
}
process = Some(command.start()) is where the Driver process is actually started, and exitCode = process.get.waitFor() blocks until that process exits and returns its exit code.
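The retry policy can be exercised without starting any process. The following is a minimal sketch under stated assumptions: RetrySketch is a hypothetical name, and the process, the killed flag, the sleeper and the clock are all replaced by plain function parameters so that the loop can be driven with scripted exit codes.

```scala
import scala.collection.mutable.ListBuffer

object RetrySketch {
  /** Same control flow as the loop above, with the side effects passed in as functions. */
  def runWithRetry(
      runOnce: () => Int,
      supervise: Boolean,
      isKilled: () => Boolean,
      sleepSeconds: Int => Unit,
      nowMillis: () => Long): Int = {
    var exitCode = -1
    var waitSeconds = 1
    val successfulRunDuration = 5 // seconds; a run longer than this resets the back-off
    var keepTrying = !isKilled()
    while (keepTrying) {
      val start = nowMillis()
      exitCode = runOnce()
      // Retry only when supervised, the run failed, and nobody asked to stop.
      keepTrying = supervise && exitCode != 0 && !isKilled()
      if (keepTrying) {
        if (nowMillis() - start > successfulRunDuration * 1000) waitSeconds = 1
        sleepSeconds(waitSeconds)
        waitSeconds *= 2 // exponential back-off
      }
    }
    exitCode
  }

  /** Replays a scripted list of exit codes; returns the final code and the waits that occurred. */
  def simulate(exitCodes: List[Int], supervise: Boolean): (Int, List[Int]) = {
    val remaining = exitCodes.iterator
    val sleeps = ListBuffer.empty[Int]
    val result = runWithRetry(
      runOnce = () => remaining.next(),
      supervise = supervise,
      isKilled = () => false,
      sleepSeconds = s => { sleeps += s; () },
      nowMillis = () => 0L) // frozen clock: every run counts as short
    (result, sleeps.toList)
  }
}
```

With three failures followed by a success, `RetrySketch.simulate(List(1, 1, 1, 0), supervise = true)` waits 1, 2 and then 4 seconds between attempts. Without supervise, the first non-zero exit code is returned immediately and nothing is retried.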
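Removing the Driver
The DriverRunner derives the final state from the exit code and notifies the worker that the driver's state has changed. When the worker receives that message, it does the following:
- Forward the driverStateChanged message to the Master
- Remove the driver from the drivers collection
- Add the driver to the collection of finished drivers
- Release the memory and CPU cores that were allocated to the driver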
private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
  // driver id
  val driverId = driverStateChanged.driverId
  val exception = driverStateChanged.exception
  val state = driverStateChanged.state
  // Match on the state and log accordingly
  state match {
    case DriverState.ERROR =>
      logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
    case DriverState.FAILED =>
      logWarning(s"Driver $driverId exited with failure")
    case DriverState.FINISHED =>
      logInfo(s"Driver $driverId exited successfully")
    case DriverState.KILLED =>
      logInfo(s"Driver $driverId was killed by user")
    case _ =>
      logDebug(s"Driver $driverId changed state to $state")
  }
  // Forward the driverStateChanged message to the master
  sendToMaster(driverStateChanged)
  // Remove the driver from the drivers collection
  val driver = drivers.remove(driverId).get
  // Add the driver to finishedDrivers
  finishedDrivers(driverId) = driver
  trimFinishedDriversIfNecessary()
  // Release the driver's memory and CPU cores
  memoryUsed -= driver.driverDesc.mem
  coresUsed -= driver.driverDesc.cores
}
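The worker's resource accounting is symmetric: what the LaunchDriver handler adds, handleDriverStateChanged subtracts. A minimal sketch of that ledger follows; WorkerLedger and DriverSpec are hypothetical names, and unlike the original, which calls .get on the removed entry, this version silently ignores an unknown driver id.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the parts of a driver description the ledger needs.
final case class DriverSpec(id: String, mem: Int, cores: Int)

class WorkerLedger {
  val drivers = mutable.HashMap.empty[String, DriverSpec]
  val finishedDrivers = mutable.LinkedHashMap.empty[String, DriverSpec]
  var memoryUsed = 0
  var coresUsed = 0

  /** Launch side: remember the driver and charge its resources. */
  def onLaunch(d: DriverSpec): Unit = {
    drivers(d.id) = d
    memoryUsed += d.mem
    coresUsed += d.cores
  }

  /** Final-state side: move the driver to the finished map and refund its resources. */
  def onFinalState(driverId: String): Unit =
    drivers.remove(driverId).foreach { d =>
      finishedDrivers(driverId) = d
      memoryUsed -= d.mem
      coresUsed -= d.cores
    }
}
```

After one onLaunch followed by onFinalState for the same id, both counters are back at zero and the driver appears only in finishedDrivers.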
When the Master receives the message, it matches on the state: for ERROR, FINISHED, KILLED or FAILED it calls removeDriver to remove the Driver, and any other state raises an exception.
case DriverStateChanged(driverId, state, exception) =>
  state match {
    case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
      removeDriver(driverId, state, exception)
    case _ =>
      throw new Exception(s"Received unexpected state update for driver $driverId: $state")
  }
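removeDriver performs a series of cleanup steps:
- Remove the driver from the drivers collection
- Add the driver to the completedDrivers collection
- Remove the driver from the persistence engine
- Set the driver's state to its final state
- Remove the driver from its worker
- Call schedule to reschedule resources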
private def removeDriver(
    driverId: String,
    finalState: DriverState,
    exception: Option[Exception]) {
  // Look up the driver by id and match on the result
  drivers.find(d => d.id == driverId) match {
    case Some(driver) =>
      logInfo(s"Removing driver: $driverId")
      // Remove the driver from the drivers collection
      drivers -= driver
      if (completedDrivers.size >= RETAINED_DRIVERS) {
        val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
        completedDrivers.trimStart(toRemove)
      }
      // Add the driver to the completedDrivers collection
      completedDrivers += driver
      // Remove the driver from the persistence engine
      persistenceEngine.removeDriver(driver)
      // Set the driver's state to finalState
      driver.state = finalState
      driver.exception = exception
      // Remove the driver from its worker
      driver.worker.foreach(w => w.removeDriver(driver))
      // Call schedule() to reschedule the freed resources
      schedule()
    case None =>
      logWarning(s"Asked to remove unknown driver: $driverId")
  }
}
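One detail in removeDriver that is easy to overlook is how completedDrivers stays bounded: once it reaches RETAINED_DRIVERS entries, the oldest tenth (at least one entry) is dropped before the new driver is appended. A minimal sketch of that policy, with CompletedHistory as a hypothetical name and ArrayBuffer.remove(0, n) used in place of trimStart:

```scala
import scala.collection.mutable.ArrayBuffer

class CompletedHistory[A](retained: Int) {
  require(retained >= 1, "retained must be at least 1")

  private val items = ArrayBuffer.empty[A]

  /** Appends an item, first dropping the oldest ~10% when the buffer is full. */
  def add(item: A): Unit = {
    if (items.size >= retained) {
      val toRemove = math.max(retained / 10, 1)
      items.remove(0, toRemove) // drop from the front, i.e. the oldest entries
    }
    items += item
  }

  def toList: List[A] = items.toList
}
```

Dropping a batch instead of a single entry means the trimming cost is paid only once every few insertions when the limit is large. With a limit of 20, adding the 21st item removes the two oldest, leaving 19 entries.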
Next up: launching executors.