Stage Division: A Source-Code Analysis

Author: 专职掏大粪 | Published 2021-09-20 09:57
    def collect(): Array[T] = withScope {
      // `this` is the current RDD (the one the action was called on); it is passed down the call chain
      val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
      Array.concat(results: _*)
    }
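
    For reference, a minimal driver program that exercises this path; calling collect on the RDD is what kicks off job submission (the app name and master are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}

    object CollectDemo {
      def main(args: Array[String]): Unit = {
        // Local two-thread master, purely for illustration
        val conf = new SparkConf().setAppName("collect-demo").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(1 to 10, numSlices = 2)
        // collect() is the action: internally it calls sc.runJob(this, iter => iter.toArray)
        println(rdd.collect().mkString(", "))
        sc.stop()
      }
    }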
    
    SparkContext.runJob ultimately forwards to DAGScheduler.runJob

        // `rdd` here is still the RDD the action was called on
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
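
    collect is only one wrapper over this API; SparkContext.runJob can also be called directly with a custom per-partition function. A small sketch, assuming an active SparkContext named sc as in the previous example:

    // runJob applies the function to each partition and returns one result per partition
    val rdd = sc.parallelize(1 to 10, numSlices = 2)
    val partialSums: Array[Int] = sc.runJob(rdd, (iter: Iterator[Int]) => iter.sum)
    println(partialSums.mkString(", "))  // two partitions -> two partial sums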
    

    eventProcessLoop posts a JobSubmitted event

    // `rdd` is still the RDD the action was called on
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, rdd.partitions.indices.toArray, callSite, listener,
      clonedProperties))
    listener.awaitResult()
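
    post returns immediately; the submitting thread then blocks on the listener (a JobWaiter in the real code) until the event thread has finished the job. A simplified, illustrative sketch of that handoff using a Promise, not Spark's actual JobWaiter:

    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration.Duration

    val jobDone = Promise[String]()
    // Stand-in for the event thread: it "handles the job" and completes the promise
    new Thread(() => { jobDone.success("job finished"); () }).start()
    // Stand-in for listener.awaitResult(): block until the event thread is done
    println(Await.result(jobDone.future, Duration.Inf))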
    

    The eventThread consumes and processes the posted events

    private[spark] val eventThread = new Thread(name) {
      override def run(): Unit = {
        try {
          while (!stopped.get) {
            // take() blocks until an event is posted
            val event = eventQueue.take()
            try {
              onReceive(event)
            } catch {
              ... ...
            }
          }
        } catch {
          ... ...
        }
      }
    }
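
    This producer/consumer pattern (post puts events on a blocking queue; one dedicated thread takes and dispatches them) is the backbone of the DAGScheduler's event handling. A self-contained sketch of the same pattern, modeled on but not identical to Spark's EventLoop:

    import java.util.concurrent.LinkedBlockingDeque
    import java.util.concurrent.atomic.AtomicBoolean

    abstract class MiniEventLoop[E](name: String) {
      private val eventQueue = new LinkedBlockingDeque[E]()
      private val stopped = new AtomicBoolean(false)

      private val eventThread = new Thread(name) {
        setDaemon(true)
        override def run(): Unit = {
          try {
            while (!stopped.get) {
              val event = eventQueue.take()  // blocks until an event is posted
              onReceive(event)
            }
          } catch {
            case _: InterruptedException => // stop() interrupts a blocked take()
          }
        }
      }

      def start(): Unit = eventThread.start()
      def stop(): Unit = { stopped.set(true); eventThread.interrupt() }
      def post(event: E): Unit = eventQueue.put(event)
      protected def onReceive(event: E): Unit
    }

    // Usage: a loop that simply prints what it receives
    val loop = new MiniEventLoop[String]("demo-event-loop") {
      override protected def onReceive(event: String): Unit = println(s"handled: $event")
    }
    loop.start()
    loop.post("JobSubmitted")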
    

    DAGSchedulerEventProcessLoop.doOnReceive dispatches the event

    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
        case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
          dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    

    DAGScheduler.handleJobSubmitted (core code)

     private[scheduler] def handleJobSubmitted(jobId: Int,
          finalRDD: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          callSite: CallSite,
          listener: JobListener,
          properties: Properties): Unit = {
        var finalStage: ResultStage = null
        try {
          // Create the ResultStage (the final stage of the job); every parent
          // stage is created recursively from inside this call
          finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    

    DAGScheduler.createResultStage

    private def createResultStage(
          rdd: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          jobId: Int,
          callSite: CallSite): ResultStage = {
        // Collect the RDD's direct ShuffleDependencies (the stage boundaries)
        val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
      ... ...
        // Create the parent stages, one per direct shuffle dependency
        val parents = getOrCreateParentStages(shuffleDeps, jobId)
        val id = nextStageId.getAndIncrement()
        // Create the ResultStage; `rdd` is the RDD the action was called on.
        // Parents were created first, so the ResultStage gets the job's highest stage id.
        // (`resourceProfile` is merged from `resourceProfiles` in the elided code.)
        val stage = new ResultStage(id, rdd, func, partitions, parents, jobId,
          callSite, resourceProfile.id)
     ... ...
        stage
      }
    
    private[scheduler] def getShuffleDependenciesAndResourceProfiles(
         rdd: RDD[_]): (HashSet[ShuffleDependency[_, _, _]], HashSet[ResourceProfile]) = {
       val parents = new HashSet[ShuffleDependency[_, _, _]]
       val resourceProfiles = new HashSet[ResourceProfile]
       val visited = new HashSet[RDD[_]]
       val waitingForVisit = new ListBuffer[RDD[_]]
       waitingForVisit += rdd
       while (waitingForVisit.nonEmpty) {
         val toVisit = waitingForVisit.remove(0)
         if (!visited(toVisit)) {
           visited += toVisit
           Option(toVisit.getResourceProfile).foreach(resourceProfiles += _)
       toVisit.dependencies.foreach {
          // A ShuffleDependency marks a stage boundary: record it and stop walking past it
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          // A narrow dependency stays inside the current stage: keep walking up the lineage
          case dependency =>
            waitingForVisit.prepend(dependency.rdd)
        }
         }
       }
       (parents, resourceProfiles)
     }
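
    The loop above is a breadth-first walk over the lineage that stops at the first shuffle boundary on every path. A self-contained sketch of the same traversal over a toy dependency graph (the types are simplified stand-ins, not Spark's):

    import scala.collection.mutable.{HashSet, ListBuffer}

    sealed trait Dep { def parent: Node }
    final case class NarrowDep(parent: Node) extends Dep
    final case class ShuffleDep(parent: Node) extends Dep
    final case class Node(name: String, deps: List[Dep] = Nil)

    // Same shape as getShuffleDependenciesAndResourceProfiles: collect only the
    // *direct* shuffle dependencies, never traversing past a shuffle boundary
    def directShuffleDeps(root: Node): HashSet[ShuffleDep] = {
      val found = new HashSet[ShuffleDep]
      val visited = new HashSet[Node]
      val waiting = new ListBuffer[Node]
      waiting += root
      while (waiting.nonEmpty) {
        val toVisit = waiting.remove(0)
        if (!visited(toVisit)) {
          visited += toVisit
          toVisit.deps.foreach {
            case s: ShuffleDep => found += s                // boundary: record, stop here
            case n: NarrowDep  => waiting.prepend(n.parent) // same stage: keep walking
          }
        }
      }
      found
    }

    // textFile -> map (narrow) -> reduceByKey (shuffle) -> final map (narrow)
    val source  = Node("textFile")
    val mapped  = Node("map", List(NarrowDep(source)))
    val reduced = Node("reduceByKey", List(ShuffleDep(mapped)))
    val root    = Node("finalMap", List(NarrowDep(reduced)))
    assert(directShuffleDeps(root).size == 1) // only the reduceByKey boundary is found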
    

    DAGScheduler.getOrCreateParentStages

    private def getOrCreateParentStages(shuffleDeps: HashSet[ShuffleDependency[_, _, _]],
          firstJobId: Int): List[Stage] = {
        // For each direct shuffle dependency, get or create its ShuffleMapStage
        shuffleDeps.map { shuffleDep =>
          getOrCreateShuffleMapStage(shuffleDep, firstJobId)
        }.toList
      }
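
    getOrCreateShuffleMapStage first consults a registry (shuffleIdToMapStage in the real code), so a shuffle reachable through several paths still yields a single ShuffleMapStage; on a miss it falls through to createShuffleMapStage below, which recurses via getOrCreateParentStages. A minimal sketch of that memoized recursion, reusing the toy types from the previous sketch:

    import scala.collection.mutable

    // Illustrative stage type; `parents` links to the stages of the shuffles directly above
    final case class MiniStage(id: Int, boundary: ShuffleDep, parents: List[MiniStage])

    val registry = mutable.Map.empty[ShuffleDep, MiniStage] // plays the role of shuffleIdToMapStage
    var nextId = 0

    def getOrCreateStage(dep: ShuffleDep): MiniStage =
      registry.get(dep) match {
        case Some(stage) => stage // already created via another path: reuse it
        case None =>
          // Recurse first: this stage's parents come from the shuffles directly above it
          val parents = directShuffleDeps(dep.parent).toList.map(getOrCreateStage)
          nextId += 1
          val stage = MiniStage(nextId, dep, parents)
          registry(dep) = stage
          stage
      }

    // For the reduceByKey boundary from the previous sketch:
    val mapStage = getOrCreateStage(ShuffleDep(mapped))
    assert(mapStage.parents.isEmpty) // no shuffle above it, so it has no parent stages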
    

    DAGScheduler.createShuffleMapStage

      def createShuffleMapStage[K, V, C](
          shuffleDep: ShuffleDependency[K, V, C], jobId: Int): ShuffleMapStage = {
        // The walk restarts from the shuffle's map-side RDD
        val rdd = shuffleDep.rdd
        val (shuffleDeps, resourceProfiles) = getShuffleDependenciesAndResourceProfiles(rdd)
       ... ...
        // One task per partition of the map-side RDD
        val numTasks = rdd.partitions.length
        // Recursively get or create this stage's own parent stages
        val parents = getOrCreateParentStages(shuffleDeps, jobId)
        val id = nextStageId.getAndIncrement()
        // Create the ShuffleMapStage
        val stage = new ShuffleMapStage(
          id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker,
          resourceProfile.id)
        stageIdToStage(id) = stage
    ... ...
        stage
      }
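
    Putting it together: every ShuffleDependency becomes one ShuffleMapStage, and the RDD the action was called on becomes the ResultStage, so a job always has one more stage than it has shuffles. A quick way to see this, assuming a running SparkContext sc:

    // Word count: one shuffle (reduceByKey) => 1 ShuffleMapStage + 1 ResultStage = 2 stages
    val counts = sc.parallelize(Seq("a b", "b c"))
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    // The lineage string shows the shuffle boundary (a ShuffledRDD); the web UI
    // shows the two stages once an action such as collect() runs
    println(counts.toDebugString)
    counts.collect()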
    
