Spark Job Execution Flow in Detail (Part 2)

Author: imarch1 | Published 2016-04-08 14:18

    The overall Spark Job execution flow is roughly as follows: after the user submits a job, a SparkContext object is created; the SparkContext requests Executor resources from the Cluster Manager (the Spark Master in Standalone mode), splits the job into a series of tasks that can run in parallel, and dispatches those tasks to different Executors; when a task finishes, the Executor returns its result to the SparkContext.
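
    For reference, every job starts from an RDD action in user code; transformations alone trigger nothing. A minimal example using only the standard Spark API (the application name and data here are made up for illustration):

    import org.apache.spark.{SparkConf, SparkContext}

    object JobDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("job-demo"))
        val rdd = sc.parallelize(1 to 100, numSlices = 4)
        // count() is an action: internally it calls sc.runJob(), which is where the
        // flow described in this article begins. The map() by itself submits nothing.
        val n = rdd.map(_ * 2).count()
        println(s"count = $n")
        sc.stop()
      }
    }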

    The previous article (see Part 1) described in detail how Spark requests Executor resources. This article walks through how a job is split into a series of tasks and how those tasks are dispatched to Executors for execution. The whole process is shown in the figure below.

    [Figure: Job execution flow]
    1. DAGScheduler receives the job submitted by the user
      After the user submits a job, SparkContext.runJob() calls DAGScheduler.runJob(). Inside runJob(), submitJob() is called to submit the job, and the method then waits for the job's result.
    def runJob[T, U](  
                            rdd: RDD[T],  
                            func: (TaskContext, Iterator[T]) => U,  
                            partitions: Seq[Int],  
                            callSite: CallSite,  
                            allowLocal: Boolean,  
                            resultHandler: (Int, U) => Unit,  
                            properties: Properties): Unit = {  
        val start = System.nanoTime  
        val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)  
        waiter.awaitResult() match {  
            case JobSucceeded =>  
                logInfo("Job %d finished: %s, took %f s".format  
                (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))  
            case JobFailed(exception: Exception) =>  
                logInfo("Job %d failed: %s, took %f s".format  
                (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))  
                throw exception  
        }  
    }  
    

    submitJob() posts the job to the eventProcessLoop, which hands it to handleJobSubmitted() for processing (a sketch of that dispatch follows the code below).

    def submitJob[T, U](  
                               rdd: RDD[T],  
                               func: (TaskContext, Iterator[T]) => U,  
                               partitions: Seq[Int],  
                               callSite: CallSite,  
                               allowLocal: Boolean,  
                               resultHandler: (Int, U) => Unit,  
                               properties: Properties): JobWaiter[U] = {  
        // Check to make sure we are not launching a task on a partition that does not exist.  
        val maxPartitions = rdd.partitions.length  
        partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>  
            throw new IllegalArgumentException(  
                "Attempting to access a non-existent partition: " + p + ". " +  
                        "Total number of partitions: " + maxPartitions)  
        }  
      
        val jobId = nextJobId.getAndIncrement()  
        if (partitions.size == 0) {  
            return new JobWaiter[U](this, jobId, 0, resultHandler)  
        }  
      
        assert(partitions.size > 0)  
        val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]  
        val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)  
        eventProcessLoop.post(JobSubmitted(  
            jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,  
            SerializationUtils.clone(properties)))  
        waiter  
    }  
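
    For context, eventProcessLoop is a DAGSchedulerEventProcessLoop that runs on its own thread; its receive handler unpacks the posted JobSubmitted event and invokes handleJobSubmitted(). Roughly (a simplified sketch of the Spark source of this era; the exact signature varies slightly across versions):

    private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
      extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

      override def onReceive(event: DAGSchedulerEvent): Unit = event match {
        // JobSubmitted was posted by submitJob() above: unpack it and call the handler.
        case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
          dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite,
            listener, properties)
        // ...other DAGSchedulerEvent cases elided...
      }
    }
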
    
    2. DAGScheduler splits the job into stages
      Each job first produces a finalStage automatically, and the full stage DAG is then derived recursively from it (a concrete example of how a shuffle splits a job into stages follows the code below).
    private[scheduler] def handleJobSubmitted(jobId: Int,  
                                                finalRDD: RDD[_],  
                                                func: (TaskContext, Iterator[_]) => _,  
                                                partitions: Array[Int],  
                                                allowLocal: Boolean,  
                                                callSite: CallSite,  
                                                listener: JobListener,  
                                                properties: Properties) {  
          var finalStage: ResultStage = null  
          try {  
              // New stage creation may throw an exception if, for example, jobs are run on a  
              // HadoopRDD whose underlying HDFS files have been deleted.  
              finalStage = newResultStage(finalRDD, partitions.size, jobId, callSite)  
          } catch {  
              case e: Exception =>  
                  logWarning("Creating new stage failed due to exception - job: " + jobId, e)  
                  listener.jobFailed(e)  
                  return  
          }  
          if (finalStage != null) {  
              val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)  
              clearCacheLocs()  
              logInfo("Got job %s (%s) with %d output partitions (allowLocal=%s)".format(  
                  job.jobId, callSite.shortForm, partitions.length, allowLocal))  
              logInfo("Final stage: " + finalStage + "(" + finalStage.name + ")")  
              logInfo("Parents of final stage: " + finalStage.parents)  
              logInfo("Missing parents: " + getMissingParentStages(finalStage))  
              val shouldRunLocally =  
                  localExecutionEnabled && allowLocal && finalStage.parents.isEmpty && partitions.length == 1  
              val jobSubmissionTime = clock.getTimeMillis()  
              if (shouldRunLocally) {  
                  // Compute very short actions like first() or take() with no parent stages locally.  
                  listenerBus.post(  
                      SparkListenerJobStart(job.jobId, jobSubmissionTime, Seq.empty, properties))  
                  runLocally(job)  
              } else {  
                  jobIdToActiveJob(jobId) = job  
                  activeJobs += job  
                  finalStage.resultOfJob = Some(job)  
                  val stageIds = jobIdToStageIds(jobId).toArray  
                  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))  
                  listenerBus.post(  
                      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))  
                  submitStage(finalStage)  
              }  
          }  
          submitWaitingStages()  
      }  
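
    To make the stage split concrete: every shuffle dependency introduces a stage boundary. For example, a word-count style job with a single reduceByKey (standard Spark API; the input path is a placeholder and sc is an existing SparkContext) is cut into one ShuffleMapStage and one final ResultStage:

    // One shuffle (reduceByKey) => two stages:
    //   Stage 0: ShuffleMapStage -- flatMap + map side, writes shuffle output
    //   Stage 1: ResultStage     -- reduce side + collect(), this is the finalStage
    val counts = sc.textFile("hdfs:///path/to/input")   // placeholder path
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .collect()
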
    

    submitStage() is responsible for working through the whole stage DAG and calls submitMissingTasks() to split each stage into runnable tasks.

    private def submitStage(stage: Stage) {  
        val jobId = activeJobForStage(stage)  
        if (jobId.isDefined) {  
            logDebug("submitStage(" + stage + ")")  
            if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {  
                val missing = getMissingParentStages(stage).sortBy(_.id)  
                logDebug("missing: " + missing)  
                if (missing.isEmpty) {  
                    logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")  
                    submitMissingTasks(stage, jobId.get)  
                } else {  
                    for (parent <- missing) {  
                        submitStage(parent)  
                    }  
                    waitingStages += stage  
                }  
            }  
        } else {  
            abortStage(stage, "No active job for stage " + stage.id)  
        }  
    }  
    

    Note that stages depend on one another, so Spark runs them one stage at a time. Stages that are currently running are kept in runningStages, and stages waiting to run are kept in waitingStages. When a stage completes successfully, DAGScheduler launches the next stage in handleTaskCompletion().

    private[scheduler] def handleTaskCompletion(event: CompletionEvent) {  
            val task = event.task  
            val stageId = task.stageId  
            val taskType = Utils.getFormattedClassName(task)  
            ......  
            val stage = stageIdToStage(task.stageId)  
            event.reason match {  
                case Success =>  
                    listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,  
                        event.reason, event.taskInfo, event.taskMetrics))  
                    stage.pendingTasks -= task  
                    task match {  
                        ......  
                        case smt: ShuffleMapTask =>  
                            val shuffleStage = stage.asInstanceOf[ShuffleMapStage]  
                            updateAccumulators(event)  
                            val status = event.result.asInstanceOf[MapStatus]  
                            val execId = status.location.executorId  
                            logDebug("ShuffleMapTask finished on " + execId)  
                            if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {  
                                logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)  
                            } else {  
                                shuffleStage.addOutputLoc(smt.partitionId, status)  
                            }  
                            if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {  
                                markStageAsFinished(shuffleStage)  
                                mapOutputTracker.registerMapOutputs(  
                                    shuffleStage.shuffleDep.shuffleId,  
                                    shuffleStage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,  
                                    changeEpoch = true)  
      
                                clearCacheLocs()  
                                if (shuffleStage.outputLocs.contains(Nil)) {  
                                    logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +  
                                            ") because some of its tasks had failed: " +  
                                            shuffleStage.outputLocs.zipWithIndex.filter(_._1.isEmpty)  
                                                    .map(_._2).mkString(", "))  
                                    submitStage(shuffleStage)  
                                } else {  
                                    val newlyRunnable = new ArrayBuffer[Stage]  
                                    for (shuffleStage <- waitingStages) {  
                                        logInfo("Missing parents for " + shuffleStage + ": " +  
                                                getMissingParentStages(shuffleStage))  
                                    }  
                                    for (shuffleStage <- waitingStages if getMissingParentStages(shuffleStage).isEmpty) {  
                                        newlyRunnable += shuffleStage  
                                    }  
                                    waitingStages --= newlyRunnable  
                                    runningStages ++= newlyRunnable  
                                    for {  
                                        shuffleStage <- newlyRunnable.sortBy(_.id)  
                                        jobId <- activeJobForStage(shuffleStage)  
                                    } {  
                                        logInfo("Submitting " + shuffleStage + " (" +  
                                                shuffleStage.rdd + "), which is now runnable")  
                                        submitMissingTasks(shuffleStage, jobId)  
                                    }  
                                }  
                            }  
                    }  
            }  
            submitWaitingStages()  
            ......  
        }  
    
    3. DAGScheduler splits each stage into tasks that can run in parallel and submits them all to TaskSchedulerImpl
      submitMissingTasks() produces one task per partition to compute, wraps the tasks into a TaskSet, and submits it to TaskSchedulerImpl (a sketch of the task-creation step, which the excerpt below elides, follows the code).
    private def submitMissingTasks(stage: Stage, jobId: Int) {  
    ......  
    if (tasks.size > 0) {  
                logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")  
                stage.pendingTasks ++= tasks  
                logDebug("New pending tasks: " + stage.pendingTasks)  
                taskScheduler.submitTasks(  
                    new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))  
                stage.latestInfo.submissionTime = Some(clock.getTimeMillis())  
            } else {  
                // Because we posted SparkListenerStageSubmitted earlier, we should mark  
                // the stage as completed here in case there are no tasks to run  
                markStageAsFinished(stage, None)  
      
                val debugString = stage match {  
                    case stage: ShuffleMapStage =>  
                        s"Stage ${stage} is actually done; " +  
                                s"(available: ${stage.isAvailable}," +  
                                s"available outputs: ${stage.numAvailableOutputs}," +  
                                s"partitions: ${stage.numPartitions})"  
                    case stage: ResultStage =>  
                        s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"  
                }  
                logDebug(debugString)  
            }  
        }  
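
    The part elided above ("......") is where the tasks themselves are built: one ShuffleMapTask per missing partition of a ShuffleMapStage, or one ResultTask per partition of the ResultStage. A simplified sketch of that step (not the exact source; names follow the Spark 1.x code this article is based on):

    // Inside submitMissingTasks(), simplified: build one task per partition to compute.
    val tasks: Seq[Task[_]] = stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = getPreferredLocs(stage.rdd, id)   // data-locality hints for the scheduler
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, taskBinary, part, locs)
        }
      case stage: ResultStage =>
        val job = stage.resultOfJob.get
        partitionsToCompute.map { id =>
          val p = job.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = getPreferredLocs(stage.rdd, p)
          new ResultTask(stage.id, taskBinary, part, locs, id)
        }
    }
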
    

    TaskSchedulerImpl.submitTasks() wraps the TaskSet into a TaskSetManager and puts it into the scheduler (schedulableBuilder) to wait for scheduling; Spark offers two scheduling modes, FIFO and FAIR (see the configuration note after the code below), and only tasks belonging to the same SparkContext are scheduled against each other. It then calls SparkDeploySchedulerBackend's reviveOffers(). A TaskSetManager handles scheduling within a single TaskSet, for example assigning one of its tasks to a given executor.

    override def submitTasks(taskSet: TaskSet) {  
      val tasks = taskSet.tasks  
      logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")  
      this.synchronized {  
        val manager = createTaskSetManager(taskSet, maxTaskFailures)  
        activeTaskSets(taskSet.id) = manager  
        schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)  
      
        if (!isLocal && !hasReceivedTask) {  
          starvationTimer.scheduleAtFixedRate(new TimerTask() {  
            override def run() {  
              if (!hasLaunchedTask) {  
                logWarning("Initial job has not accepted any resources; " +  
                  "check your cluster UI to ensure that workers are registered " +  
                  "and have sufficient resources")  
              } else {  
                this.cancel()  
              }  
            }  
          }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)  
        }  
        hasReceivedTask = true  
      }  
      backend.reviveOffers()  
    }  
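
    As an aside, the scheduling mode mentioned above is configured per SparkContext: FIFO is the default, and fair scheduling is enabled through the standard spark.scheduler.mode setting (the application name below is just an example):

    import org.apache.spark.SparkConf

    // FIFO is the default; FAIR lets concurrent jobs of the same SparkContext
    // share executors instead of running strictly one job after another.
    val conf = new SparkConf()
      .setAppName("fair-scheduling-demo")
      .set("spark.scheduler.mode", "FAIR")
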
    

    SparkDeploySchedulerBackend's reviveOffers() sends a ReviveOffers message to the driver; on receiving ReviveOffers, the driver calls makeOffers().

    case ReviveOffers =>  
            makeOffers()  
    
    4. SparkDeploySchedulerBackend dispatches the tasks to Executors
      makeOffers() first calls resourceOffers() to work out which task should run on which Executor, and then calls launchTasks() to send the tasks to the Executors (a sketch of launchTasks follows the code below).
    private def makeOffers() {  
      // Filter out executors under killing  
      val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))  
      val workOffers = activeExecutors.map { case (id, executorData) =>  
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)  
      }.toSeq  
      launchTasks(scheduler.resourceOffers(workOffers))  
    }  
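
    launchTasks() then serializes each TaskDescription returned by resourceOffers() and sends it to the chosen Executor as a LaunchTask message. A simplified sketch (size checks and error handling omitted; details differ slightly between Spark versions):

    // Simplified: one LaunchTask message per scheduled task.
    def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task)            // closure serializer
        val executorData = executorDataMap(task.executorId)
        executorData.freeCores -= scheduler.CPUS_PER_TASK   // reserve the cores the task will use
        executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
      }
    }
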
    
    5. The Executor runs the task
      After receiving a LaunchTask message, CoarseGrainedExecutorBackend calls Executor.launchTask() to run the task.
    override def receive: PartialFunction[Any, Unit] = {  
      
        case LaunchTask(data) =>  
          if (executor == null) {  
            logError("Received LaunchTask command but executor was null")  
            System.exit(1)  
          } else {  
            val taskDesc = ser.deserialize[TaskDescription](data.value)  
            logInfo("Got assigned task " + taskDesc.taskId)  
            executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,  
              taskDesc.name, taskDesc.serializedTask)  
          }  
    

    Internally the Executor holds a thread pool; every submitted task is wrapped in a TaskRunner and handed to the thread pool for execution.

    def launchTask(  
        context: ExecutorBackend,  
        taskId: Long,  
        attemptNumber: Int,  
        taskName: String,  
        serializedTask: ByteBuffer): Unit = {  
      val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,  
        serializedTask)  
      runningTasks.put(taskId, tr)  
      threadPool.execute(tr)  
    }  
    

    Inside TaskRunner, task.run() does the actual work of the task.

    class TaskRunner(  
          execBackend: ExecutorBackend,  
          val taskId: Long,  
          val attemptNumber: Int,  
          taskName: String,  
          serializedTask: ByteBuffer)  
        extends Runnable {  
        ...... 
        override def run(): Unit = {  
          val taskMemoryManager = new TaskMemoryManager(env.executorMemoryManager)  
          val deserializeStartTime = System.currentTimeMillis()  
          Thread.currentThread.setContextClassLoader(replClassLoader)  
          val ser = env.closureSerializer.newInstance()  
          logInfo(s"Running $taskName (TID $taskId)")  
          execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)  
          var taskStart: Long = 0  
          startGCTime = computeTotalGcTime()  
      
          try {  
            val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)  
            updateDependencies(taskFiles, taskJars)  
            task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)  
            task.setTaskMemoryManager(taskMemoryManager)  
            ......
            env.mapOutputTracker.updateEpoch(task.epoch)  
      
            // Run the actual task and measure its runtime.  
            taskStart = System.currentTimeMillis()  
            val value = try {  
              task.run(taskAttemptId = taskId, attemptNumber = attemptNumber)  
            } finally {  
              ......
            }  
            ......
        }  
      }  
    

    Ultimately, every task calls iterator() to compute the RDD recursively. Taking ShuffleMapTask as an example, rdd.iterator(partition, context) computes this task's output partition starting from the root partitions (a sketch of RDD.iterator follows the code below).

    private[spark] class ShuffleMapTask(  
        stageId: Int,  
        taskBinary: Broadcast[Array[Byte]],  
        partition: Partition,  
        @transient private var locs: Seq[TaskLocation])  
      extends Task[MapStatus](stageId, partition.index) with Logging {  
      
      override def runTask(context: TaskContext): MapStatus = {  
        // Deserialize the RDD using the broadcast variable.  
        val deserializeStartTime = System.currentTimeMillis()  
        val ser = SparkEnv.get.closureSerializer.newInstance()  
        val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](  
          ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)  
        _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime  
      
        metrics = Some(context.taskMetrics)  
        var writer: ShuffleWriter[Any, Any] = null  
        try {  
          val manager = SparkEnv.get.shuffleManager  
          writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)  
          writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])  
          return writer.stop(success = true).get  
        } catch {  
          case e: Exception =>  
            try {  
              if (writer != null) {  
                writer.stop(success = false)  
              }  
            } catch {  
              case e: Exception =>  
                log.debug("Could not stop writer", e)  
            }  
            throw e  
        }  
      }  
    }  
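
    For completeness, rdd.iterator() itself only decides whether the partition can be served from the block manager (if the RDD is persisted) or has to be computed, possibly from a checkpoint; compute() in turn pulls its parent RDDs' iterators, which is what makes the evaluation recursive. Roughly, in the Spark 1.x line this article is based on:

    // RDD.iterator(), simplified: read the partition from cache if the RDD is
    // persisted, otherwise compute it (or read it back from a checkpoint).
    final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
      if (storageLevel != StorageLevel.NONE) {
        SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
      } else {
        computeOrReadCheckpoint(split, context)
      }
    }
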
    

    At this point the execution flow of one stage's TaskSet is complete. Once all tasks in this TaskSet have finished, the flow returns to step 3 to run the next stage, until the finalStage is done. :)
