If dynamic allocation is configured, Spark uses an ExecutorAllocationManager to manage the cluster's executors.
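Dynamic allocation is switched on through configuration. A minimal sketch (values are illustrative; the external shuffle service is normally enabled as well so executors can be removed without losing shuffle data):

import org.apache.spark.SparkConf

// Sketch: turn on dynamic allocation for an application.
val conf = new SparkConf()
  .setAppName("dynamic-allocation-demo") // illustrative app name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")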
Let's look at its start method:
def start(): Unit = {
  // Register the inner listener class on the event bus
  listenerBus.addToManagementQueue(listener)
  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }
  // Run the Runnable above on a fixed-delay schedule
  executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
  // Request executors
  client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}
As you can see, start() does three things: it registers a listener on the event bus, schedules a periodic task, and requests executors.
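The periodic part can be illustrated with plain JDK scheduling. This is only a self-contained sketch of the scheduleWithFixedDelay pattern; the real manager uses Spark's own thread utilities and interval, and the names below are illustrative:

import java.util.concurrent.{Executors, TimeUnit}

object ScheduleLoopSketch {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val intervalMillis = 100L // illustrative interval
    val scheduleTask = new Runnable {
      override def run(): Unit = {
        try {
          // stand-in for schedule(): decide whether to add or remove executors
          println(s"tick at ${System.currentTimeMillis()}")
        } catch {
          case t: Throwable => t.printStackTrace()
        }
      }
    }
    executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
    Thread.sleep(500) // let a few ticks run, then stop
    executor.shutdown()
  }
}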
The listener class: ExecutorAllocationListener
It reacts to messages posted on the event bus; the events it listens for are shown below:
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
  initializing = false
  val stageId = stageSubmitted.stageInfo.stageId // get the stage id
  val numTasks = stageSubmitted.stageInfo.numTasks // get the number of tasks
  allocationManager.synchronized {
    stageIdToNumTasks(stageId) = numTasks // cache it
    stageIdToNumRunningTask(stageId) = 0 // initialize to 0
    allocationManager.onSchedulerBacklogged()

    // Compute the number of tasks requested by the stage on each host
    var numTasksPending = 0
    val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]()
    // Aggregate the tasks' locality preferences
    stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>
      if (!locality.isEmpty) {
        numTasksPending += 1
        locality.foreach { location =>
          val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
          hostToLocalTaskCountPerStage(location.host) = count
        }
      }
    }
    stageIdToExecutorPlacementHints.put(stageId,
      (numTasksPending, hostToLocalTaskCountPerStage.toMap))

    // Update the executor placement hints
    updateExecutorPlacementHints()
  }
}
The handler uses the stage's task locality preferences to count, per host, how many tasks would prefer to run there, and then refreshes the number of locality-aware pending tasks and hostToLocalTaskCount via updateExecutorPlacementHints (a self-contained sketch of that aggregation follows).
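The sketch below is not the Spark source; it is an illustrative, runnable version of how per-stage placement hints can be merged into one localityAwareTasks count and one hostToLocalTaskCount map. Object and value names are made up for the example:

import scala.collection.mutable

object PlacementHintsSketch {
  // stageId -> (number of tasks with locality preferences, host -> preferred task count)
  val stageIdToExecutorPlacementHints = mutable.HashMap[Int, (Int, Map[String, Int])]()

  def updateExecutorPlacementHints(): (Int, Map[String, Int]) = {
    var localityAwareTasks = 0
    val hostToLocalTaskCount = mutable.HashMap[String, Int]()
    stageIdToExecutorPlacementHints.values.foreach { case (numTasksPending, hints) =>
      localityAwareTasks += numTasksPending
      hints.foreach { case (host, count) =>
        hostToLocalTaskCount(host) = hostToLocalTaskCount.getOrElse(host, 0) + count
      }
    }
    (localityAwareTasks, hostToLocalTaskCount.toMap)
  }

  def main(args: Array[String]): Unit = {
    stageIdToExecutorPlacementHints(0) = (3, Map("host1" -> 2, "host2" -> 1))
    stageIdToExecutorPlacementHints(1) = (2, Map("host2" -> 2))
    println(updateExecutorPlacementHints()) // (5, Map(host1 -> 2, host2 -> 3))
  }
}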
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
  val stageId = stageCompleted.stageInfo.stageId
  allocationManager.synchronized {
    // Drop the stage's bookkeeping from all the caches
    stageIdToNumTasks -= stageId
    stageIdToNumRunningTask -= stageId
    stageIdToNumSpeculativeTasks -= stageId
    stageIdToTaskIndices -= stageId
    stageIdToSpeculativeTaskIndices -= stageId
    stageIdToExecutorPlacementHints -= stageId

    // Update the executor placement hints
    updateExecutorPlacementHints()

    // If this is the last stage with pending tasks, mark the scheduler queue as empty
    // This is needed in case the stage is aborted for any reason
    if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
      allocationManager.onSchedulerQueueEmpty()
    }
  }
}
This is the mirror image of onStageSubmitted: the completed stage's entries are removed from the caches, and if no stage has pending tasks left the scheduler queue is marked empty.
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
  val stageId = taskStart.stageId // extract the stage id
  val taskId = taskStart.taskInfo.taskId // extract the task id
  val taskIndex = taskStart.taskInfo.index // extract the task index
  val executorId = taskStart.taskInfo.executorId // extract the executor id
  allocationManager.synchronized {
    if (stageIdToNumRunningTask.contains(stageId)) {
      stageIdToNumRunningTask(stageId) += 1 // one more running task for this stage
    }
    // This guards against the race condition in which the `SparkListenerTaskStart`
    // event is posted before the `SparkListenerBlockManagerAdded` event, which is
    // possible because these events are posted in different threads. (see SPARK-4951)
    if (!allocationManager.executorIds.contains(executorId)) {
      allocationManager.onExecutorAdded(executorId)
    }
    // If this is the last pending task, mark the scheduler queue as empty
    if (taskStart.taskInfo.speculative) {
      stageIdToSpeculativeTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) +=
        taskIndex
    } else {
      stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
    }
    if (totalPendingTasks() == 0) {
      allocationManager.onSchedulerQueueEmpty()
    }
    // Mark the executor on which this task is scheduled as busy
    executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
    allocationManager.onExecutorBusy(executorId)
  }
}
Similar handlers exist for the onTaskEnd, onExecutorAdded, onExecutorRemoved, and onSpeculativeTaskSubmitted events.
onSpeculativeTaskSubmitted is tied to speculative execution: if one task in a task set is clearly lagging behind the others, another attempt of that task is launched on a different executor, and if the new attempt finishes first the original one is killed.
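Speculation is controlled by its own settings. A hedged sketch with illustrative values (actual defaults may differ between Spark versions):

import org.apache.spark.SparkConf

// Sketch: enable speculative execution so slow task attempts are re-launched.
val conf = new SparkConf()
  .set("spark.speculation", "true")
  .set("spark.speculation.multiplier", "1.5") // how much slower than the median before speculating
  .set("spark.speculation.quantile", "0.75")  // fraction of tasks that must finish before checking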
The scheduled task
private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis

  updateAndSyncNumExecutorsTarget(now)

  val executorIdsToBeRemoved = ArrayBuffer[String]()
  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      executorIdsToBeRemoved += executorId
    }
    !expired
  }
  if (executorIdsToBeRemoved.nonEmpty) {
    removeExecutors(executorIdsToBeRemoved)
  }
}
updateAndSyncNumExecutorsTarget dynamically asks the cluster manager for more or fewer executors based on how many are currently needed (a simplified sketch of that calculation follows). The rest of schedule() checks whether any executor in the removeTimes map has been idle past its expiry time and, if so, removes it. removeTimes and the other bookkeeping structures are described with the member fields below.
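The target is essentially a ceiling division of the task backlog by the number of task slots per executor. The helper below is a simplified, hypothetical sketch of that idea, not the exact Spark code:

// Sketch: how many executors are needed for the current backlog.
// tasksPerExecutor = spark.executor.cores / spark.task.cpus (see the field list below).
def maxNumExecutorsNeeded(pendingTasks: Int, runningTasks: Int, tasksPerExecutor: Int): Int =
  (pendingTasks + runningTasks + tasksPerExecutor - 1) / tasksPerExecutor

// Example: 10 pending + 2 running tasks, 4 cores per executor, 1 cpu per task
// => ceil(12 / 4) = 3 executors.
println(maxNumExecutorsNeeded(10, 2, 4)) // 3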
Member fields of the class
minNumExecutors: the configured minimum number of executors.
maxNumExecutors: the configured maximum number of executors.
initialNumExecutors: the number of executors to launch at initialization.
schedulerBacklogTimeoutS: if pending tasks have been waiting to be scheduled longer than this, new executors should be requested.
sustainedSchedulerBacklogTimeoutS: after the first backlog timeout fires, the timeout applied to each subsequent round of requests.
executorIdleTimeoutS: executors holding no cached RDD blocks that have been idle longer than this are removed.
cachedExecutorIdleTimeoutS: idle timeout for executors that do hold cached RDD blocks; the default is Integer.MAX_VALUE, i.e. they are never reclaimed.
(The spark.dynamicAllocation.* keys behind these fields are sketched at the end of this field list.)
private val tasksPerExecutor =
  conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
// Number of executors to add in the next round
private var numExecutorsToAdd = 1
numExecutorsTarget = initialNumExecutors // the current target number of executors, initialized to initialNumExecutors
executorsPendingToRemove = new mutable.HashSet[String] // executors that are about to be removed
executorIds = new mutable.HashSet[String] // all known executors
removeTimes = new mutable.HashMap[String, Long] // expiry time of each currently idle executor
hostToLocalTaskCount: Map[String, Int] = Map.empty // number of pending tasks that prefer each host
client: ExecutorAllocationClient // the client used to talk to the cluster manager and add or remove executors
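For reference, the configuration-backed fields above map onto spark.dynamicAllocation.* settings. A sketch with illustrative values (defaults vary by Spark version):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.initialExecutors", "2")
  .set("spark.dynamicAllocation.schedulerBacklogTimeout", "1s")
  .set("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout", "1s")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .set("spark.dynamicAllocation.cachedExecutorIdleTimeout", "600s")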
In short, this class uses a listener on Spark's event bus to track stage submission, completion, and related events and keep its bookkeeping up to date, while the periodic task started in start() adds and removes executors based on that state.