Source-Code Walkthrough of Resource Allocation in Spark's Different Run Modes


Author: lehi | Published 2016-04-23 09:31

Copyright notice: this is an original article and may not be reproduced without permission.
Background reading:
Source-code walkthrough of Task submission in Spark: http://www.jianshu.com/p/9e75c11a5081

SchedulerBackend is a trait that works together with TaskSchedulerImpl to carry out Task scheduling, execution, and resource allocation. Its subclasses are shown below; each subclass implements resource allocation and scheduling for a different Spark deploy mode. See Figure 1.


Figure 1. SchedulerBackend subclass inheritance diagram

In every (cluster) deploy mode, Spark hands resources to Tasks by calling backend.reviveOffers(). The backend subclasses and the run modes they are responsible for are listed below (a runnable configuration sketch follows the list):
LocalBackend
(1) local single-threaded mode, master of the form local
(2) local multi-threaded mode, matching local[N] and local[*]
(3) matching local[*, M] and local[N, M]
SparkDeploySchedulerBackend
(4) matching the Spark Standalone mode
(5) matching the local-cluster mode, i.e. pseudo-distributed mode
YarnClusterSchedulerBackend / YarnClientSchedulerBackend
(6) the "yarn-standalone" or "yarn-cluster" mode (YarnClusterSchedulerBackend)
(7) the yarn-client mode (handled by YarnClientSchedulerBackend)
CoarseMesosSchedulerBackend (coarse-grained) and MesosSchedulerBackend (fine-grained)
(8) matching the Mesos mode; Mesos supports both coarse-grained and fine-grained scheduling.
Note: fine-grained mode is currently supported only on Mesos.
In coarse-grained mode, each Executor keeps its resources once acquired and releases them only when the application exits. Advantage: less time spent on resource scheduling. Drawback: the resources stay bound to one application for its whole lifetime, which can waste them.
In fine-grained mode, resources are scheduled dynamically according to task demand and returned to Mesos when a task finishes, so nothing is wasted, but scheduling latency is higher.
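To make the mapping above concrete, here is a minimal, runnable sketch (a hypothetical example application, not Spark source) showing how the master URL chosen when the SparkContext is created selects one of the backends above, and how the Mesos granularity is switched via spark.mesos.coarse:
<code>
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example app: the master URL passed to SparkConf decides which
// SchedulerBackend SparkContext creates when it builds the TaskScheduler.
object BackendSelectionExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("backend-selection-example")
      // "local"                      -> LocalBackend, single thread            (1)
      // "local[4]" / "local[*]"      -> LocalBackend, N threads                (2)
      // "local[4, 2]"                -> LocalBackend, 2 task retries           (3)
      // "spark://host:7077"          -> SparkDeploySchedulerBackend            (4)
      // "local-cluster[2,1,1024]"    -> pseudo-distributed Standalone          (5)
      // "yarn-cluster" / "yarn-client" -> the YARN backends                    (6)(7)
      // "mesos://host:5050"          -> coarse- or fine-grained Mesos backend  (8)
      .setMaster("local[4]")
      // only relevant for mesos:// masters: true = CoarseMesosSchedulerBackend,
      // false = fine-grained MesosSchedulerBackend
      .set("spark.mesos.coarse", "true")
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 100).count())
    sc.stop()
  }
}
</code>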

1. LocalBackend

reviveOffers requests resources by sending a message to a remote endpoint reference; this reference (localEndpoint) was assigned in the start method:
<code>
override def reviveOffers() {
  localEndpoint.send(ReviveOffers)
}
</code>
When the ReviveOffers message arrives, LocalBackend matches it in its receive method and allocates resources, as shown below:
<code>
override def receive: PartialFunction[Any, Unit] = {
  case ReviveOffers =>
    reviveOffers()  // see (1) below
  case StatusUpdate(taskId, state, serializedData) =>
    scheduler.statusUpdate(taskId, state, serializedData)
    if (TaskState.isFinished(state)) {
      freeCores += scheduler.CPUS_PER_TASK
      reviveOffers()
    }
  case KillTask(taskId, interruptThread) =>
    executor.killTask(taskId, interruptThread)
}
</code>
(1) The reviveOffers() method is shown below:
<code>
def reviveOffers() {
  val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
  for (task <- scheduler.resourceOffers(offers).flatten) {
    freeCores -= scheduler.CPUS_PER_TASK
    // launch the Task on the (local) executor
    executor.launchTask(executorBackend, taskId = task.taskId,
      attemptNumber = task.attemptNumber, task.name, task.serializedTask)
  }
}
</code>
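For context, the WorkerOffer built here is just a small value class describing the free CPU cores that one executor can offer; in the Spark 1.x source it is defined roughly as follows (simplified sketch):
<code>
// Simplified from the Spark source: one resource offer from one executor,
// consumed by TaskSchedulerImpl.resourceOffers when it assigns tasks.
private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)
</code>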

2. SparkDeploySchedulerBackend

As Figure 1 shows, SparkDeploySchedulerBackend is a subclass of CoarseGrainedSchedulerBackend and therefore uses coarse-grained scheduling. All subclasses of CoarseGrainedSchedulerBackend share this coarse-grained path, so their scheduling goes through CoarseGrainedSchedulerBackend's reviveOffers method.
Resources are requested through the remote driver endpoint reference:
<code>
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
</code>
Likewise, the receive method matches the ReviveOffers message and calls makeOffers():
<code>
override def receive: PartialFunction[Any, Unit] = {
  case ReviveOffers =>
    makeOffers()  // see (1) below
</code>
(1) The makeOffers() method is shown below:
<code>
private def makeOffers() {
  // filter out executors that are in the process of being killed
  val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  // assign resources to tasks and launch them; see (2) and (3) below
  launchTasks(scheduler.resourceOffers(workOffers))
}
</code>
(2) The scheduler.resourceOffers(workOffers) method is shown below. TaskSchedulerImpl is called by the cluster manager to offer resources on the slaves. Tasks are taken from the active TaskSets in priority order and assigned to nodes in a round-robin fashion so that they are balanced across the cluster.
<code>
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  var newExecAvail = false
  // for each offer, record the mapping between the slave's hostname and its executor
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    activeExecutorIds += o.executorId
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }
  // randomly shuffle offers to avoid always placing tasks on the same set of workers
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // get the TaskSetManagers in the order determined by the scheduling policy
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }
  // take each TaskSet in scheduling order and try the five locality levels in turn:
  // PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY,
  // so that tasks run with the best locality available
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }
  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
</code>
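A note on rootPool.getSortedTaskSetQueue used above: the order in which TaskSetManagers are returned is governed by the scheduling mode, which defaults to FIFO and can be switched to FAIR by the application. A minimal configuration sketch (the allocation-file path is hypothetical):
<code>
val conf = new SparkConf()
  .setAppName("fair-scheduling-example")
  // FIFO (default) sorts TaskSets by job/stage priority; FAIR sorts them by pool
  .set("spark.scheduler.mode", "FAIR")
  // optional: pool definitions for the fair scheduler (hypothetical path)
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
</code>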
(3) The launchTasks method is shown below:
<code>
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // serialize the task
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    } else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // CoarseGrainedExecutorBackend deserializes the task on the Executor and launches it
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
</code>
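The abort message above suggests two remedies when a serialized task exceeds the Akka frame size. A hedged sketch of both (the value 128 is just an illustration, and largeMap / rdd are hypothetical names):
<code>
// 1) raise the frame limit (the value is interpreted in MB):
val conf = new SparkConf().set("spark.akka.frameSize", "128")

// 2) or, preferably, broadcast large read-only data instead of closing over it,
//    so it is not shipped inside every serialized task:
val bigLookup = sc.broadcast(largeMap)          // largeMap: a hypothetical large Map
rdd.map(x => bigLookup.value.getOrElse(x, 0))   // rdd: a hypothetical RDD
</code>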

3. MesosSchedulerBackend

In Mesos fine-grained mode, scheduling is carried out through Mesos's MesosSchedulerDriver class; interested readers can explore it on their own. It is not covered here.
