1. Master failover (active/standby switchover) mechanism
Spark Master failover can be based on one of two mechanisms: the file system or ZooKeeper. With the file-system-based mechanism, after the Active Master dies we have to switch over to the Standby Master manually; the ZooKeeper-based mechanism switches Masters automatically. The "Master failover mechanism" discussed here refers to the operations the Master performs when, after the Active Master dies, the system switches over to the Standby Master.
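For reference, a minimal sketch of how the two recovery modes are typically enabled in spark-env.sh, using the spark.deploy.* properties from the standalone HA documentation (the ZooKeeper addresses and directory paths below are placeholders):
# ZooKeeper-based failover: Masters switch over automatically
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"

# File-system-based failover: state is persisted, but the switchover is manual
SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM \
  -Dspark.deploy.recoveryDirectory=/var/spark/recovery"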
Failover procedure:
- Use the persistence engine (FileSystemPersistenceEngine or ZooKeeperPersistenceEngine) to read the persisted storedApps, storedDrivers and storedWorkers.
- Check whether any of storedApps, storedDrivers or storedWorkers is non-empty; recovery only proceeds if at least one of them is.
- Re-register the persisted Application, Driver and Worker information into the Master's internal in-memory cache structures.
- Set the state of the Applications and Workers to UNKNOWN, then send the Standby Master's address to each Application's Driver and to each Worker.
- As responses from the Drivers and Workers come in, the Master uses the completeRecovery() method to deal with the Drivers and Workers that never responded, filtering their information out.
- Call the Master's own schedule() method to schedule the Drivers and Applications that are waiting for resources, e.g. launching a Driver on some Worker, or launching executors for an Application on Workers.
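To make the flow concrete, here is a minimal, self-contained sketch of the recovery sequence; the classes and fields below are simplified stand-ins for the Master's internals, not the real Spark code:
import scala.collection.mutable

object RecoverySketch {
  // Simplified stand-ins for the Master's in-memory structures
  class WorkerInfo(val id: String) { var state: String = "ALIVE" }
  class AppInfo(val id: String)    { var state: String = "RUNNING" }

  val workers = new mutable.HashMap[String, WorkerInfo]
  val apps    = new mutable.HashMap[String, AppInfo]

  // Re-register the persisted apps and workers and mark them UNKNOWN; the real Master
  // then sends each of them the new Master's address and waits for their responses.
  def beginRecovery(storedApps: Seq[AppInfo], storedWorkers: Seq[WorkerInfo]): Unit = {
    storedApps.foreach    { a => apps(a.id) = a;    a.state = "UNKNOWN" }
    storedWorkers.foreach { w => workers(w.id) = w; w.state = "UNKNOWN" }
  }

  // A worker that responds is flipped back to a live state
  def workerResponded(workerId: String): Unit =
    workers.get(workerId).foreach(_.state = "ALIVE")

  // Anything still UNKNOWN never responded: filter it out, then resume scheduling
  def completeRecovery(): Unit = {
    workers --= workers.collect { case (id, w) if w.state == "UNKNOWN" => id }
    apps    --= apps.collect    { case (id, a) if a.state == "UNKNOWN" => id }
    // schedule()   // in the real Master, waiting drivers and applications are scheduled here
  }
}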
2. Registration mechanism: principles and source analysis
- Worker registration
(1) After a Worker starts up, it actively registers itself with the Master.
(2) The Master filters out Workers whose state is DEAD; for Workers whose state is UNKNOWN, it cleans up the stale Worker information and replaces it with the new Worker's information.
(3) The Worker is added to the in-memory cache (a HashMap).
(4) The persistence engine persists the Worker information (to the file system or ZooKeeper).
(5) schedule() is called.
- Driver registration
(1) When a Spark Application is submitted with spark-submit, the Driver is registered first.
(2) The Driver information is put into the in-memory cache.
(3) The Driver is added to the waiting-for-scheduling queue (an ArrayBuffer).
(4) The persistence engine persists the Driver information.
(5) schedule() is called.
- Application registration
(1) Once the Driver has started, it runs the Application code we wrote and initializes the SparkContext; underneath, SparkDeploySchedulerBackend, via the ClientActor thread inside AppClient, sends a RegisterApplication message to the Master to register the Application.
(2) The Application information is put into the in-memory cache.
(3) The Application is added to the waiting-for-scheduling queue (an ArrayBuffer).
(4) The persistence engine persists the Application information.
(5) schedule() is called.
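All three registration paths share the same overall shape: put the new entity into the Master's in-memory structures, persist it, then call schedule(). A minimal sketch under that reading (the names below are simplified stand-ins, not the real Master / AppClient code):
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

object RegistrationSketch {
  case class WorkerInfo(id: String, host: String, cores: Int, memory: Int)
  case class AppInfo(id: String, name: String, coresRequested: Int)

  // Stand-in for FileSystemPersistenceEngine / ZooKeeperPersistenceEngine
  trait PersistenceEngine {
    def addWorker(worker: WorkerInfo): Unit
    def addApplication(app: AppInfo): Unit
  }

  val idToWorker  = new mutable.HashMap[String, WorkerInfo]  // in-memory cache (HashMap)
  val idToApp     = new mutable.HashMap[String, AppInfo]     // in-memory cache (HashMap)
  val waitingApps = new ArrayBuffer[AppInfo]                 // waiting queue (ArrayBuffer)

  def registerWorker(worker: WorkerInfo, persistence: PersistenceEngine): Unit = {
    // the real Master first drops DEAD workers and replaces UNKNOWN ones (step 2 above)
    idToWorker(worker.id) = worker   // put the worker into the in-memory cache
    persistence.addWorker(worker)    // persist it (file system or ZooKeeper)
    schedule()                       // trigger resource scheduling
  }

  def registerApplication(app: AppInfo, persistence: PersistenceEngine): Unit = {
    idToApp(app.id) = app            // put the application into the in-memory cache
    waitingApps += app               // add it to the waiting-for-scheduling queue
    persistence.addApplication(app)  // persist it
    schedule()                       // trigger resource scheduling
  }

  def schedule(): Unit = ()          // placeholder; the real schedule() is analyzed below
}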
3. State change mechanism: source analysis
- Driver state changes
- Executor state changes
4. Resource scheduling mechanism: source analysis
The schedule() code fragment:
private def schedule() {
  // First check: if the master's state is not ALIVE, return immediately.
  // In other words, a standby master never schedules applications or other resources.
  if (state != RecoveryState.ALIVE) { return }
  // First schedule drivers, they take strict precedence over applications
  // Randomization helps balance drivers
  // Random.shuffle simply shuffles the elements of the collection passed in:
  // take all previously registered workers, keep only those whose state is ALIVE,
  // then shuffle the ALIVE workers with Random.shuffle
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  // First, schedule drivers.
  // Drivers are only scheduled here when the application was submitted in cluster deploy
  // mode; in client mode the driver is launched locally and is never registered with the
  // Master, so there is nothing for the Master to schedule.
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    // Keep iterating as long as there are alive workers we have not visited yet
    // and the current driver has not been launched (launched is still false)
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      // If this worker's free memory is at least the memory the driver needs,
      // and its free CPU cores are at least the cores the driver needs
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // launch the driver on this worker
        launchDriver(worker, driver)
        // and remove the driver from the waitingDrivers queue
        waitingDrivers -= driver
        launched = true
      }
      // move the pointer to the next worker
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
The launchDriver() method:
/**
 * Launch a driver on the given worker.
 * @param worker
 * @param driver
 */
def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // Add the driver to the worker's in-memory cache structures, and add the driver's
  // required memory and CPU cores to the worker's used memory and core counts
  worker.addDriver(driver)
  // Also record the worker in the driver's in-memory cache structure
  driver.worker = Some(worker)
  // Send a LaunchDriver message to the worker's actor so the worker launches the driver
  worker.actor ! LaunchDriver(driver.id, driver.desc)
  // Set the driver's state to RUNNING
  driver.state = DriverState.RUNNING
}
There are two scheduling algorithms for Applications: spreadOutApps and non-spreadOutApps.
(1) spreadOutApps
Spread the application's cores evenly across all usable workers.
(2) non-spreadOutApps
Use as few workers as possible: give each worker as many cores as it can offer before moving on to the next one.
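To see the practical difference, here is a small standalone simulation of the two core-assignment strategies; it is illustrative code only, not the actual Master source, which follows below:
object AllocationDemo {
  // spread-out: hand out one core at a time, round-robin over the usable workers
  def spreadOut(coresLeft: Int, freeCores: Array[Int]): Array[Int] = {
    val assigned = new Array[Int](freeCores.length)
    var toAssign = math.min(coresLeft, freeCores.sum)
    var pos = 0
    while (toAssign > 0) {
      if (freeCores(pos) - assigned(pos) > 0) { assigned(pos) += 1; toAssign -= 1 }
      pos = (pos + 1) % freeCores.length
    }
    assigned
  }

  // non-spread-out: drain each worker's free cores before moving on to the next one
  def consolidate(coresLeft: Int, freeCores: Array[Int]): Array[Int] = {
    val assigned = new Array[Int](freeCores.length)
    var remaining = coresLeft
    for (pos <- freeCores.indices if remaining > 0) {
      val use = math.min(freeCores(pos), remaining)
      assigned(pos) = use
      remaining -= use
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    val freeCores = Array.fill(10)(10)                // 10 workers, 10 free cores each
    println(spreadOut(20, freeCores).mkString(","))   // 2,2,2,2,2,2,2,2,2,2
    println(consolidate(20, freeCores).mkString(",")) // 10,10,0,0,0,0,0,0,0,0
  }
}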
Source analysis:
// Spread each application's executors evenly across the workers
if (spreadOutApps) {
  // Try to spread out each app among all the nodes, until it has all its cores
  for (app <- waitingApps if app.coresLeft > 0) {
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(canUse(app, _)).sortBy(_.coresFree).reverse
    val numUsable = usableWorkers.length
    // Create an empty array holding the number of CPU cores to assign to each worker
    val assigned = new Array[Int](numUsable) // Number of cores to give on each node
    // Work out how many cores to assign in total: the minimum of the cores the app still
    // needs and the total free cores across all usable workers
    var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
    var pos = 0
    // Keep looping while there are still cores left to assign
    while (toAssign > 0) {
      // this worker still has unassigned free cores
      if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
        // one fewer core left to assign overall
        toAssign -= 1
        assigned(pos) += 1
      }
      // move the pointer to the next worker
      pos = (pos + 1) % numUsable
    }
    // Now that we've decided how many cores to give on each node, let's actually give them
    // After deciding how many cores each worker gets for this application, walk the workers
    for (pos <- 0 until numUsable) {
      // only workers that were actually assigned cores
      if (assigned(pos) > 0) {
        // Add an executor to the application's internal cache structures:
        // an ExecutorDesc object is created, recording how many cores this executor gets.
        // In spark-submit you can request a number of executors, plus cores and memory per
        // executor, but with this mechanism the final number of executors, and the cores
        // per executor, may differ from what was requested.
        // For example, if you ask for 3 executors with 3 cores each but there are 9 workers
        // with 1 free core each, 9 cores are assigned and this algorithm launches one
        // executor on each worker, i.e. 9 executors with 1 core each.
        val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
        // launch the executor
        launchExecutor(usableWorkers(pos), exec)
        app.state = ApplicationState.RUNNING
      }
    }
  }
} else {
  // Pack each app into as few nodes as possible until we've assigned all its cores
  // The non-spreadOutApps scheduling algorithm.
  // This algorithm is the exact opposite of spreadOutApps: each application uses as few
  // workers as possible. For example, with 10 workers of 10 cores each and an application
  // that needs 20 cores, this algorithm assigns cores on only 2 of the workers.
  // Iterate over the workers that are ALIVE and still have free CPU cores
  for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
    // Iterate over the applications that still have cores left to assign
    for (app <- waitingApps if app.coresLeft > 0) {
      // if this worker can be used by the application
      if (canUse(app, worker)) {
        // take the minimum of the worker's free cores and the cores the app still needs
        val coresToUse = math.min(worker.coresFree, app.coresLeft)
        if (coresToUse > 0) {
          // add a single executor for the app on this worker
          val exec = app.addExecutor(worker, coresToUse)
          // launch the executor
          launchExecutor(worker, exec)
          app.state = ApplicationState.RUNNING
        }
      }
    }
  }
}