美文网首页大数据
DAGScheduler之Job的提交划分Stage

DAGScheduler之Job的提交划分Stage

作者:阿武z | 发表于 2018-07-27 12:45 | 被阅读 53 次

    整体流程图

    流程图

    源码分析(Spark 2.3)

    getOrCreateParentStages:获取或创建给定 RDD 的父 Stage(必要时递归创建缺失的祖先 Stage)
    /**
       * Get or create the list of parent stages for a given RDD. The new Stages will be created with
       * the provided firstJobId.
       *
       * Only the immediate shuffle dependencies of `rdd` are inspected here. Obtaining (or creating)
       * the ShuffleMapStage for each of them is delegated to getOrCreateShuffleMapStage, which also
       * creates any missing ancestor stages.
       *
       * @param rdd        the RDD whose parent stages are wanted
       * @param firstJobId job id handed to every stage newly created by this call
       * @return the stages for the immediate shuffle dependencies of `rdd`
       */
      private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        // First-level (immediate) shuffle dependencies only; deeper ones are reached transitively.
        val immediateShuffleDeps = getShuffleDependencies(rdd)
        // for/yield over the set desugars to the same `map` call, so the order in which stages are
        // looked up or created, and the order of the resulting list, are preserved.
        val parentStages =
          for (dep <- immediateShuffleDeps) yield getOrCreateShuffleMapStage(dep, firstJobId)
        parentStages.toList
      }
    
    getShuffleDependencies:获取 RDD 的第一层(直接)宽依赖
      /**
       * Returns shuffle dependencies that are immediate parents of the given RDD.
       *
       * This function will not return more distant ancestors.  For example, if C has a shuffle
       * dependency on B which has a shuffle dependency on A:
       *
       * A <-- B <-- C
       *
       * calling this function with rdd C will only return the B <-- C dependency.
       *
       * The walk follows non-shuffle dependencies transitively and stops at every shuffle
       * dependency it meets. An explicit stack is used instead of recursion, so long lineages do
       * not translate into deep call stacks.
       *
       * This function is scheduler-visible for the purpose of unit testing.
       *
       * @param rdd the RDD to start the walk from
       * @return the shuffle dependencies reachable from `rdd` through non-shuffle dependencies only
       */
      private[scheduler] def getShuffleDependencies(
          rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
        val found = new HashSet[ShuffleDependency[_, _, _]]
        val seen = new HashSet[RDD[_]]
        val pending = new ArrayStack[RDD[_]]
        pending.push(rdd)
        while (pending.nonEmpty) {
          val current = pending.pop()
          // HashSet.add returns false if `current` was already present, so each RDD is expanded once.
          if (seen.add(current)) {
            for (dep <- current.dependencies) {
              dep match {
                // A shuffle dependency is a boundary: record it and do not walk past it.
                case shuffleDep: ShuffleDependency[_, _, _] =>
                  found += shuffleDep
                // Any other dependency: keep walking upward through its parent RDD.
                case otherDep =>
                  pending.push(otherDep.rdd)
              }
            }
          }
        }
        found
      }
    
    getOrCreateShuffleMapStage:获取或创建给定宽依赖对应的 ShuffleMapStage,并在此之前创建所有缺失的祖先 Stage
    /**
       * Gets a shuffle map stage if one exists in shuffleIdToMapStage. Otherwise, if the
       * shuffle map stage doesn't already exist, this method will create the shuffle map stage in
       * addition to any missing ancestor shuffle map stages.
       *
       * @param shuffleDep the shuffle dependency whose map stage is wanted
       * @param firstJobId job id handed to every stage newly created by this call
       * @return the existing or newly created ShuffleMapStage for `shuffleDep`
       */
      private def getOrCreateShuffleMapStage(
          shuffleDep: ShuffleDependency[_, _, _],
          firstJobId: Int): ShuffleMapStage = {
        shuffleIdToMapStage.getOrElse(shuffleDep.shuffleId, {
          // Nothing registered yet: create stages for all missing ancestor shuffle dependencies
          // first. Iterating the returned ArrayStack visits the most recently pushed entries first,
          // so more distant ancestors are generally handled before nearer ones.
          val missingAncestors = getMissingAncestorShuffleDependencies(shuffleDep.rdd)
          missingAncestors.foreach { ancestorDep =>
            // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
            // that were not already in shuffleIdToMapStage, it's possible that by the time we
            // get to a particular dependency in the foreach loop, it's been added to
            // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
            // SPARK-13902 for more information. This is why the check happens right here, per
            // element, rather than by filtering the stack up front.
            val alreadyRegistered = shuffleIdToMapStage.contains(ancestorDep.shuffleId)
            if (!alreadyRegistered) {
              createShuffleMapStage(ancestorDep, firstJobId)
            }
          }
          // Finally, create a stage for the given shuffle dependency.
          createShuffleMapStage(shuffleDep, firstJobId)
        })
      }
    
    getMissingAncestorShuffleDependencies:深度优先遍历,获取所有尚未注册(不在 shuffleIdToMapStage 中)的祖先宽依赖
    /**
       * Finds ancestor shuffle dependencies of `rdd` that are not registered in
       * shuffleIdToMapStage yet.
       *
       * This is a depth-first walk over shuffle boundaries. For every visited RDD, its immediate
       * shuffle dependencies are looked up. Each unregistered one is recorded, and its RDD is queued
       * for further exploration. Registered dependencies are not expanded, because they and their
       * ancestors already have stages.
       *
       * Only RDDs are de-duplicated, not dependencies. The result may therefore contain the same
       * shuffle dependency more than once if it is reachable from several visited RDDs.
       *
       * @param rdd the RDD whose ancestry is searched
       * @return a stack of unregistered ancestor shuffle dependencies; those discovered deeper in
       *         the walk are pushed later
       */
      private def getMissingAncestorShuffleDependencies(
          rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
        val missing = new ArrayStack[ShuffleDependency[_, _, _]]
        val expanded = new HashSet[RDD[_]]
        // We are manually maintaining a stack here to prevent StackOverflowError
        // caused by recursively visiting
        val toExpand = new ArrayStack[RDD[_]]
        toExpand.push(rdd)
        while (toExpand.nonEmpty) {
          val current = toExpand.pop()
          // HashSet.add returns false if `current` was already expanded.
          if (expanded.add(current)) {
            // Registered dependencies are skipped: they and their ancestors already have stages.
            for {
              dep <- getShuffleDependencies(current)
              if !shuffleIdToMapStage.contains(dep.shuffleId)
            } {
              // Nearer (descendant-side) dependencies are pushed before the ones found from them.
              missing.push(dep)
              toExpand.push(dep.rdd)
            }
          }
        }
        missing
      }
    

    例子

    RDDs原始依赖图

    RDDs原始依赖图

    getShuffleDependencies

    获取RDD的第一层直接宽依赖

    getMissingAncestorShuffleDependencies

    按深度优先遍历的顺序,获取所有尚未注册的祖先宽依赖

    最后划分结果

    最后划分结果

    相关文章

      网友评论

        本文标题:DAGScheduler之Job的提交划分Stage

        本文链接:https://www.haomeiwen.com/subject/xbywmftx.html