美文网首页
spark-源码-action算子触发

spark-源码-action算子触发

作者: scandly | 来源:发表于2018-08-28 22:21 被阅读0次

    基于spark1.6

    创建完SparkContext,然后执行Action算子

    当RDD执行Action算子时(形成一个job),会将代码提交到Master上运行,

    例如wordcount的action 算子 collect方法  def collect(): Array[T] = {

        val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)

        Array.concat(results: _*)

      }

    sc是SparkContext对象,上面 runJob 如下

      def runJob[T, U: ClassTag](

          rdd: RDD[T],

          func: (TaskContext, Iterator[T]) => U,

          partitions: Seq[Int],

          allowLocal: Boolean,

          resultHandler: (Int, U) => Unit) {

    ........................

        }

        //该方法调用多次重载的方法后,最终会调用dagScheduler的runJob,形成和切分stage

    def runJob[T, U: ClassTag](

          rdd: RDD[T],

          func: (TaskContext, Iterator[T]) => U,

          partitions: Seq[Int],

          allowLocal: Boolean,

          resultHandler: (Int, U) => Unit) {

      。。。。。。。

        //dagScheduler出现了,可以切分stage

        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,

          resultHandler, localProperties.get)

        progressBar.foreach(_.finishAll())

        rdd.doCheckpoint()

      }

    dagScheduler的runJob 是我们比较关心的

    def runJob[T, U: ClassTag](

        。。。。。

        val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)

      }

    这里面的我们主要看的是submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)提交任务,括号里面的是任务信息

    def submitJob[T, U](。。。): JobWaiter[U] = {

        //在这儿才封装任务提交事件,把该事件对象加入到任务队列里面    eventProcessLoop.post(JobSubmitted(

          jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))

      }

    JobSubmitted:// 封装job事件对象,放入DAGScheduler阻塞的事件队列,例如:任务id,数据RDD,fun,jobId(可见一个action就是一个job)

    从队列中取出事件对象,调用 onReceive方法,即调用子类 DAGSchedulerEventProcessLoop 的onReceive方法,该方法的匹配模式如下:

    (1)先生成finalStage。

        case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>

          //调用dagScheduler来出来提交任务

          dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties)

    调用了handleJobSubmitted方法,接下来查看该方法

    private[scheduler] def handleJobSubmitted(jobId: Int,

          finalRDD: RDD[_],

          func: (TaskContext, Iterator[_]) => _,

          partitions: Array[Int],

          allowLocal: Boolean,

          callSite: CallSite,      listener: JobListener,

          properties: Properties) {

            var finalStage: Stage = null

            //最终的stage,从后往前划分

            finalStage = newResultStage(finalRDD, partitions.size, None, jobId, callSite)

            。。。。

            submitStage(finalStage)

      // 提交其他正在等待的stage  

       submitWaitingStages() }

    }

    /**  

       * 创建一个 ResultStage ,形成有向无环图

       */  

      private def newResultStage(  

    rdd: RDD[_],

          func: (TaskContext, Iterator[_]) => _,  

          partitions: Array[Int],  

          jobId: Int,  

          callSite: CallSite): ResultStage = {  

     //下面这个函数会生成我们的DAG,需重点关注  

        val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)  

    val stage = new ResultStage(id,rdd, func, partitions,parentStages, jobId, callSite)

        stageIdToStage(id) = stage //将Stage的id放入stageIdToStage结构中。  

        updateJobIdStageIdMaps(jobId, stage) //更新JobIdStageIdMaps  

        stage  

     }

      } 

    上面的代码中,调用了newResultStage方法,该方法是划分任务的核心方法,任务划分是根据最后一个依赖关系作为开始,通过递归,将每个宽依赖做为切分Stage的依据,切分Stage的过程是流程中的一环(详见 day29_spark-源码-Stage划分算法,并最终得到了DAG图中的Result Stage(final Stage)),但在这里不详细阐述,当任务切分完毕后,代码继续执行来到submitStage(finalStage)这里开始进行任务提交

    (2)提交resultStage

    //提交Stage,如果有未提交的ParentStage,则会递归提交这些ParentStage,只有所有ParentStage都计算完了,才能提交当前Stage 

      private def submitStage(stage: Stage) { // 此stage是 result stage

      // 根据stage获取jobId  

        val jobId = activeJobForStage(stage)  //查找该Stage的所有激活的job

      if (jobId.isDefined) {  // jobId 存在就执行,如果不存在就停止  

    // 记录Debug日志信息:submitStage(stage)  

          logDebug("submitStage(" + stage + ")") 

        //如果当前Stage没有在等待parent Stage的返回,也不是正在运行的Stage,并且也没有提 示提交失败,说明未处理,那么我们就尝试提交Stage 

          if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) { 

            //得到还未执行的父 stage

    val missing = getMissingParentStages(stage).sortBy(_.id) 

            logDebug("missing: " + missing) 

            if (missing.isEmpty) {    //如果没有父 Stage 

            //当前stage 拆分成task,形成taskSet 并提交

              submitMissingTasks(stage, jobId.get) // 注意这个stage会是两种类型 1、shufflerMapStage 2、resultStage

                } else { 

              //有父Stage没进行计算,就递归提交这些父Stage 

              for (parent <- missing) {  // 该stage的所有父stage

                submitStage(parent)// 递归调用本身 

        } 

                waitingStages += stage 

            } 

          } 

        } else {//无效作业,停止它。 

          abortStage(stage, "No active job for stage " + stage.id, None) 

        } 

    }

    ********************getMissingParentStages方法如下****************

    针对 stage的执行要记住2个判断点 1、getmissingParentStages()方法为核心方法。这里我们要懂得这样一个逻辑:我们都知道,Stage是通过shuffle划分的,所以,每一Stage都是以shuffle开始的,若一个RDD是宽依赖,则必然说明该RDD的父RDD在另一个Stage中,若一个RDD是窄依赖,则该RDD所依赖的父RDD还在同一个Stage中,我们可以根据这个逻辑,找到该Stage的父Stage。

    // DAGScheduler.scala

    private def getMissingParentStages(stage: Stage): List[Stage] = {

        val missing = new HashSet[Stage] //用于存放父Stage

        val visited = new HashSet[RDD[_]] //用于存放已访问过的RDD

        val waitingForVisit = new Stack[RDD[_]]

        def visit(rdd: RDD[_]) {

          if (!visited(rdd)) { //如果RDD没有被访问过,则进行访问

            visited += rdd //添加到已访问RDD的HashSet中

            val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)

            if (rddHasUncachedPartitions) {

              for (dep <- rdd.dependencies) { //获取该RDD的依赖

                dep match {

                  case shufDep: ShuffleDependency[_, _, _] =>//若为宽依赖,则该RDD依赖的RDD所在的stage必为父stage

                    val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)//生成父Stage

                    if (!mapStage.isAvailable) {//若父Stage task没有完全执行,则添加到父Stage的HashSET中

                      missing += mapStage // 如果是宽依赖,那么就表示找到了,不存在宽依赖前还有宽依赖

                    }

                  case narrowDep: NarrowDependency[_] =>//若为窄依赖,则需要再判断,其父有无宽依赖

                    waitingForVisit.push(narrowDep.rdd)

                }

              }

            }

          }

        }

        waitingForVisit.push(stage.rdd)

        while (waitingForVisit.nonEmpty) {//循环遍历所有RDD

          visit(waitingForVisit.pop())

        }

        missing.toList

    }

    def isAvailable: Boolean = _numAvailableOutputs == numPartitions

    针对 stage的执行要记住2个判断点 2、每当执行完一个Task会对变量_numAvailableOutputs加1,直至所有Task执行完,_numAvailableOutputs等于分区数。

    (3)提交MissingTask

    stage根据 parition 拆分成task(决定每个Task的最佳位置)生成TaskSet,并提交到TaskScheduler

    private def submitMissingTasks(stage: Stage, jobId: Int) {

      //首先根据stage所依赖的RDD的partition的分布,会产生出与partition数量相等的task

      var tasks = ArrayBuffer[Task[_]]()

      //对于resultStage或是shufflerMapStage会产生不同的task。

      //检查该stage时是否ShuffleMapStage,如果是则生成ShuffleMapTask

      if (stage.isShuffleMapStage) { //生成ShuffleMapStage

        for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {

          //task根据partition的locality进行分布

          val locs = getPreferredLocs(stage.rdd, p)

          tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)

        }

      } else { //resultStage:该类型stage直接输出结果生成ResultTask

      val job = resultStageToJob(stage)

        for (id <- 0 until job.numPartitions if !job.finished(id)) {

          val partition = job.partitions(id)

          val locs = getPreferredLocs(stage.rdd, partition)

          //由于是ResultTask,因此需要传入定义的func,也就是处理结果返回

          tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)

        }

      }

      //向TaskSchuduler提交任务,以stage为单位,一个stage对应一个TaskSet

      taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))

    }

    Task任务调度

    taskScheduler.submitTasks方法比较重要,主要将任务加入调度池(taskschduler 创建时初始一个调度池),最后调用了CoarseGrainedSchedulerBackend.reviveOffers()

    override def submitTasks(taskSet: TaskSet) {

        val tasks = taskSet.tasks

        this.synchronized {

            //将TaskSet 封装成TaskSetManger      val manager = createTaskSetManager(taskSet, maxTaskFailures)

          activeTaskSets(taskSet.id) = manager

        //用 schedulableBuilder去添加TaskSetManager到队列中

    //schedulableBuilder有两种形态:FIFOSchedulableBuilder: 单一pool ,FairSchedulableBuilder:   多个pool      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    。。。。。。。。。。。

        //在TaskSchedulerImpl在submitTasks添加TaskSetManager到pool后,调用了    backend.reviveOffers()     

    //fifo 直接将可调度对象TaskSetManager加入SchedulerQueue的尾端。 

    override def addSchedulable(schedulable: Schedulable) { 

        require(schedulable != null) 

        schedulableQueue.add(schedulable) 

        schedulableNameToSchedulable.put(schedulable.name, schedulable) 

        schedulable.parent = this 

      }

      override def reviveOffers() {

        //自己给自己发消息(告诉它我要提交task)

        driverActor ! ReviveOffers  }

    这里用了内部的DriverActor对象发送了一个内部消息给自己,接下来查看receiver方法接受的消息

    收到消息后调用了makeOffers()方法

          case ReviveOffers =>

            makeOffers()

      def makeOffers() {

          launchTasks(scheduler.resourceOffers(executorDataMap.map {

      case (id, executorData) =>

            new WorkerOffer(id, executorData.executorHost, executorData.freeCores)

          }.toSeq))

        }

    makeOffers方法中,将Executor的信息集合与调度池中的Tasks封装成WokerOffers,调用

    launchTasks

        def launchTasks(tasks: Seq[Seq[TaskDescription]]) {

          for (task <- tasks.flatten) {

            。。。。。。

            //把task序列化

            val serializedTask = ser.serialize(task)

                。。。。。

    //向executor进程 发送创建TaskRunner(extends Runnable)

              val executorData = executorDataMap(task.executorId)(这是之前注册过了的)

              executorData.freeCores -= scheduler.CPUS_PER_TASK

              //把序列化好的task发送给Executor

    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))         

            }

          }

        }

    会由CoarseGrainedSchedulerBackend来接受执行指令,内部封装DriverActor

    launchTasks方法将遍历Tasks集合,每个Task任务序列化,发送启动Task执行消息的给Executor

    相关文章

      网友评论

          本文标题:spark-源码-action算子触发

          本文链接:https://www.haomeiwen.com/subject/mcjcwftx.html