4.2 DAGScheduler - Forming Jobs and Stages

Author: GongMeng | Published 2018-11-14 19:40

    1. Key data structures maintained internally

     // The metrics source maintained by the scheduler; covered in a later section
     private[spark] val metricsSource: DAGSchedulerSource = new DAGSchedulerSource(this)
     
      // Job-related counters mentioned earlier
      private[scheduler] val nextJobId = new AtomicInteger(0)
      private[scheduler] def numTotalJobs: Int = nextJobId.get()
      private val nextStageId = new AtomicInteger(0)
    
      // Lookup tables mapping jobs to stages, shuffles to stages, and jobs to their running instances (ActiveJob)
      private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
      private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
      private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]
      private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]
    
      // Stages we need to run whose parents aren't done
      private[scheduler] val waitingStages = new HashSet[Stage]
    
      // Stages we are running right now
      private[scheduler] val runningStages = new HashSet[Stage]
    
      // Stages that must be resubmitted due to fetch failures
      private[scheduler] val failedStages = new HashSet[Stage]
    
      private[scheduler] val activeJobs = new HashSet[ActiveJob]
    
      /**
       * Contains the locations that each RDD's partitions are cached on.  This map's keys are RDD ids
       * and its values are arrays indexed by partition numbers. Each array value is the set of
       * locations where that RDD partition is cached.
       *
       * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
       */
      private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
    
      // For tracking failed nodes, we use the MapOutputTracker's epoch number, which is sent with
      // every task. When we detect a node failing, we note the current epoch number and failed
      // executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask results.
      //
      // TODO: Garbage collect information about failure epochs when we know there are no more
      //       stray messages to detect.
      private val failedEpoch = new HashMap[String, Long]
    
      private [scheduler] val outputCommitCoordinator = env.outputCommitCoordinator
    
      // A closure serializer that we reuse.
      // This is only safe because DAGScheduler runs in a single thread.
      private val closureSerializer = SparkEnv.get.closureSerializer.newInstance()
    
      /** If enabled, FetchFailed will not cause stage retry, in order to surface the problem. */
      private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
    
      private val messageScheduler =
        ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
    
       // The event loop that pushes the DAG blueprint down to the TaskScheduler as concrete work; covered in detail later
      private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
      taskScheduler.setDAGScheduler(this)
    

    The EventProcessLoop will be covered in detail later. For now, think of it as a product manager turning a blueprint into JIRA tickets that actually get worked on: from blueprint to execution.
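
    To make this listener/queue model concrete, below is a minimal sketch of the pattern behind DAGSchedulerEventProcessLoop: callers post events into a queue, and a single daemon thread drains the queue and dispatches each event. This is a simplification for illustration, not Spark's actual EventLoop class (which also handles errors and shutdown).

      import java.util.concurrent.LinkedBlockingDeque

      // Minimal sketch of the event-loop pattern (not Spark's real implementation):
      // post() enqueues an event; one daemon thread takes events off the queue and
      // hands each one to onReceive, e.g. the doOnReceive dispatch shown below.
      abstract class SimpleEventLoop[E](name: String) {
        private val eventQueue = new LinkedBlockingDeque[E]()

        private val eventThread = new Thread(name) {
          setDaemon(true)
          override def run(): Unit = {
            try {
              while (true) {
                val event = eventQueue.take()   // blocks until an event is posted
                onReceive(event)
              }
            } catch {
              case _: InterruptedException =>   // stop() interrupts the thread
            }
          }
        }

        def start(): Unit = eventThread.start()
        def stop(): Unit = eventThread.interrupt()
        def post(event: E): Unit = eventQueue.put(event)

        protected def onReceive(event: E): Unit
      }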

    2. submitJob / runJob

    SparkContext puts a job into the EventProcessLoop, where it is actually executed.
    The outer entry point is runJob; internally, runJob calls submitJob to actually run the job.

    SparkContext.runJob is triggered from inside the RDD: whenever an action such as rdd.aggregate is executed, its implementation calls sc.runJob().
    This is what we said earlier: every RDD action triggers the DAGScheduler to start executing work.
    An important tuning point follows from this: avoid calling foreach on an RDD unless you really need its side effects. Programmers used to lambda-style code reach for it out of habit, but under the hood foreach is an action.
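
    To see why, here is (slightly abridged) how foreach and count are defined in the RDD source: both bottom out in a call to sc.runJob, so every call launches a job.

      // Abridged from the Spark RDD source: every action ends in sc.runJob,
      // which is what hands the work over to the DAGScheduler.
      def foreach(f: T => Unit): Unit = withScope {
        val cleanF = sc.clean(f)    // closure-clean the user function for shipping
        sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
      }

      def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum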

    /**
       * Run an action job on the given RDD and pass all the results to the resultHandler function as
       * they arrive.
       *
       * @param rdd target RDD to run tasks on
       * @param func a function to run on each partition of the RDD
       * @param partitions set of partitions to run on; some jobs may not want to compute on all
       *   partitions of the target RDD, e.g. for operations like first()
       * @param callSite where in the user program this job was called
       * @param resultHandler callback to pass each result to
       * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
       *
       * @throws Exception when the job fails
       */
      def runJob[T, U](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          callSite: CallSite,
          resultHandler: (Int, U) => Unit,
          properties: Properties): Unit = {
    
        val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
        waiter.awaitResult() match {
          case JobSucceeded =>
          // handle success
          case JobFailed(exception: Exception) =>
          // handle failure
        }
      }
    
    submitJob:
    1. Checks that the requested partitions of the target RDD are valid
    2. Increments nextJobId
    3. If no partitions are requested, there is nothing to run, so it returns immediately
    4. Otherwise it packages the jobId, the requested partitions, the RDD and the function to run, posts them to the EventProcessLoop, and returns a JobWaiter so the caller can wait for the result (see the abridged sketch below).
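
    Abridged from the 1.6-era source, submitJob looks roughly like this; the four steps above map directly onto the numbered comments:

      def submitJob[T, U](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          callSite: CallSite,
          resultHandler: (Int, U) => Unit,
          properties: Properties): JobWaiter[U] = {
        // 1. Reject partition ids that do not exist in the target RDD
        val maxPartitions = rdd.partitions.length
        partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
          throw new IllegalArgumentException(s"Attempting to access nonexistent partition: $p")
        }

        // 2. Allocate a new, globally increasing job id
        val jobId = nextJobId.getAndIncrement()

        // 3. Nothing to compute: return a waiter that is already "done"
        if (partitions.size == 0) {
          return new JobWaiter[U](this, jobId, 0, resultHandler)
        }

        // 4. Post a JobSubmitted event to the event loop; the returned JobWaiter
        //    is what runJob blocks on via awaitResult()
        val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
        val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
        eventProcessLoop.post(JobSubmitted(
          jobId, rdd, func2, partitions.toArray, callSite, waiter, properties))
        waiter
      }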

    3. EventProcessLoop

    From the above we can trace the path from an RDD operation, through SparkContext submitting the job, to the job finally being run.
    The job request, which you can think of as the customer's end requirement, enters a structure called the DAGSchedulerEventProcessLoop.
    This is a task queue maintained inside the DAGScheduler, again designed around the listener model.

    Readers familiar with Python Celery or Apache Storm will find it easier to understand what the DAGScheduler does here. Once the final job enters this pipeline, the scheduler works backwards from the end result, using the dependency information stored in the RDDs, to derive every task that needs to run, pushing each step into the same pipeline. The idea is very similar to the pipelined tasks of traditional stream-processing systems.
    It is as if the customer ultimately wants product D: D goes onto the task list, and the company shouts internally, "get D's prerequisite C on the list". The process repeats until we reach an "A" that has no prerequisites, or a "B" that has already been built. Either way, after a few rounds of shouting, every task that needs to run has been pushed into the Stage/Job structures described above, and execution can then simply start from the front.
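
    A small, hypothetical example (assuming an existing SparkContext sc): the collect() at the end is "product D", and the single shuffle in the lineage is where the scheduler cuts a stage boundary.

      // One shuffle dependency => two stages:
      //   ShuffleMapStage: textFile -> flatMap -> map   (writes shuffle output)
      //   ResultStage:     reduceByKey -> collect       (reads shuffle output)
      val counts = sc.textFile("hdfs:///tmp/input.txt")  // path is illustrative
        .flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)     // ShuffleDependency: the stage boundary

      counts.collect()          // the action that triggers handleJobSubmitted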

      private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
     // Handle a newly submitted job
        case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
          dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
         
    // Handle a map stage submitted on its own
        case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
          dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
        
    // Cancellation and failure handling: a single stage, a single job,
    // a job group, or all jobs can be cancelled
    case StageCancelled(stageId) =>
      dagScheduler.handleStageCancellation(stageId)

    case JobCancelled(jobId) =>
      dagScheduler.handleJobCancellation(jobId)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()
    
    // Register a newly added executor so it can be used for scheduling
        case ExecutorAdded(execId, host) =>
          dagScheduler.handleExecutorAdded(execId, host)
       
    // Handle executors that have been lost
        case ExecutorLost(execId) =>
          dagScheduler.handleExecutorLost(execId, fetchFailed = false)
         
        
    // A task has started running
    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)
        
    // A task's result is being fetched by the driver (e.g. a large result stored remotely)
        case GettingResultEvent(taskInfo) =>
          dagScheduler.handleGetTaskResult(taskInfo)
    
    // A task has completed (successfully or not)
    case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) =>
      dagScheduler.handleTaskCompletion(completion)
        
    // A whole task set has failed
        case TaskSetFailed(taskSet, reason, exception) =>
          dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
        
    // Failure recovery: resubmit stages that failed (e.g. due to fetch failures)
        case ResubmitFailedStages =>
          dagScheduler.resubmitFailedStages()
      }
    

    4. From RDD dependencies to a DAG

    1. As shown above, after submitJob a JobSubmitted event is posted to the event loop and dispatched from there.
    2. We also know that every submitJob originates from an RDD action, and every action leads to a final stage being created; this happens in the method newResultStage.
    private[scheduler] def handleJobSubmitted(
    ... ) {
        var finalStage: ResultStage = null
        try {
          // New stage creation may throw an exception if, for example, jobs are run on a
          // HadoopRDD whose underlying HDFS files have been deleted.
          finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        // abort the job if stage creation fails (handling elided in this excerpt)
    }
    
    3. So we create a stage, give it a stageId, and then fill it in
      /**
       * Create a ResultStage associated with the provided jobId.
       */
      private def newResultStage(
        ... ): ResultStage = {
        val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)
        val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)
    // Record the new stage under its id, which comes from a globally increasing counter
        stageIdToStage(id) = stage
    // Associate the jobId with this stage (and its ancestors)
        updateJobIdStageIdMaps(jobId, stage)
        stage
      }
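
    The getParentStagesAndId helper used above is, in the 1.6-era source, essentially a thin wrapper (shown abridged): the parent stages are built first, so the new stage always receives a larger id than its parents.

      private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {
        val parentStages = getParentStages(rdd, firstJobId)   // may create ShuffleMapStages
        val id = nextStageId.getAndIncrement()                // id for the stage being built
        (parentStages, id)
      }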
    
    4. Following the code further, we see that right after creating this stage, the scheduler checks whether everything the stage depends on is already in place; the getParentStages method is called to build any missing parent stages.
      From this we can also see that a job is split internally into multiple stages,
      which is also visible in the Spark web UI.
      /**
       * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
       * the provided firstJobId.
       */
      private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        val parents = new HashSet[Stage]
        val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
        // caused by recursively visiting
        val waitingForVisit = new Stack[RDD[_]]
        def visit(r: RDD[_]) {
          if (!visited(r)) {
            visited += r
            // Kind of ugly: need to register RDDs with the cache here since
            // we can't do it in its constructor because # of partitions is unknown
          
            for (dep <- r.dependencies) {
              dep match {
            // Walking up the RDD dependency graph: every ShuffleDependency triggers a separate stage
                case shufDep: ShuffleDependency[_, _, _] =>
                  parents += getShuffleMapStage(shufDep, firstJobId)
             
            // Narrow (non-shuffle) dependencies are pushed onto the stack and folded into the current stage
                case _ =>
                  waitingForVisit.push(dep.rdd)
              }
            }
          }
        }
        waitingForVisit.push(rdd)
        while (waitingForVisit.nonEmpty) {
          visit(waitingForVisit.pop())
        }
        parents.toList
      }
    
    5. There are several other code paths in the DAGScheduler that implement this process; they will be covered later [TODO]
      • A shuffle on an RDD becomes the boundary between stages: every shuffle dependency triggers the creation of a new stage.
      • Non-shuffle operations are pushed onto the work stack and folded into the current stage, which together with its parents forms the stages of the job (a quick way to inspect this is shown below).
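
    A quick way to see these stage boundaries without opening the web UI is RDD.toDebugString, which prints the lineage; using the hypothetical word-count RDD from the earlier example:

      // Each "+-" branch in the printed lineage marks a ShuffleDependency,
      // i.e. the boundary between a ShuffleMapStage and the stage that reads from it.
      println(counts.toDebugString)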
