
Reading the Spark Source Code: The Executor Module (Part 3)

Author: invincine | Published 2019-02-14 14:42

    This article is based on the Spark 1.6.3 source code and walks through it step by step. I wrote it mainly to record my own train of thought while reading: a single pass is easy to forget, and writing it up helps the material stick.

    In part 1 of this executor series we created the DriverEndpoint and saw that it periodically sends itself a ReviveOffers message, which calls makeOffers() to offer resources to executors and launch tasks on them.
    In part 2 we analyzed application submission, driver initialization and the allocation of executors on the workers, and that path also ends with a call to DriverEndpoint's makeOffers() to hand tasks to the executors.
    So in this part we take DriverEndpoint's makeOffers() as the starting point and look at how tasks are loaded onto the executors and how they are executed.

    Assigning Tasks

    Let's start with the source of the makeOffers method:

    // Make fake resource offers on all executors
    private def makeOffers() {
      // Filter out executors under killing
      // pick out the executors that are still alive, then build resource offers for them
      val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
      val workOffers = activeExecutors.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq
      launchTasks(scheduler.resourceOffers(workOffers))   // first let the scheduler assign tasks to the offers via resourceOffers, then launch them
    }
    

    The scheduler here is an instance of TaskSchedulerImpl. Its resourceOffers method assigns tasks from the pending TaskSets to the offered executors; that work is shared between TaskSchedulerImpl and DAGScheduler and belongs to the scheduling module, so the details are left for the scheduling articles.
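    To fix the idea of "offering" resources before moving on, here is a deliberately naive sketch. It is not Spark's real TaskSchedulerImpl logic (locality, blacklisting and fair scheduling are all ignored) and every name in it is hypothetical; it only shows how the free cores carried by each offer bound how many tasks can be placed on that executor.

    // Hypothetical illustration only -- not Spark's actual scheduling code.
    case class Offer(executorId: String, host: String, var freeCores: Int)
    case class PendingTask(taskId: Long)

    /** Hand out pending tasks round-robin across the offers, one core bundle per task. */
    def naiveResourceOffers(
        offers: Seq[Offer],
        pending: Iterator[PendingTask],
        cpusPerTask: Int = 1): Seq[(String, PendingTask)] = {
      val assignments = scala.collection.mutable.ArrayBuffer[(String, PendingTask)]()
      var placedSomething = true
      while (placedSomething && pending.hasNext) {
        placedSomething = false
        for (offer <- offers if pending.hasNext && offer.freeCores >= cpusPerTask) {
          assignments += ((offer.executorId, pending.next()))   // "launch" the task here
          offer.freeCores -= cpusPerTask                        // mirrors executorData.freeCores -= CPUS_PER_TASK
          placedSomething = true
        }
      }
      assignments
    }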

    Once tasks have been assigned, launchTasks is called to send them to the executors:

    // Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask: ByteBuffer = ser.serialize(task)    // serialize the task
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          val executorData: ExecutorData = executorDataMap(task.executorId)   // look up the ExecutorData describing this executor
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))    // send a LaunchTask message to the executor's RpcEndpoint
        }
      }
    }
    

    There is a size limit here: if the serialized task is larger than spark.akka.frameSize (128 MB by default) minus reservedSizeBytes (a fixed 200 KB), the task set is aborted with an error message suggesting that you either increase spark.akka.frameSize or use broadcast variables to ship large values.
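    In practice there are two ways around this limit, exactly as the error message suggests: raise the frame size, or keep the task closure small by shipping bulky read-only data through a broadcast variable. A minimal sketch, where the frame-size value is only an example and lookupTable is a hypothetical stand-in for some genuinely large object:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("frame-size-demo")
      .set("spark.akka.frameSize", "256")   // in MB; only raise it if tasks are legitimately large
    val sc = new SparkContext(conf)

    // Preferred fix: broadcast large read-only data instead of capturing it in the
    // task closure, so the serialized task itself stays well under the frame size.
    val lookupTable: Map[String, Int] = Map("a" -> 1, "b" -> 2)   // imagine this is huge
    val bc = sc.broadcast(lookupTable)
    val hits = sc.parallelize(Seq("a", "b", "c")).map(k => bc.value.getOrElse(k, 0)).collect()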

    For a task that passes the size check, the serialized bytes are wrapped in a SerializableBuffer, the matching ExecutorData is looked up, and a LaunchTask message is sent to that executor's executorEndpoint.

    CoarseGrainedExecutorBackend handles the LaunchTask message as follows:

    case LaunchTask(data) =>    // the executor backend has received a LaunchTask request
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        val taskDesc = ser.deserialize[TaskDescription](data.value) // deserialize the received TaskDescription
        logInfo("Got assigned task " + taskDesc.taskId)
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,   // launch the task
          taskDesc.name, taskDesc.serializedTask)
      }
    

    It first deserializes the received TaskDescription and then calls the executor's launchTask method to run it. launchTask looks like this:

    def launchTask(
        context: ExecutorBackend,
        taskId: Long,
        attemptNumber: Int,
        taskName: String,
        serializedTask: ByteBuffer): Unit = {
      val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
        serializedTask)   // wrap the task information in a TaskRunner
      runningTasks.put(taskId, tr)
      threadPool.execute(tr)    // hand the TaskRunner to the thread pool for execution
    }
    

    In this method the task is wrapped in a TaskRunner, registered in runningTasks, and submitted to the executor's thread pool (a minimal sketch of this pattern follows the list below). The real work therefore happens in TaskRunner's run() method, which breaks down into three parts:

    1. Deserialize the task and download its dependencies
    2. Run the task
    3. Handle the returned result
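    The submit-a-Runnable-to-a-pool pattern used by launchTask can be sketched in a few lines. The names below (MiniTaskRunner, runningTasks, launch) are hypothetical stand-ins, not Spark's classes:

    import java.util.concurrent.{ConcurrentHashMap, Executors}

    // Hypothetical stand-in for Executor's TaskRunner: wrap the work in a Runnable,
    // track it while it runs, and hand it to a thread pool.
    class MiniTaskRunner(taskId: Long, work: () => Unit,
                         running: ConcurrentHashMap[Long, MiniTaskRunner]) extends Runnable {
      override def run(): Unit = {
        try work() finally running.remove(taskId)   // always drop the bookkeeping entry
      }
    }

    val runningTasks = new ConcurrentHashMap[Long, MiniTaskRunner]()
    val threadPool = Executors.newCachedThreadPool()   // Spark uses a similar daemon-thread cached pool

    def launch(taskId: Long)(work: => Unit): Unit = {
      val runner = new MiniTaskRunner(taskId, () => work, runningTasks)
      runningTasks.put(taskId, runner)
      threadPool.execute(runner)   // run() is invoked on a pool thread
    }

    launch(1L) { println("pretend this deserializes and runs a task") }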

    Executing the Task

    There is not much to say about the first part, so let's look at the second. Here is an excerpt from the run method:

    override def run(): Unit = {
    ...... (code not relevant here omitted)
    // Run the actual task and measure its runtime.
    taskStart = System.currentTimeMillis()
    var threwException = true
    val (value, accumUpdates) = try {
      val res = task.run(   // ultimately delegates to Task.run to execute the task
        taskAttemptId = taskId,
        attemptNumber = attemptNumber,
        metricsSystem = env.metricsSystem)
      threwException = false
      res
    ......
    

    task是反序列化得到的Task实例,调用其run方法来执行Task

    final def run(
      taskAttemptId: Long,
      attemptNumber: Int,
      metricsSystem: MetricsSystem)
    : (T, AccumulatorUpdates) = {
      context = new TaskContextImpl(
        stageId,
        partitionId,
        taskAttemptId,
        attemptNumber,
        taskMemoryManager,
        metricsSystem,
        internalAccumulators,
        runningLocally = false)   // first construct a TaskContext and fill in the task-level parameters
      TaskContext.setTaskContext(context)
      context.taskMetrics.setHostname(Utils.localHostName())
      context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
      taskThread = Thread.currentThread()
      if (_killed) {
        kill(interruptThread = false)
      }
      try {
        (runTask(context), context.collectAccumulators())   // run the task
      } catch {
        case e: Throwable =>
          // Catch all errors; run task failure callbacks, and rethrow the exception.
          try {
            context.markTaskFailed(e)
          } catch {
            case t: Throwable =>
              e.addSuppressed(t)
          }
          throw e
      } finally {
        // Call the task completion callbacks.
        context.markTaskCompleted()
        try {
          Utils.tryLogNonFatalError {
            // Release memory used by this thread for unrolling blocks
            SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
            SparkEnv.get.blockManager.memoryStore.releasePendingUnrollMemoryForThisTask()
            // Notify any tasks waiting for execution memory to be freed to wake up and try to
            // acquire memory again. This makes impossible the scenario where a task sleeps forever
            // because there are no other tasks left to notify it. Since this is safe to do but may
            // not be strictly necessary, we should revisit whether we can remove this in the future.
            val memoryManager = SparkEnv.get.memoryManager
            memoryManager.synchronized { memoryManager.notifyAll() }
          }
        } finally {
          TaskContext.unset()
        }
      }
    }
    

    In the run method above, a TaskContext is created to hold the task's context, runTask is called to do the actual work, and when the task finishes the completion callbacks are fired via markTaskCompleted and the memory this task used for unrolling blocks is released.
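    The TaskContext constructed here is the same object that user code can reach from inside a task. A small example of reading it (standard RDD API, assuming an existing SparkContext sc; the data is arbitrary):

    import org.apache.spark.TaskContext

    val tagged = sc.parallelize(1 to 100, numSlices = 4).mapPartitions { iter =>
      val ctx = TaskContext.get()              // the TaskContextImpl installed by Task.run
      ctx.addTaskCompletionListener { _ =>     // fired by context.markTaskCompleted() above
        println(s"partition ${ctx.partitionId()} finished, attempt ${ctx.attemptNumber()}")
      }
      iter.map(x => (ctx.partitionId(), x))
    }
    tagged.count()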

    runTask is implemented differently by each Task subtype; the two that matter here are ShuffleMapTask and ResultTask.
    The tasks generated for the final stage of a job are ResultTasks; a ResultTask sends its computed result back to the driver. Its implementation:

    override def runTask(context: TaskContext): U = {
      // Deserialize the RDD and the func using the broadcast variables.
      val deserializeStartTime = System.currentTimeMillis()
      val ser = SparkEnv.get.closureSerializer.newInstance()  // get a closure-serializer instance for deserialization
      // recover the RDD and the function to apply to each partition's iterator
      val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
        ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
      _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
      metrics = Some(context.taskMetrics)   // keep a handle on the task's metrics
      // rdd.iterator triggers the actual computation for this partition
      func(context, rdd.iterator(partition, context)) 
    }
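    The (rdd, func) pair deserialized above is exactly what the public SparkContext.runJob API takes: every action ultimately boils down to a function of (TaskContext, Iterator[T]) => U applied to each partition, with one result sent back per partition. An illustration using that API directly (assuming an existing SparkContext sc):

    import org.apache.spark.TaskContext

    val rdd = sc.parallelize(1 to 1000, 8)
    // One value per partition, computed on the executors and collected by the driver,
    // mirroring ResultTask's func(context, rdd.iterator(partition, context)).
    val perPartitionSums: Array[Int] =
      sc.runJob(rdd, (ctx: TaskContext, it: Iterator[Int]) => it.sum)
    println(perPartitionSums.sum)   // 500500, the same total as summing 1 to 1000 directly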
    

    A ShuffleMapTask computes intermediate results: it writes shuffle files to the local file system according to the shuffle strategy so that downstream tasks can fetch them. The details are left to the shuffle module; here is its runTask method:

    override def runTask(context: TaskContext): MapStatus = {
      // Deserialize the RDD using the broadcast variable.
      // deserialize the taskBinary broadcast to recover the RDD and the shuffle dependency
      val deserializeStartTime = System.currentTimeMillis()
      val ser = SparkEnv.get.closureSerializer.newInstance()
      val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
        ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
      _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
      metrics = Some(context.taskMetrics)
      var writer: ShuffleWriter[Any, Any] = null
      try {
        // get the shuffle manager
        val manager = SparkEnv.get.shuffleManager
        writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)   // get a shuffle writer for this partition
        // rdd.iterator returns cached/checkpointed data if available, otherwise computes the partition;
        // the shuffle writer then writes the records to the local file system
        writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
        // return the metadata of the written output (location and sizes), i.e. the MapStatus
        writer.stop(success = true).get
      } catch {
        case e: Exception =>
          try {
            if (writer != null) {
              writer.stop(success = false)
            }
          } catch {
            case e: Exception =>
              log.debug("Could not stop writer", e)
          }
          throw e
      }
    }
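    From the user's point of view, the split between the two task types falls at each shuffle boundary. In the small job below, the map side of reduceByKey runs as ShuffleMapTasks in the first stage, and the final stage that feeds collect() runs as ResultTasks (assuming an existing SparkContext sc):

    // Stage 0 (ShuffleMapTasks): compute (word, 1) pairs and write shuffle files,
    //   partitioned by key, through the ShuffleWriter shown above.
    // Stage 1 (ResultTasks): fetch those shuffle blocks, merge the counts and
    //   return each partition's result to the driver.
    val counts = sc.parallelize(Seq("a", "b", "a", "c"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)   // introduces a ShuffleDependency, i.e. a stage boundary
      .collect()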
    

    Handling Task Results

    Now for the third part, handling the returned result. The following is an excerpt from TaskRunner's run method in Executor:

    val resultSer = env.serializer.newInstance()  // get a serializer instance for the result
    val beforeSerialization = System.currentTimeMillis()
    val valueBytes = resultSer.serialize(value)   // serialize the task's return value
    val afterSerialization = System.currentTimeMillis()
    for (m <- task.metrics) {
      // Deserialization happens in two parts: first, we deserialize a Task object, which
      // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
      m.setExecutorDeserializeTime(
        (taskStart - deserializeStartTime) + task.executorDeserializeTime)
      // We need to subtract Task.run()'s deserialization time to avoid double-counting
      m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
      m.setJvmGCTime(computeTotalGcTime() - startGCTime)
      m.setResultSerializationTime(afterSerialization - beforeSerialization)
      m.updateAccumulators()
    }
    // first wrap the value, accumulator updates and metrics in a DirectTaskResult
    val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
    val serializedDirectResult = ser.serialize(directResult)    // serialize the DirectTaskResult
    val resultSize = serializedDirectResult.limit   // size of the serialized result
    // directSend = sending directly back to the driver
    // decide how the result gets back to the driver
    val serializedResult: ByteBuffer = {
      if (maxResultSize > 0 && resultSize > maxResultSize) {    // larger than spark.driver.maxResultSize (default 1g): drop the value and send only its metadata
        logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
          s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
          s"dropping it.")
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
      } else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {   // a "large" result: over the Akka frame size (default 128 MB) minus the reserved bytes
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(    // store the result in the block manager and send back only a reference
          blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
        logInfo(
          s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
      } else {
        logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
        serializedDirectResult // small enough: send the serialized result straight to the driver
      }
    }
    // execBackend is an ExecutorBackend instance, the executor's communication channel back to the driver
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) 
    

    The size thresholds applied when sending the result back to the driver are the same ones described above. At the end of this code the result (or a reference to it) goes back to the driver. On receiving the StatusUpdate message, the driver calls TaskSchedulerImpl's statusUpdate method to process the task's outcome and, for a finished task, returns the freed cores and calls makeOffers again. The handler:

    case StatusUpdate(executorId, taskId, state, data) =>  // a task status update (and result) arrives at the driver
      scheduler.statusUpdate(taskId, state, data.value)
      if (TaskState.isFinished(state)) {
        executorDataMap.get(executorId) match {
          case Some(executorInfo) =>
            executorInfo.freeCores += scheduler.CPUS_PER_TASK
            makeOffers(executorId)
          case None =>
            // Ignoring the update since we don't know about the executor.
            logWarning(s"Ignored task status update ($taskId state $state) " +
              s"from unknown executor with ID $executorId")
        }
      }
    

    In TaskSchedulerImpl's statusUpdate method, a task that comes back FINISHED is handed to enqueueSuccessfulTask, while a task that comes back in a failure state (FAILED, KILLED or LOST) goes to enqueueFailedTask. An excerpt from statusUpdate:

    taskIdToTaskSetManager.get(tid) match {
      case Some(taskSet) =>
        // FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)
        if (TaskState.isFinished(state)) {    // the task has reached a terminal state
          taskIdToTaskSetManager.remove(tid)
          taskIdToExecutorId.remove(tid).foreach { execId =>
            if (executorIdToTaskCount.contains(execId)) {
              executorIdToTaskCount(execId) -= 1    // clean up the per-executor bookkeeping
            }
          }
        }
        if (state == TaskState.FINISHED) {    // FINISHED means the task completed successfully
          taskSet.removeRunningTask(tid)
          taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData) // hand the result to enqueueSuccessfulTask
        } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {   // any other terminal state means the task failed
          taskSet.removeRunningTask(tid)
          taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
        }
      case None =>
        logError(
          ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
            "likely the result of receiving duplicate task finished status updates)")
            .format(state, tid))
    }
    

    The results handled in enqueueSuccessfulTask come in two forms: a DirectTaskResult carries the value itself, while an IndirectTaskResult carries only a blockId, and the value has to be fetched from the BlockManager first. After deserializing the result, it calls TaskSchedulerImpl's handleSuccessfulTask:

    def enqueueSuccessfulTask(    // process a successfully returned task result
      taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
      getTaskResultExecutor.execute(new Runnable {
        override def run(): Unit = Utils.logUncaughtExceptions {
          try {
            val (result, size) = serializer.get().deserialize[TaskResult[_]](serializedData) match {
              case directResult: DirectTaskResult[_] => // the value was sent back directly
                if (!taskSetManager.canFetchMoreResults(serializedData.limit())) {
                  return
                }
                // deserialize "value" without holding any lock so that it won't block other threads.
                // We should call it here, so that when it's called again in
                // "TaskSetManager.handleSuccessfulTask", it does not need to deserialize the value.
                directResult.value()
                (directResult, serializedData.limit())
              case IndirectTaskResult(blockId, size) => // an IndirectTaskResult: fetch the actual result from the BlockManager by blockId
                if (!taskSetManager.canFetchMoreResults(size)) {
                  // dropped by executor if size is larger than maxResultSize
                  sparkEnv.blockManager.master.removeBlock(blockId)
                  return
                }
                logDebug("Fetching indirect task result for TID %s".format(tid))
                scheduler.handleTaskGettingResult(taskSetManager, tid)
                val serializedTaskResult = sparkEnv.blockManager.getRemoteBytes(blockId)
                if (!serializedTaskResult.isDefined) {
                  /* We won't be able to get the task result if the machine that ran the task failed
                   * between when the task ended and when we tried to fetch the result, or if the
                   * block manager had to flush the result. */
                  scheduler.handleFailedTask(
                    taskSetManager, tid, TaskState.FINISHED, TaskResultLost)
                  return
                }
                val deserializedResult = serializer.get().deserialize[DirectTaskResult[_]](
                  serializedTaskResult.get)
                sparkEnv.blockManager.master.removeBlock(blockId)
                (deserializedResult, size)
            }
            result.metrics.setResultSize(size)
            scheduler.handleSuccessfulTask(taskSetManager, tid, result)  //调用handleSuccessfulTask来处理结果
          } catch {
            case cnf: ClassNotFoundException =>
              val loader = Thread.currentThread.getContextClassLoader
              taskSetManager.abort("ClassNotFound with classloader: " + loader)
            // Matching NonFatal so we don't catch the ControlThrowable from the "return" above.
            case NonFatal(ex) =>
              logError("Exception while getting task result", ex)
              taskSetManager.abort("Exception while getting task result: %s".format(ex))
          }
        }
      })
    }
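    The canFetchMoreResults check above is what enforces spark.driver.maxResultSize on the driver side: once the total size of results fetched for a task set crosses the limit, the task set is aborted. A configuration sketch (the value is only illustrative):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("result-size-demo")
      // Cap on the total serialized result size per action collected at the driver;
      // the default in Spark 1.6 is 1g, and 0 disables the check entirely.
      .set("spark.driver.maxResultSize", "2g")
    val sc = new SparkContext(conf)

    // A huge collect() is the usual way to hit this limit; prefer aggregating on the
    // executors or writing to external storage instead of pulling everything back.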
    

    TaskSchedulerImpl's handleSuccessfulTask first marks the task as finished and then calls DAGScheduler's taskEnded method. The DAGScheduler runs its own event loop, a DAGSchedulerEventProcessLoop instance; taskEnded posts a CompletionEvent to it, which is eventually dispatched to DAGScheduler's handleTaskCompletion method:

    /**
     * Called by the TaskSetManager to report task completions or failures.
     */
    def taskEnded(
        task: Task[_],
        reason: TaskEndReason,
        result: Any,
        accumUpdates: Map[Long, Any],
        taskInfo: TaskInfo,
        taskMetrics: TaskMetrics): Unit = {
      eventProcessLoop.post(
        CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
    }
    

    eventProcessLoop is that DAGSchedulerEventProcessLoop instance, the event loop through which the DAGScheduler processes its own scheduling events. Its doOnReceive pattern-matches on the event type and dispatches each message to the corresponding handler:

    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
      // a JobSubmitted event is dispatched to handleJobSubmitted
      case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
        dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
      case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
        dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
      case StageCancelled(stageId) =>
        dagScheduler.handleStageCancellation(stageId)
      case JobCancelled(jobId) =>
        dagScheduler.handleJobCancellation(jobId)
      case JobGroupCancelled(groupId) =>
        dagScheduler.handleJobGroupCancelled(groupId)
      case AllJobsCancelled =>
        dagScheduler.doCancelAllJobs()
      case ExecutorAdded(execId, host) =>
        dagScheduler.handleExecutorAdded(execId, host)
      case ExecutorLost(execId) =>
        dagScheduler.handleExecutorLost(execId, fetchFailed = false)
      case BeginEvent(task, taskInfo) =>
        dagScheduler.handleBeginEvent(task, taskInfo)
      case GettingResultEvent(taskInfo) =>
        dagScheduler.handleGetTaskResult(taskInfo)
      case completion @ CompletionEvent(task, reason, _, _, _, _) =>
        dagScheduler.handleTaskCompletion(completion)
      case TaskSetFailed(taskSet, reason, exception) =>
        dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
      case ResubmitFailedStages =>
        dagScheduler.resubmitFailedStages()
    }
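    DAGSchedulerEventProcessLoop extends Spark's generic EventLoop: events are posted to a queue and a single daemon thread drains them and calls onReceive. A stripped-down sketch of that pattern, with hypothetical names rather than Spark's implementation:

    import java.util.concurrent.LinkedBlockingQueue

    // Minimal single-threaded event loop: post() enqueues, a daemon thread dequeues
    // and dispatches, so handlers never run on the caller's thread.
    abstract class MiniEventLoop[E](name: String) {
      private val queue = new LinkedBlockingQueue[E]()
      @volatile private var stopped = false

      private val thread = new Thread(name) {
        setDaemon(true)
        override def run(): Unit = {
          try {
            while (!stopped) {
              val event = queue.take()   // blocks until an event is posted
              try onReceive(event) catch { case e: Exception => onError(e) }
            }
          } catch {
            case _: InterruptedException =>   // stop() interrupts the blocking take()
          }
        }
      }

      def start(): Unit = thread.start()
      def stop(): Unit = { stopped = true; thread.interrupt() }
      def post(event: E): Unit = queue.put(event)

      protected def onReceive(event: E): Unit
      protected def onError(e: Exception): Unit = e.printStackTrace()
    }

    // Usage in the spirit of doOnReceive's pattern match:
    sealed trait MiniEvent
    case class Completion(taskId: Long) extends MiniEvent

    val loop = new MiniEventLoop[MiniEvent]("mini-dag-event-loop") {
      override protected def onReceive(event: MiniEvent): Unit = event match {
        case Completion(id) => println(s"handle completion of task $id")
      }
    }
    loop.start()
    loop.post(Completion(42L))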
    

    So a CompletionEvent ends up in DAGScheduler's handleTaskCompletion, which treats the two task types differently. For a ResultTask the job is marked as finished once all of its partitions have completed. For a ShuffleMapTask, which means this was not the last stage, the map output is registered with the MapOutputTrackerMaster so that the next stage's tasks can locate and read it via shuffle read; that part is covered in the shuffle module. An excerpt from handleTaskCompletion:

    task match {
      case rt: ResultTask[_, _] =>
        // Cast to ResultStage here because it's part of the ResultTask
        // TODO Refactor this out to a function that accepts a ResultStage
        val resultStage = stage.asInstanceOf[ResultStage]
        resultStage.activeJob match {
          case Some(job) =>
            if (!job.finished(rt.outputId)) {
              updateAccumulators(event)
              job.finished(rt.outputId) = true
              job.numFinished += 1
              // If the whole job has finished, remove it
              if (job.numFinished == job.numPartitions) {
                markStageAsFinished(resultStage)
                cleanupStateForJobAndIndependentStages(job)
                listenerBus.post(
                  SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
              }
              // taskSucceeded runs some user code that might throw an exception. Make sure
              // we are resilient against that.
              try {
                job.listener.taskSucceeded(rt.outputId, event.result)
              } catch {
                case e: Exception =>
                  // TODO: Perhaps we want to mark the resultStage as failed?
                  job.listener.jobFailed(new SparkDriverExecutionException(e))
              }
            }
          case None =>
            logInfo("Ignoring result from " + rt + " because its job has finished")
        }
      case smt: ShuffleMapTask => // the result came from a ShuffleMapTask
        val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
        updateAccumulators(event)
        val status = event.result.asInstanceOf[MapStatus]
        val execId = status.location.executorId
        logDebug("ShuffleMapTask finished on " + execId)
        if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
          logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
        } else {
          shuffleStage.addOutputLoc(smt.partitionId, status)
        }
        if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
          markStageAsFinished(shuffleStage)
          logInfo("looking for newly runnable stages")
          logInfo("running: " + runningStages)
          logInfo("waiting: " + waitingStages)
          logInfo("failed: " + failedStages)
          // We supply true to increment the epoch number here in case this is a
          // recomputation of the map outputs. In that case, some nodes may have cached
          // locations with holes (from when we detected the error) and will need the
          // epoch incremented to refetch them.
          // TODO: Only increment the epoch number if this is not the first time
          //       we registered these map outputs.
          mapOutputTracker.registerMapOutputs(  // register the map output locations with the mapOutputTracker
            shuffleStage.shuffleDep.shuffleId,
            shuffleStage.outputLocInMapOutputTrackerFormat(),
            changeEpoch = true)
          clearCacheLocs()
          if (!shuffleStage.isAvailable) {
            // Some tasks had failed; let's resubmit this shuffleStage
            // TODO: Lower-level scheduler should also deal with this
            logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
              ") because some of its tasks had failed: " +
              shuffleStage.findMissingPartitions().mkString(", "))
            submitStage(shuffleStage)
          } else {
            // Mark any map-stage jobs waiting on this stage as finished
            if (shuffleStage.mapStageJobs.nonEmpty) {
              val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
              for (job <- shuffleStage.mapStageJobs) {
                markMapStageJobAsFinished(job, stats)
              }
            }
          }
    

    With that, the task has run to completion and this tour of the executor module comes to an end. Looking back over the three executor articles: starting from the SparkContext entry point, we described how an application registers, how the driver is created, how the driver and executors obtain compute resources and how the executors are started; in this part we saw how tasks are assigned to executors, how they are executed and how their results are handled. In one sentence: the executor runs tasks and sends their results back to the driver.
