美文网首页
spark DAGSchedulerEventProcessLo

Spark DAGSchedulerEventProcessLoop

作者: Entry_1 | 来源:发表于2020-06-15 15:22 被阅读0次

    父类 EventLoop 启动了一个线程,不断从 LinkedBlockingDeque 中取出 event,并交给 onReceive 处理;DAGSchedulerEventProcessLoop 的 onReceive 方法调用 doOnReceive,由 doOnReceive 判断事件的具体类别并进行相应处理。

    /**
     * Event loop that routes each [[DAGSchedulerEvent]] to the matching handler method on the
     * supplied [[DAGScheduler]].
     *
     * @param dagScheduler the scheduler whose handler methods are invoked for every event
     */
    private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
      extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
    
      // Timer taken from the scheduler's metrics source; used to time every processed event.
      private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer
    
      /**
       * Entry point for a single event: dispatches it and times the processing.
       *
       * @param event the event to process
       */
      override def onReceive(event: DAGSchedulerEvent): Unit = {
        val processingTime = timer.time()
        // The timing context is stopped even when the handler throws.
        try doOnReceive(event) finally processingTime.stop()
      }
    
      /**
       * Dispatches `event` to the corresponding `dagScheduler` method based on its concrete type.
       *
       * @param event the event to dispatch
       */
      private def doOnReceive(event: DAGSchedulerEvent): Unit = {
        event match {
          // Job submission.
          case JobSubmitted(id, finalRdd, fn, parts, site, jobListener, props) =>
            dagScheduler.handleJobSubmitted(id, finalRdd, fn, parts, site, jobListener, props)
    
          // Map-stage submission.
          case MapStageSubmitted(id, shuffleDep, site, jobListener, props) =>
            dagScheduler.handleMapStageSubmitted(id, shuffleDep, site, jobListener, props)
    
          // Cancellation of a single stage.
          case StageCancelled(id, why) =>
            dagScheduler.handleStageCancellation(id, why)
    
          // Cancellation of a single job.
          case JobCancelled(id, why) =>
            dagScheduler.handleJobCancellation(id, why)
    
          // Cancellation of a whole job group.
          case JobGroupCancelled(group) =>
            dagScheduler.handleJobGroupCancelled(group)
    
          // Cancellation of all jobs.
          case AllJobsCancelled =>
            dagScheduler.doCancelAllJobs()
    
          // An executor was added on a host.
          case ExecutorAdded(executorId, hostName) =>
            dagScheduler.handleExecutorAdded(executorId, hostName)
    
          // An executor was lost; files count as lost only for SlaveLost(_, true).
          case ExecutorLost(executorId, lossReason) =>
            val lostFiles = lossReason match {
              case SlaveLost(_, true) => true
              case _ => false
            }
            dagScheduler.handleExecutorLost(executorId, lostFiles)
    
          // A task began.
          case BeginEvent(startedTask, info) =>
            dagScheduler.handleBeginEvent(startedTask, info)
    
          // A task result is being fetched.
          case GettingResultEvent(info) =>
            dagScheduler.handleGetTaskResult(info)
    
          // A task completed; the whole event object is handed over.
          case done: CompletionEvent =>
            dagScheduler.handleTaskCompletion(done)
    
          // A task set failed.
          case TaskSetFailed(failedSet, why, cause) =>
            dagScheduler.handleTaskSetFailed(failedSet, why, cause)
    
          // Failed stages should be resubmitted.
          case ResubmitFailedStages =>
            dagScheduler.resubmitFailedStages()
        }
      }
    
      /**
       * Logs the failure, makes a best-effort attempt to cancel all jobs, then stops the
       * SparkContext in a new thread.
       *
       * @param e the error that caused the event loop to fail
       */
      override def onError(e: Throwable): Unit = {
        logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
        // A failure while cancelling is only logged so that the context is still stopped below.
        try {
          dagScheduler.doCancelAllJobs()
        } catch {
          case cancelFailure: Throwable =>
            logError("DAGScheduler failed to cancel all jobs.", cancelFailure)
        }
        dagScheduler.sc.stopInNewThread()
      }
    
      /** Delegates post-stop cleanup to the scheduler. */
      override def onStop(): Unit = {
        dagScheduler.cleanUpAfterSchedulerStop()
      }
    }

    相关文章

      网友评论

          本文标题:spark DAGSchedulerEventProcessLo

          本文链接:https://www.haomeiwen.com/subject/ujthxktx.html