[Spark] Spark Job Execution Internals -- Submitting a Job


Author: w1992wishes | Published 2018-12-13 11:26

    Structure of this article:

    • Preface
    • Submitting a Job

    I. Preface

    The job and task scheduling system is the core of Spark. It schedules effectively because it organizes work into a DAG of tasks and builds in fault tolerance.

    To really understand Spark, there is no substitute for reading the source code behind its job execution flow. This article walks through the first step: submitting a job.

    II. Submitting a Job

    Spark operators come in two kinds: transformations and actions. Transformations are lazy and never trigger job submission on their own; only an action causes a job to be submitted.
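
    For example, the snippet below (the input path is purely illustrative, and an existing SparkContext `sc` is assumed) only records a lineage of transformations; nothing executes until count is called:

      // Transformations: recorded in the RDD lineage, nothing runs yet.
      val lines  = sc.textFile("hdfs:///tmp/input.txt") // hypothetical path
      val longer = lines.filter(_.length > 10)
      val upper  = longer.map(_.toUpperCase)

      // Action: triggers SparkContext.runJob, which actually submits a job.
      val n = upper.count()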

    Take count as an example. RDD.count invokes SparkContext's runJob to submit the job; the submission happens inside runJob, so the user never has to submit anything explicitly.

    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
    

    SparkContext.runJob is overloaded several times, and the chain of calls eventually reaches DAGScheduler's runJob.
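
    For orientation, the final SparkContext overload in that chain looks roughly like the sketch below (paraphrased from Spark 2.x; logging, progress-bar, and shutdown checks omitted):

      // Paraphrased from Spark 2.x SparkContext -- not the verbatim source.
      def runJob[T, U: ClassTag](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          resultHandler: (Int, U) => Unit): Unit = {
        val callSite = getCallSite
        // Clean the closure so it can be serialized and shipped to executors.
        val cleanedFunc = clean(func)
        // Hand the job to the DAGScheduler, which owns stage construction and scheduling.
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
        rdd.doCheckpoint()
      }

    DAGScheduler.runJob then submits the job and blocks until it finishes: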

      /**
       * Run an action job on the given RDD and pass all the results to the resultHandler function as
       * they arrive.
       *
       * @param rdd target RDD to run tasks on
       * @param func a function to run on each partition of the RDD
       * @param partitions set of partitions to run on; some jobs may not want to compute on all
       *   partitions of the target RDD, e.g. for operations like first()
       * @param callSite where in the user program this job was called
       * @param resultHandler callback to pass each result to
       * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
       *
       * @throws Exception when the job fails
       */
      def runJob[T, U](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          callSite: CallSite,
          resultHandler: (Int, U) => Unit,
          properties: Properties): Unit = {
        val start = System.nanoTime
        val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
        // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
        // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
        // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
        // safe to pass in null here. For more detail, see SPARK-13747.
        val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
        waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
        waiter.completionFuture.value.get match {
          case scala.util.Success(_) =>
            logInfo("Job %d finished: %s, took %f s".format
              (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
          case scala.util.Failure(exception) =>
            logInfo("Job %d failed: %s, took %f s".format
              (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
            // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
            val callerStackTrace = Thread.currentThread().getStackTrace.tail
            exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
            throw exception
        }
      }
    

    The call to focus on here is submitJob, which, as its name suggests, submits the job:

      /**
       * Submit an action job to the scheduler.
       *
       * @param rdd target RDD to run tasks on
       * @param func a function to run on each partition of the RDD
       * @param partitions set of partitions to run on; some jobs may not want to compute on all
       *   partitions of the target RDD, e.g. for operations like first()
       * @param callSite where in the user program this job was called
       * @param resultHandler callback to pass each result to
       * @param properties scheduler properties to attach to this job, e.g. fair scheduler pool name
       *
       * @return a JobWaiter object that can be used to block until the job finishes executing
       *         or can be used to cancel the job.
       *
       * @throws IllegalArgumentException when partitions ids are illegal
       */
      def submitJob[T, U](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          callSite: CallSite,
          resultHandler: (Int, U) => Unit,
          properties: Properties): JobWaiter[U] = {
        // Check to make sure we are not launching a task on a partition that does not exist.
        val maxPartitions = rdd.partitions.length
        partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
          throw new IllegalArgumentException(
            "Attempting to access a non-existent partition: " + p + ". " +
              "Total number of partitions: " + maxPartitions)
        }
    
        val jobId = nextJobId.getAndIncrement()
        if (partitions.size == 0) {
          // Return immediately if the job is running 0 tasks
          return new JobWaiter[U](this, jobId, 0, resultHandler)
        }
    
        assert(partitions.size > 0)
        val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
        val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
        eventProcessLoop.post(JobSubmitted(
          jobId, rdd, func2, partitions.toArray, callSite, waiter,
          SerializationUtils.clone(properties)))
        waiter
      }
    

    In DAGScheduler#submitJob, a JobWaiter is created first. The JobWaiter waits for the DAGScheduler job to complete and, as each task finishes, passes its result to the supplied handler function.
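
    Condensed, the mechanism inside JobWaiter looks roughly like the sketch below (paraphrased from the Spark 2.x class; the real JobWaiter also implements the JobListener trait and supports cancellation):

      // A condensed sketch of org.apache.spark.scheduler.JobWaiter (Spark 2.x),
      // renamed here to mark it as a paraphrase rather than the real class.
      import java.util.concurrent.atomic.AtomicInteger
      import scala.concurrent.{Future, Promise}

      class JobWaiterSketch[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
        private val finishedTasks = new AtomicInteger(0)
        // Completed once every task has succeeded, or failed on the first error.
        private val jobPromise: Promise[Unit] =
          if (totalTasks == 0) Promise.successful(()) else Promise()

        def completionFuture: Future[Unit] = jobPromise.future

        def taskSucceeded(index: Int, result: Any): Unit = {
          // The user-supplied handler may not be thread safe, so serialize calls to it.
          synchronized { resultHandler(index, result.asInstanceOf[T]) }
          if (finishedTasks.incrementAndGet() == totalTasks) {
            jobPromise.success(())
          }
        }

        def jobFailed(exception: Exception): Unit = jobPromise.tryFailure(exception)
      }

    This completionFuture is exactly what DAGScheduler.runJob blocks on above, which is why an action call does not return to the user until the job has finished.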

    After the JobWaiter is created, a JobSubmitted message carrying it is posted to DAGSchedulerEventProcessLoop, an inner class of DAGScheduler. This is a classic producer-consumer pattern: the producer puts the JobSubmitted message onto a queue, and a separate thread takes messages off the queue and consumes them. The consumer thread is shown next, and the producer side after it.

      private val eventThread = new Thread(name) {
        setDaemon(true)
    
        override def run(): Unit = {
          try {
            while (!stopped.get) {
              val event = eventQueue.take()
              try {
                onReceive(event)
              } catch {
                case NonFatal(e) =>
                  try {
                    onError(e)
                  } catch {
                    case NonFatal(e) => logError("Unexpected error in " + name, e)
                  }
              }
            }
          } catch {
            case ie: InterruptedException => // exit even if eventQueue is not empty
            case NonFatal(e) => logError("Unexpected error in " + name, e)
          }
        }
    
      }
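
    The producer side lives in the EventLoop base class and is equally small; roughly (paraphrased from Spark 2.x org.apache.spark.util.EventLoop):

      // Paraphrased from Spark 2.x EventLoop -- not the verbatim source.
      import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}

      private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

      /** Put the event into the event queue. The event thread will process it later. */
      def post(event: E): Unit = {
        eventQueue.put(event)
      }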
    

    The consumer thread above loops, taking each message off the queue and handing it to onReceive for consumption. In DAGSchedulerEventProcessLoop, onReceive is implemented as follows:

      /**
       * The main event loop of the DAG scheduler.
       */
      override def onReceive(event: DAGSchedulerEvent): Unit = {
        val timerContext = timer.time()
        try {
          doOnReceive(event)
        } finally {
          timerContext.stop()
        }
      }
    
      private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
        case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
          dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    
        case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
          dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
    
        case StageCancelled(stageId) =>
          dagScheduler.handleStageCancellation(stageId)
    
        case JobCancelled(jobId) =>
          dagScheduler.handleJobCancellation(jobId)
    
        case JobGroupCancelled(groupId) =>
          dagScheduler.handleJobGroupCancelled(groupId)
    
        case AllJobsCancelled =>
          dagScheduler.doCancelAllJobs()
    
        case ExecutorAdded(execId, host) =>
          dagScheduler.handleExecutorAdded(execId, host)
    
        case ExecutorLost(execId, reason) =>
          val filesLost = reason match {
            case SlaveLost(_, true) => true
            case _ => false
          }
          dagScheduler.handleExecutorLost(execId, filesLost)
    
        case BeginEvent(task, taskInfo) =>
          dagScheduler.handleBeginEvent(task, taskInfo)
    
        case GettingResultEvent(taskInfo) =>
          dagScheduler.handleGetTaskResult(taskInfo)
    
        case completion: CompletionEvent =>
          dagScheduler.handleTaskCompletion(completion)
    
        case TaskSetFailed(taskSet, reason, exception) =>
          dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
    
        case ResubmitFailedStages =>
          dagScheduler.resubmitFailedStages()
      }
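
    All of these messages are case classes or objects extending the sealed DAGSchedulerEvent trait, which is what makes the exhaustive match above possible. JobSubmitted, for instance, looks roughly like this (paraphrased from Spark 2.x DAGSchedulerEvent.scala):

      // Paraphrased from Spark 2.x -- not the verbatim source.
      private[scheduler] case class JobSubmitted(
          jobId: Int,
          finalRDD: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          callSite: CallSite,
          listener: JobListener,
          properties: Properties = null)
        extends DAGSchedulerEvent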
    
    

    A job-submission message matches the JobSubmitted case and is dispatched to dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties):

      private[scheduler] def handleJobSubmitted(jobId: Int,
          finalRDD: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          callSite: CallSite,
          listener: JobListener,
          properties: Properties) {
        var finalStage: ResultStage = null
        try {
          // New stage creation may throw an exception if, for example, jobs are run on a
          // HadoopRDD whose underlying HDFS files have been deleted.
          finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
        } catch {
          case e: Exception =>
            logWarning("Creating new stage failed due to exception - job: " + jobId, e)
            listener.jobFailed(e)
            return
        }
    
        val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
        clearCacheLocs()
        logInfo("Got job %s (%s) with %d output partitions".format(
          job.jobId, callSite.shortForm, partitions.length))
        logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
        logInfo("Parents of final stage: " + finalStage.parents)
        logInfo("Missing parents: " + getMissingParentStages(finalStage))
    
        val jobSubmissionTime = clock.getTimeMillis()
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.setActiveJob(job)
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        submitStage(finalStage)
      }
    

    Inside handleJobSubmitted the job is divided into stages, and those stages are then submitted for scheduling; both steps are covered in the following articles.
