上文主要介绍 client 提交 Driver 的流程,这节介绍下 启动 Driver 的流程。
Master 筛选合适的 Worker,并发送 LaunchDriver 消息给 这个 Worker
上节提到,Master 中接收到 启动 Driver 的请求,在 RequestSubmitDriver 中处理:
![](https://img.haomeiwen.com/i3149801/be17d38e499b6d73.png)
看下 schedule() 代码:
private def schedule(): Unit = {
// Master的状态是否是存活状态,不是则返回
if (state != RecoveryState.ALIVE) {
return
}
//先打乱 Worker 顺序,避免 Driver 都跑到一个 Worker 中。然后找到所有存活的 Worker
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
//得到存活的 Worker 的数量
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
//遍历所有等待启动的 Driver
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
//是否启动标志位,初始值为 false
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1
//如果 worker 的空闲内存 和 空闲核,满足client 要申请启动Driver 的资源
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
//则启动 Driver
launchDriver(worker, driver)
//把这个 driver 从 waitingDrivers 中移除
waitingDrivers -= driver
//是否启动标志位 置为 true,退出循环
launched = true
}
curPos = (curPos + 1) % numWorkersAlive
}
}
//启动一个 Executor
startExecutorsOnWorkers()
}
如上面中的注释,从所有存活的 worker 中,找到满足资源条件的 worker ,并启动Driver,查看launchDriver :
如上图,Master通过RPC通信,把启动Driver的消息发送给 满足条件的Worker节点。
Worker节点启动Driver
Worker的receive方法接收并处理LaunchDriver信息,如下 :
case LaunchDriver(driverId, driverDesc) =>
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner(...)
drivers(driverId) = driver
driver.start()
如上图,首先将Driver信息封装为DriverRunner,然后调用其start方法启动Driver。查看DriverRunner的start方法(源码较多,做了简化处理)
执行步骤如上图注释,最终使用Java中的java.lang.ProcessBuilder类执行Linux命令的方式启动Driver,Linux命令大致如下
通过java -cp命令在Worker节点开始运行了,即Launch Driver,所谓的Driver就是/path/to/examples.jar。
最后,将Driver的执行状态返回给Master。
Launch Exector
在 上面 Master schedule 方法中,Master 筛选合适的 Worker,并在最后:startExecutorsOnWorkers
,和 Launch Driver 类似,先筛选 Worker,然后在 Worker 上运行 Exector
![](https://img.haomeiwen.com/i3149801/b320f9d413e13927.png)
总结
介绍了Master将Driver发送到Worker,及在Worker节点启动Driver的流程,如下
网友评论