Spark 2.4.0 DAG (DAGScheduler) Source Code Analysis


Author: 井地儿 | Published 2019-03-20 21:33

    Building Spark's DAG (Directed Acyclic Graph) is really a matter of splitting the job into stages, and the split is driven by the dependency relationships between RDDs. After a job is submitted, Spark walks the RDD lineage backwards from the final RDD: RDDs connected by narrow dependencies are placed in the same stage, and whenever a wide dependency (ShuffleDependency) is encountered a new stage is created. The result of this traversal is the DAG.
    The DAG logic is implemented in org.apache.spark.scheduler.DAGScheduler.
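
    To make the splitting rule concrete, below is a minimal word-count style sketch (the object name and input path are made up for illustration). The narrow flatMap and map transformations stay inside one stage, while reduceByKey introduces a ShuffleDependency and therefore a stage boundary.

    import org.apache.spark.{SparkConf, SparkContext}

    // Minimal sketch; object name and input path are illustrative only.
    object StageSplitExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("stage-split").setMaster("local[*]"))

        val pairs = sc.textFile("input.txt")   // narrow dependency
          .flatMap(_.split("\\s+"))            // narrow dependency
          .map(word => (word, 1))              // narrow dependency

        val counts = pairs.reduceByKey(_ + _)  // wide dependency (ShuffleDependency): stage boundary

        counts.collect()                       // action: SparkContext.runJob -> DAGScheduler
        // Walking backwards from collect(), the DAGScheduler produces two stages:
        //   ShuffleMapStage: textFile -> flatMap -> map
        //   ResultStage    : reduceByKey -> collect
        sc.stop()
      }
    }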


    Reading the DAGScheduler source code

    Stage construction

    In the Spark DAG, stages are created through the getOrCreateParentStages method.
    Given an RDD, it gets or creates the list of parent stages: it first collects the RDD's immediate shuffle (wide) dependencies via getShuffleDependencies, then builds or looks up a stage for each of them and returns the resulting list.

    getOrCreateParentStages

    /**
       * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
       * the provided firstJobId.
       */
      private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        getShuffleDependencies(rdd).map { shuffleDep =>
          getOrCreateShuffleMapStage(shuffleDep, firstJobId)
        }.toList
      }
    

    getShuffleDependencies

    Returns the shuffle dependencies that are immediate parents of the given RDD; more distant ancestors are not included.
    The traversal is implemented iteratively with an ArrayStack rather than with recursion.

    /**
       * Returns shuffle dependencies that are immediate parents of the given RDD.
       *
       * This function will not return more distant ancestors.  For example, if C has a shuffle
       * dependency on B which has a shuffle dependency on A:
       *
       * A <-- B <-- C
       *
       * calling this function with rdd C will only return the B <-- C dependency.
       *
       * This function is scheduler-visible for the purpose of unit testing.
       */
      private[scheduler] def getShuffleDependencies(
          rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
        // Collects the shuffle (wide) dependencies that are found
        val parents = new HashSet[ShuffleDependency[_, _, _]]
        // RDDs that have already been visited
        val visited = new HashSet[RDD[_]]
        // RDDs still to visit, kept on an explicit stack instead of using recursion
        val waitingForVisit = new ArrayStack[RDD[_]]
        waitingForVisit.push(rdd)
        while (waitingForVisit.nonEmpty) {
          val toVisit = waitingForVisit.pop()
          if (!visited(toVisit)) {
            visited += toVisit
            toVisit.dependencies.foreach {
              case shuffleDep: ShuffleDependency[_, _, _] =>
                parents += shuffleDep
              case dependency =>
                waitingForVisit.push(dependency.rdd)
            }
          }
        }
        parents
      }
    

    getOrCreateShuffleMapStage

    Gets or creates a ShuffleMapStage. All ShuffleMapStages are kept in the private field shuffleIdToMapStage, a HashMap keyed by shuffle ID; you can think of shuffleIdToMapStage as a registry of shuffle (wide) dependencies.
    The method first looks the stage up in this registry (shuffleIdToMapStage): if it already exists, the existing stage is returned; otherwise a new stage is created.

    /**
       * Mapping from shuffle dependency ID to the ShuffleMapStage that will generate the data for
       * that dependency. Only includes stages that are part of currently running job (when the job(s)
       * that require the shuffle stage complete, the mapping will be removed, and the only record of
       * the shuffle data will be in the MapOutputTracker).
       */
      private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
    

    Implementation
    Stages are first created for any missing ancestor shuffle dependencies, and finally a stage is created for the given shuffle dependency itself.

    /**
       * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
       * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
       * addition to any missing ancestor shuffle map stages.
       */
      private def getOrCreateShuffleMapStage(
          shuffleDep: ShuffleDependency[_, _, _],
          firstJobId: Int): ShuffleMapStage = {
        shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
          case Some(stage) =>
            stage
    
          case None =>
            // Create stages for all missing ancestor shuffle dependencies.
            getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
              // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
              // that were not already in shuffleIdToMapStage, it's possible that by the time we
              // get to a particular dependency in the foreach loop, it's been added to
              // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
              // SPARK-13902 for more information.
              if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
                createShuffleMapStage(dep, firstJobId)
              }
            }
            // Finally, create a stage for the given shuffle dependency.
            createShuffleMapStage(shuffleDep, firstJobId)
        }
      }
    

    getMissingAncestorShuffleDependencies

    Finds all ancestor shuffle dependencies that have not yet been registered in shuffleIdToMapStage.
    The implementation again uses an ArrayStack for an iterative traversal. Note that this method only collects the unregistered dependencies; they are registered in shuffleIdToMapStage later, when createShuffleMapStage is called for each of them.

    /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
      private def getMissingAncestorShuffleDependencies(
          rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
        val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
        val visited = new HashSet[RDD[_]]
        // We are manually maintaining a stack here to prevent StackOverflowError
        // caused by recursively visiting
        val waitingForVisit = new ArrayStack[RDD[_]]
        waitingForVisit.push(rdd)
        while (waitingForVisit.nonEmpty) {
          val toVisit = waitingForVisit.pop()
          if (!visited(toVisit)) {
            visited += toVisit
            getShuffleDependencies(toVisit).foreach { shuffleDep =>
              if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
                ancestors.push(shuffleDep)
                waitingForVisit.push(shuffleDep.rdd)
              } // Otherwise, the dependency and its ancestors have already been registered.
            }
          }
        }
        ancestors
      }
    

    Creating the DAGScheduler

    When is the DAGScheduler created?

    The DAGScheduler is created inside SparkContext.

    // _dagScheduler is a private field of SparkContext
    @volatile private var _dagScheduler: DAGScheduler = _
    ...
    _dagScheduler = new DAGScheduler(this)
    ...
    

    When is the DAGScheduler invoked?

    As you might guess, whoever creates it also calls it: the DAGScheduler is invoked from SparkContext's runJob method.

    /**
       * Run a function on a given set of partitions in an RDD and pass the results to the given
       * handler function. This is the main entry point for all actions in Spark.
       *
       * @param rdd target RDD to run tasks on
       * @param func a function to run on each partition of the RDD
       * @param partitions set of partitions to run on; some jobs may not want to compute on all
       * partitions of the target RDD, e.g. for operations like `first()`
       * @param resultHandler callback to pass each result to
       */
      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          resultHandler: (Int, U) => Unit): Unit = {
        if (stopped.get()) {
          throw new IllegalStateException("SparkContext has been shutdown")
        }
        val callSite = getCallSite
        val cleanedFunc = clean(func)
        logInfo("Starting job: " + callSite.shortForm)
        if (conf.getBoolean("spark.logLineage", false)) {
          logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
        }
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
        progressBar.foreach(_.finishAll())
        rdd.doCheckpoint()
      }
    

    DAGScheduler call chain

    sc: SparkContext
    ds: DAGScheduler
    eventProcessLoop: DAGSchedulerEventProcessLoop
    sc.runJob ----> ds.runJob ----> ds.submitJob ----> eventProcessLoop.post(JobSubmitted) ----> eventProcessLoop.onReceive ----> eventProcessLoop.doOnReceive ----> ds.handleJobSubmitted
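
    Every trip through this chain starts from a user action. As a rough sketch (the RDD below is made up for illustration), an action such as count() enters the chain through SparkContext.runJob:

    val rdd = sc.parallelize(1 to 1000).map(_ * 2)  // two narrow transformations, a single stage

    // count() is an action. Internally it calls sc.runJob(rdd, ...), which delegates to
    // dagScheduler.runJob -> dagScheduler.submitJob; submitJob posts a JobSubmitted event to
    // eventProcessLoop, and handleJobSubmitted eventually runs on the event loop thread.
    val n = rdd.count()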

    handleJobSubmitted
    This is where the real work happens: handleJobSubmitted creates the ResultStage, creates the Job, and then submits the stage via submitStage.
    Below is abridged pseudo-code of handleJobSubmitted.

    private[scheduler] def handleJobSubmitted(jobId: Int,
          finalRDD: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          callSite: CallSite,
          listener: JobListener,
          properties: Properties) {
        var finalStage: ResultStage = null
      
        // Create the final ResultStage
        finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
        // ... (several lines of code omitted) ...
        // Create the ActiveJob
        val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
        // ... (several lines of code omitted) ...
        val jobSubmissionTime = clock.getTimeMillis()
        // Register the active job in the (jobId -> job) map
        jobIdToActiveJob(jobId) = job
        // Add the job to the set of active jobs
        activeJobs += job
        // Attach the job to the final ResultStage
        finalStage.setActiveJob(job)
        // Collect all stage IDs belonging to this job
        val stageIds = jobIdToStageIds(jobId).toArray
        // Collect the latest StageInfo for each of those stages
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        // Notify listeners that the job has started
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        // Submit the final stage
        submitStage(finalStage)
      }
    

    submitStage (submitting a stage)
    It first checks whether there is an active job that needs this stage; if there is none, the stage is aborted via abortStage.
    If the stage is not already in the waiting, running, or failed lists, submission continues.
    It then checks for missing parent stages. If there are none, the current stage is submitted; otherwise the parent stages are submitted first and the current stage is added to the set of waiting stages (waitingStages).
    The annotated code follows:

      /** Submits stage, but first recursively submits any missing parents. */
      private def submitStage(stage: Stage) {
        val jobId = activeJobForStage(stage)
        // Only proceed if some active job needs this stage
        if (jobId.isDefined) {
          logDebug("submitStage(" + stage + ")")
          // Skip stages that are already waiting, running, or failed
          if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
            // Find parent stages whose output is still missing; sorting by id keeps submission ordered
            val missing = getMissingParentStages(stage).sortBy(_.id)
            logDebug("missing: " + missing)
            // If nothing is missing, submit this stage's tasks; otherwise submit the parents first and park this stage in waitingStages
            if (missing.isEmpty) {
              logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
              // All parent stages are ready, so we can submit this stage's tasks
              submitMissingTasks(stage, jobId.get)
            } else {
              for (parent <- missing) {
                // Recursively submit the missing parent stage
                submitStage(parent)
              }
              waitingStages += stage
            }
          }
        } else {
          // No active job needs this stage, so abort it
          abortStage(stage, "No active job for stage " + stage.id, None)
        }
      }
    

    submitMissingTasks

    Called when a stage's parents are available and we can now run its tasks, i.e. when the parent stages have completed successfully.

    This method is the heart of task submission. It is rare to come across such a long method in the Spark code base, which gives a sense of how much is involved in submitting tasks.

     /** Called when stage's parents are available and we can now do its task. */
      private def submitMissingTasks(stage: Stage, jobId: Int) {
        logDebug("submitMissingTasks(" + stage + ")")
    
        // First figure out the indexes of the partition IDs that still need to be computed
        val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
    
        // Get the scheduling pool, job group, description, etc. from the active job associated with this stage
        val properties = jobIdToActiveJob(jobId).properties
        // Mark this stage as running
        runningStages += stage
        // SparkListenerStageSubmitted should be posted before testing whether tasks are serializable. If the tasks are not serializable, a SparkListenerStageCompleted event will be posted, and it must always come after the corresponding SparkListenerStageSubmitted event.
        stage match {
          case s: ShuffleMapStage =>
            outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
          case s: ResultStage =>
            outputCommitCoordinator.stageStart(
              stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
        }
        // Compute the preferred locations (locality) for each task
        val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
          stage match {
            case s: ShuffleMapStage =>
              partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
            case s: ResultStage =>
              partitionsToCompute.map { id =>
                val p = s.partitions(id)
                (id, getPreferredLocs(stage.rdd, p))
              }.toMap
          }
        } catch {
          case NonFatal(e) =>
            stage.makeNewStageAttempt(partitionsToCompute.size)
            listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
            abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
            runningStages -= stage
            return
        }
        // Create a new stage attempt, with a fresh StageInfo and attempt id
        stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
    
        // If there are tasks to run, record the stage's submission time. Otherwise, post the event without a submission time to indicate that this stage is skipped
        if (partitionsToCompute.nonEmpty) {
          stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
        }
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
    
        // TODO: maybe we can keep the task binary in the Stage to avoid serializing it multiple times
        // Broadcast the binary used to dispatch tasks to executors. Note that we broadcast a serialized copy of the RDD and, for each task, we will deserialize it, which means each task gets a different copy of the RDD. This provides stronger isolation between tasks that might modify state of objects referenced in their closures. This is necessary in Hadoop where the JobConf/Configuration object is not thread-safe.
        var taskBinary: Broadcast[Array[Byte]] = null
        var partitions: Array[Partition] = null
        try {
          // Different task types serialize and broadcast different things:
          // for a ShuffleMapTask, serialize and broadcast (rdd, shuffleDep);
          // for a ResultTask, serialize and broadcast (rdd, func).
          var taskBinaryBytes: Array[Byte] = null
          // Both taskBinaryBytes and partitions are affected by checkpoint status. We need this synchronization in case another concurrent job is checkpointing this RDD, so we get a consistent view of both variables.
          RDDCheckpointData.synchronized {
            taskBinaryBytes = stage match {
              case stage: ShuffleMapStage =>
                JavaUtils.bufferToArray(
                  closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
              case stage: ResultStage =>
                JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
            }
    
            partitions = stage.rdd.partitions
          }
          // Warn if the serialized task binary exceeds the warning threshold
          if (taskBinaryBytes.length > TaskSetManager.TASK_SIZE_TO_WARN_KIB * 1024) {
            logWarning(s"Broadcasting large task binary with size " +
              s"${Utils.bytesToString(taskBinaryBytes.length)}")
          }
    
          // Broadcast the serialized task binary so it can be shipped to the executors
          taskBinary = sc.broadcast(taskBinaryBytes)
        } catch {
          // In the case of a failure during serialization, abort the stage.
          case e: NotSerializableException =>
            abortStage(stage, "Task not serializable: " + e.toString, Some(e))
            runningStages -= stage
    
            // Abort execution
            return
          case e: Throwable =>
            abortStage(stage, s"Task serialization failed: $e\n${Utils.exceptionString(e)}", Some(e))
            runningStages -= stage
    
            // Abort execution
            return
        }
        // Build the tasks; for a ShuffleMapStage the pending partitions are cleared and repopulated below
        val tasks: Seq[Task[_]] = try {
          val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
          stage match {
            case stage: ShuffleMapStage =>
              stage.pendingPartitions.clear()
              partitionsToCompute.map { id =>
                val locs = taskIdToLocations(id)
                val part = partitions(id)
                stage.pendingPartitions += id
                new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
                  taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
                  Option(sc.applicationId), sc.applicationAttemptId, stage.rdd.isBarrier())
              }
    
            case stage: ResultStage =>
              partitionsToCompute.map { id =>
                val p: Int = stage.partitions(id)
                val part = partitions(p)
                val locs = taskIdToLocations(id)
                new ResultTask(stage.id, stage.latestInfo.attemptNumber,
                  taskBinary, part, locs, id, properties, serializedTaskMetrics,
                  Option(jobId), Option(sc.applicationId), sc.applicationAttemptId,
                  stage.rdd.isBarrier())
              }
          }
        } catch {
          case NonFatal(e) =>
            abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
            runningStages -= stage
            return
        }
        // If there are tasks to run, hand them to the TaskScheduler as a TaskSet
        if (tasks.size > 0) {
          logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
            s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
          taskScheduler.submitTasks(new TaskSet(
            tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
        } else {
          // Because we posted SparkListenerStageSubmitted earlier, we should mark the stage as finished here, since there are no tasks to run
          markStageAsFinished(stage, None)
    
          stage match {
            case stage: ShuffleMapStage =>
              logDebug(s"Stage ${stage} is actually done; " +
                  s"(available: ${stage.isAvailable}," +
                  s"available outputs: ${stage.numAvailableOutputs}," +
                  s"partitions: ${stage.numPartitions})")
              markMapStageJobsAsFinished(stage)
            case stage : ResultStage =>
              logDebug(s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})")
          }
          // This stage has nothing left to run, so submit any child stages that were waiting on it
          submitWaitingChildStages(stage)
        }
      }
    

    Summary

    This article analyzed how Spark's DAG works: how stages are built, and when the DAGScheduler is created and invoked. It then focused on the DAGScheduler call chain, i.e. everything a submitted job goes through: building the job, submitting stages, creating tasks, choosing preferred (local) partitions for tasks, and serializing and broadcasting tasks to the executors. It is a lot to take in, so it is worth reading this part of the code several times.
