ExecutorAllocationManager


Author: 是我_7b3f | Published 2018-05-20 10:04

If dynamic allocation is enabled (spark.dynamicAllocation.enabled=true), Spark uses ExecutorAllocationManager to manage the cluster's executors.

Let's look at its start method:

def start(): Unit = {
  listenerBus.addToManagementQueue(listener) // register the inner listener class on the event bus

  val scheduleTask = new Runnable() {
    override def run(): Unit = {
      try {
        schedule()
      } catch {
        case ct: ControlThrowable =>
          throw ct
        // log everything else instead of rethrowing, so one failed run does not stop later runs
        case t: Throwable =>
          logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
      }
    }
  }

  // run the Runnable above periodically, every intervalMillis ms (100 ms in the Spark source)
  executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)

  // request the initial target number of executors
  client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
}

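So start() does three things: it registers a listener on the event bus, schedules a periodic task, and requests the initial number of executors.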
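The try/catch inside scheduleTask matters because of how ScheduledExecutorService behaves: if one run of a task scheduled with scheduleWithFixedDelay throws, all later runs are suppressed. Here is a minimal, self-contained sketch of the same pattern in plain Scala. The names PeriodicScheduleSketch, startPolling and demo are hypothetical and not part of Spark, and errors are printed to stderr instead of going through Spark's logWarning.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.ControlThrowable

object PeriodicScheduleSketch {
  // Runs `body` every `intervalMs` ms on a single background thread. Ordinary
  // exceptions are reported and swallowed so that one failed run does not
  // suppress all later runs; ControlThrowable is rethrown, as in start().
  def startPolling(intervalMs: Long)(body: () => Unit): ScheduledExecutorService = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      override def run(): Unit =
        try body()
        catch {
          case ct: ControlThrowable => throw ct
          case t: Throwable =>
            System.err.println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
        }
    }
    executor.scheduleWithFixedDelay(task, 0, intervalMs, TimeUnit.MILLISECONDS)
    executor
  }

  // Demo: the body throws on every even-numbered run, yet keeps being called.
  // Returns how many runs happened before the executor was shut down.
  def demo(): Int = {
    val runs = new AtomicInteger(0)
    val fiveRuns = new CountDownLatch(5)
    val executor = startPolling(10) { () =>
      val n = runs.incrementAndGet()
      fiveRuns.countDown()
      if (n % 2 == 0) throw new IllegalStateException(s"simulated failure on run $n")
    }
    fiveRuns.await(5, TimeUnit.SECONDS) // wait for five runs (or give up after 5 s)
    executor.shutdownNow()
    runs.get()
  }
}
```

In the demo, runs 2 and 4 throw, yet the counter still reaches at least five. Without the catch, the scheduler would have stopped after the second run.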

The listener class: ExecutorAllocationListener

It updates the manager's state whenever a matching event arrives on the event bus. The events it handles are:

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
  initializing = false
  val stageId = stageSubmitted.stageInfo.stageId // the stage's id
  val numTasks = stageSubmitted.stageInfo.numTasks // number of tasks in the stage
  allocationManager.synchronized {
    stageIdToNumTasks(stageId) = numTasks // remember the task count
    stageIdToNumRunningTask(stageId) = 0 // no task of this stage is running yet
    allocationManager.onSchedulerBacklogged()

    // Compute the number of tasks requested by the stage on each host
    var numTasksPending = 0
    val hostToLocalTaskCountPerStage = new mutable.HashMap[String, Int]()
    // data locality: count, per host, how many tasks prefer that host
    stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>
      if (!locality.isEmpty) {
        numTasksPending += 1
        locality.foreach { location =>
          val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
          hostToLocalTaskCountPerStage(location.host) = count
        }
      }
    }
    stageIdToExecutorPlacementHints.put(stageId,
      (numTasksPending, hostToLocalTaskCountPerStage.toMap))

    // Update the executor placement hints
    updateExecutorPlacementHints()
  }
}

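The main work here is locality bookkeeping. From the locality preferences of the stage's tasks, it counts how many tasks prefer each host and how many tasks have any locality preference at all, and stores both per stage. updateExecutorPlacementHints() then sums them over all active stages into localityAwareTasks and hostToLocalTaskCount, which are passed along with executor requests (see the requestTotalExecutors call in start()). It also calls onSchedulerBacklogged(), which starts the backlog timer if it is not already running.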
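Here is a plain-Scala sketch of the per-stage counting and the cross-stage aggregation done by updateExecutorPlacementHints. It is a simplified model under one stated assumption: a task's locality preferences are plain host-name strings, whereas Spark uses TaskLocation and reads its host. LocalityHintsSketch, stageHints and aggregate are hypothetical names.

```scala
object LocalityHintsSketch {
  // (number of tasks with any locality preference, host -> number of tasks preferring it)
  type Hints = (Int, Map[String, Int])

  // Per-stage step, modeled on onStageSubmitted. Assumption: each task's
  // preferences are plain host names rather than Spark's TaskLocation objects.
  def stageHints(taskLocalityPreferences: Seq[Seq[String]]): Hints = {
    val withPrefs = taskLocalityPreferences.filter(_.nonEmpty)
    val perHost = withPrefs.flatten.groupBy(identity).map { case (host, hits) => host -> hits.size }
    (withPrefs.size, perHost)
  }

  // Cross-stage step, modeled on updateExecutorPlacementHints: sum the
  // locality-aware task counts and the per-host counts over all active stages.
  def aggregate(stages: Iterable[Hints]): Hints = {
    val localityAwareTasks = stages.iterator.map(_._1).sum
    val hostToLocalTaskCount = stages.iterator.flatMap(_._2).toList.groupBy(_._1).map {
      case (host, entries) => host -> entries.map(_._2).sum
    }
    (localityAwareTasks, hostToLocalTaskCount)
  }
}
```

Consider a stage whose three tasks prefer [h1, h2], [h1] and nothing. It yields (2, Map(h1 -> 2, h2 -> 1)): only the two tasks that state a preference count as locality-aware.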

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
  val stageId = stageCompleted.stageInfo.stageId
  allocationManager.synchronized {
    // drop every piece of bookkeeping kept for this stage
    stageIdToNumTasks -= stageId
    stageIdToNumRunningTask -= stageId
    stageIdToNumSpeculativeTasks -= stageId
    stageIdToTaskIndices -= stageId
    stageIdToSpeculativeTaskIndices -= stageId
    stageIdToExecutorPlacementHints -= stageId

    // Update the executor placement hints
    updateExecutorPlacementHints()

    // If this is the last stage with pending tasks, mark the scheduler queue as empty
    // This is needed in case the stage is aborted for any reason
    if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
      allocationManager.onSchedulerQueueEmpty()
    }
  }
}

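This is the exact opposite of onStageSubmitted. The stage's entries are removed from every map and the placement hints are recomputed. If no stage has pending tasks left, the scheduler queue is marked as empty.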

override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
  val stageId = taskStart.stageId // stage the task belongs to
  val taskId = taskStart.taskInfo.taskId // unique task id
  val taskIndex = taskStart.taskInfo.index // index of the task within its task set
  val executorId = taskStart.taskInfo.executorId // executor the task runs on
  allocationManager.synchronized {
    if (stageIdToNumRunningTask.contains(stageId)) {
      stageIdToNumRunningTask(stageId) += 1 // one more running task for this stage
    }
    // This guards against the race condition in which the `SparkListenerTaskStart`
    // event is posted before the `SparkListenerBlockManagerAdded` event, which is
    // possible because these events are posted in different threads. (see SPARK-4951)
    if (!allocationManager.executorIds.contains(executorId)) {
      allocationManager.onExecutorAdded(executorId)
    }

    // If this is the last pending task, mark the scheduler queue as empty
    if (taskStart.taskInfo.speculative) {
      stageIdToSpeculativeTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) +=
        taskIndex
    } else {
      stageIdToTaskIndices.getOrElseUpdate(stageId, new mutable.HashSet[Int]) += taskIndex
    }
    if (totalPendingTasks() == 0) {
      allocationManager.onSchedulerQueueEmpty()
    }

    // Mark the executor on which this task is scheduled as busy
    executorIdToTaskIds.getOrElseUpdate(executorId, new mutable.HashSet[Long]) += taskId
    allocationManager.onExecutorBusy(executorId)
  }
}

The other events, onTaskEnd, onExecutorAdded, onExecutorRemoved and onSpeculativeTaskSubmitted, are handled in the same way.
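Why does the listener track task indices in sets instead of simply counting started tasks? A rough sketch of the pending-task bookkeeping shows the effect. A retried task reuses its index, so it is not counted twice. A failed task's index is removed, so the task counts as pending again. This is a simplified model, not Spark's code: speculative tasks are left out, and PendingTaskSketch and Tracker are hypothetical names. The failure handling is modeled on what onTaskEnd roughly does for unsuccessful tasks in the Spark 2.x source.

```scala
import scala.collection.mutable

object PendingTaskSketch {
  // Simplified bookkeeping modeled on the listener's maps (speculative tasks omitted).
  final class Tracker {
    private val stageIdToNumTasks = mutable.HashMap[Int, Int]()
    private val stageIdToTaskIndices = mutable.HashMap[Int, mutable.HashSet[Int]]()

    def onStageSubmitted(stageId: Int, numTasks: Int): Unit =
      stageIdToNumTasks(stageId) = numTasks

    // Record the task *index*, not a counter: a retry of the same index is not counted twice.
    def onTaskStart(stageId: Int, taskIndex: Int): Unit =
      stageIdToTaskIndices.getOrElseUpdate(stageId, mutable.HashSet[Int]()) += taskIndex

    // A failed task will be resubmitted, so its index becomes pending again.
    def onTaskEnd(stageId: Int, taskIndex: Int, successful: Boolean): Unit =
      if (!successful) stageIdToTaskIndices.get(stageId).foreach(_.remove(taskIndex))

    def onStageCompleted(stageId: Int): Unit = {
      stageIdToNumTasks -= stageId
      stageIdToTaskIndices -= stageId
    }

    // Tasks of active stages that have not started yet.
    def totalPendingTasks: Int =
      stageIdToNumTasks.toList.map { case (stageId, numTasks) =>
        numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
      }.sum
  }
}
```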

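onSpeculativeTaskSubmitted relates to speculative execution. When one task in a task set runs noticeably slower than the others, Spark launches a second copy of that task (a speculative task) on another executor. Whichever copy finishes first is used, and the other is killed. The allocation manager does not launch anything itself here. It records the extra pending speculative task for the stage and marks the scheduler as backlogged, so that speculative tasks also count toward the number of executors needed.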

The periodic task

private def schedule(): Unit = synchronized {
  val now = clock.getTimeMillis
  // recompute the executor target and sync it with the cluster manager
  updateAndSyncNumExecutorsTarget(now)

  // collect executors whose idle deadline has passed; retain keeps only the unexpired entries
  val executorIdsToBeRemoved = ArrayBuffer[String]()
  removeTimes.retain { case (executorId, expireTime) =>
    val expired = now >= expireTime
    if (expired) {
      initializing = false
      executorIdsToBeRemoved += executorId
    }
    !expired
  }
  if (executorIdsToBeRemoved.nonEmpty) {
    removeExecutors(executorIdsToBeRemoved)
  }
}

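updateAndSyncNumExecutorsTarget works out how many executors are currently needed. Based on that, it raises or lowers the target number of executors and syncs the new target with the cluster manager.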
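Roughly, the sizing works like this:

1. The number of executors needed is the number of pending plus running tasks divided by tasksPerExecutor, rounded up.
2. If that number is below the current target, the target drops at once.
3. Otherwise, once the backlog timer has fired, the target is raised by numExecutorsToAdd.
4. numExecutorsToAdd doubles after every round that used its full increment.

The sketch models that arithmetic in plain Scala. It is a simplified model of the Spark 2.x logic (updateAndSyncNumExecutorsTarget and addExecutors), not the real code. It makes no cluster-manager calls, ignores the initializing flag, and assumes every request is granted. TargetSizingSketch, State, addRound, step and rampUp are hypothetical names.

```scala
object TargetSizingSketch {
  // Hypothetical, simplified state: the executor target and the size of the next increment.
  final case class State(target: Int, toAdd: Int)

  // Executors needed to run all pending and running tasks at once:
  // ceil((pending + running) / tasksPerExecutor) in integer arithmetic.
  def maxNeeded(pending: Int, running: Int, tasksPerExecutor: Int): Int =
    (pending + running + tasksPerExecutor - 1) / tasksPerExecutor

  // One ramp-up round, modeled on addExecutors; assumes the cluster manager grants every request.
  def addRound(s: State, needed: Int, minExecs: Int, maxExecs: Int, registered: Int): State =
    if (s.target >= maxExecs) s.copy(toAdd = 1) // already at the upper bound
    else {
      // never aim below what is already registered, then add this round's increment
      val raised = math.max(s.target, registered) + s.toAdd
      // clamp to the current need, then to [minExecs, maxExecs]
      val newTarget = math.max(math.min(math.min(raised, needed), maxExecs), minExecs)
      val delta = newTarget - s.target
      // double the increment only if the full increment was used; otherwise start over at 1
      State(newTarget, if (delta == s.toAdd) s.toAdd * 2 else 1)
    }

  // One scheduling tick: shrink immediately if fewer executors are needed,
  // otherwise grow only once the backlog timer has fired.
  def step(s: State, needed: Int, minExecs: Int, maxExecs: Int,
           registered: Int, backlogTimerFired: Boolean): State =
    if (needed < s.target) State(math.max(needed, minExecs), 1)
    else if (backlogTimerFired) addRound(s, needed, minExecs, maxExecs, registered)
    else s

  // Target after each round, assuming every requested executor registers before the next round.
  def rampUp(needed: Int, rounds: Int): List[Int] =
    Iterator.iterate(State(0, 1))(s => addRound(s, needed, 0, 100, s.target))
      .drop(1).take(rounds).map(_.target).toList
}
```

Take a case with 20 executors needed, and assume executors register immediately. The target grows 1, 3, 7, 15, then is capped at 20. The increments 1, 2, 4, 8 double each round until the cap cuts the fifth increment short, which resets the increment to 1.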

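The rest of schedule() checks removeTimes, a map from each idle executor to the time at which it expires. Expired executors are collected and removed with removeExecutors. removeTimes is listed with the other member variables below; several other parts of the class are not covered in this post.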
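The expiry scan over removeTimes can be sketched as a small standalone tracker. This is a hypothetical model; IdleExpirySketch, Tracker, markIdle, markBusy and sweep are not Spark names. The three methods play these roles:

- markIdle stands in for setting a deadline when an executor goes idle.
- markBusy stands in for clearing that deadline when the executor gets a task again, which onExecutorBusy does in the Spark source.
- sweep does what the retain loop in schedule() does.

```scala
import scala.collection.mutable

object IdleExpirySketch {
  // Hypothetical stand-in for removeTimes: executorId -> deadline in ms.
  final class Tracker {
    private val removeTimes = mutable.HashMap[String, Long]()

    def markIdle(executorId: String, deadlineMs: Long): Unit =
      removeTimes(executorId) = deadlineMs

    // An executor that picks up a task again is no longer a removal candidate.
    def markBusy(executorId: String): Unit =
      removeTimes -= executorId

    // Same idea as the retain loop in schedule(): take out every executor whose
    // deadline has passed and keep the rest. Sorted only for deterministic output.
    def sweep(nowMs: Long): List[String] = {
      val expired = removeTimes.collect { case (id, deadline) if nowMs >= deadline => id }.toList.sorted
      removeTimes --= expired
      expired
    }
  }
}
```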

Member variables of the class

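minNumExecutors: the configured minimum number of executors (spark.dynamicAllocation.minExecutors).

maxNumExecutors: the configured maximum number of executors (spark.dynamicAllocation.maxExecutors).

initialNumExecutors: the number of executors requested when the application starts.

schedulerBacklogTimeoutS: if tasks have been waiting to be scheduled for longer than this, new executors are requested (spark.dynamicAllocation.schedulerBacklogTimeout, default 1s).

sustainedSchedulerBacklogTimeoutS: after that first request, if the backlog persists, further requests are made every sustainedSchedulerBacklogTimeoutS seconds (spark.dynamicAllocation.sustainedSchedulerBacklogTimeout, which defaults to the value of schedulerBacklogTimeout).

executorIdleTimeoutS: an executor without cached RDD blocks that has been idle for longer than this is removed (spark.dynamicAllocation.executorIdleTimeout, default 60s).

cachedExecutorIdleTimeoutS: the idle timeout for executors that hold cached RDD blocks. The default is Integer.MAX_VALUE seconds, so in practice such executors are never removed.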

// how many tasks one executor can run at the same time
private val tasksPerExecutor =
  conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)

// Number of executors to add in the next round
private var numExecutorsToAdd = 1

private var numExecutorsTarget = initialNumExecutors // current target; starts at initialNumExecutors

private val executorsPendingToRemove = new mutable.HashSet[String] // executors about to be removed

private val executorIds = new mutable.HashSet[String] // all known executors

private val removeTimes = new mutable.HashMap[String, Long] // idle executor -> time at which it expires

private var hostToLocalTaskCount: Map[String, Int] = Map.empty // host -> number of tasks in active stages that prefer it

client: ExecutorAllocationClient, the client used to request additional executors or kill existing ones.
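How does an entry get into removeTimes in the first place? In the Spark 2.x source this happens in onExecutorIdle. The deadline is now plus executorIdleTimeoutS, converted to milliseconds. If the executor holds cached blocks, cachedExecutorIdleTimeoutS is used instead. Here is a minimal sketch of that calculation, assuming the timeouts are given in seconds as Long values. The object and method names are hypothetical. The source also guards against overflow by treating a non-positive deadline as Long.MaxValue, and the sketch does the same.

```scala
object IdleDeadlineSketch {
  // Deadline (epoch ms) after which an idle executor may be removed.
  // Executors holding cached blocks use the separate, usually much longer, timeout.
  def idleDeadline(
      nowMs: Long,
      hasCachedBlocks: Boolean,
      executorIdleTimeoutS: Long,
      cachedExecutorIdleTimeoutS: Long): Long = {
    val timeoutS = if (hasCachedBlocks) cachedExecutorIdleTimeoutS else executorIdleTimeoutS
    val deadline = nowMs + timeoutS * 1000
    // Long overflow wraps around to a negative number; treat that as "never".
    if (deadline <= 0) Long.MaxValue else deadline
  }
}
```

With the default cached timeout of Integer.MAX_VALUE seconds, an idle executor that holds cached blocks gets a deadline roughly 68 years away. In effect, it is never removed.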

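In short, the class runs a listener on Spark's event bus that tracks events such as stage and task submission and completion, keeping the manager's bookkeeping up to date. Meanwhile, the periodic task started by start() adds and removes executors based on that state.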


Article link: https://www.haomeiwen.com/subject/azgadftx.html