美文网首页Spark源码解读
Spark中Job的提交源码解读

Spark中Job的提交源码解读

作者: lehi | 来源:发表于2016-03-29 20:36 被阅读234次

    版权声明:本文为原创文章,未经允许不得转载。

    Spark程序程序job的运行是通过actions算子触发的,每一个action算子其实是一个runJob方法的运行,详见文章

    SparkContex源码解读(一)http://www.jianshu.com/p/9e75c11a5081

    1.Spark中Job的提交

    以一个简单的runjob为例,源码如下:
    <code>
    def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
    val start = System.nanoTime
    //通过dagScheduler运行job,即将JobSubmitted事件添加到DAGScheduler中的事件执行队列中,并用JobWaiter等待结果的返回
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)详见(1)
    waiter.awaitResult() match {
    case JobSucceeded =>
    logInfo("Job %d finished: %s, took %f s".format
    (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case JobFailed(exception: Exception) =>
    logInfo("Job %d failed: %s, took %f s".format
    (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
    val callerStackTrace = Thread.currentThread().getStackTrace.tail
    exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
    throw exception
    }
    }</code>
    1.submitJob(rdd, func, partitions, callSite, resultHandler, properties)方法如下:
    <code>
    def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
    "Attempting to access a non-existent partition: " + p + ". " +
    "Total number of partitions: " + maxPartitions)
    }
    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    //如果job正在运行0个task,那么马上返回
    return new JobWaiter[U](this, jobId, 0, resultHandler)
    }
    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    //将JobSubmitted事件添加到eventProcessLoop中执行,详见(2)
    eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
    waiter
    }
    </code>

    2.将JobSubmitted事件添加到eventProcessLoop中执行 eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
    其中,
    (1)JobSubmitted一种DAGScheduler可以处理的事件类型,它的trait DAGSchedulerEvent的一个实现。DAGSchedulerEvent的case子类如下图所示:

    DAGScheduler处理的事件类型.png
    (2)eventProcessLoop的类型是DAGSchedulerEventProcessLoop,它是抽象类EventLoop的子类,该类的源码如下:
    <code>
    private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
    extends EventLoopDAGSchedulerEvent with Logging {
    override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
    doOnReceive(event)
    } finally {
    timerContext.stop()
    }
    }
    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    //对于JobSubmitted,通过 dagScheduler.handleJobSubmitted方法处理
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
    dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
    case StageCancelled(stageId) =>
    dagScheduler.handleStageCancellation(stageId)
    case JobCancelled(jobId) =>
    dagScheduler.handleJobCancellation(jobId)
    case JobGroupCancelled(groupId) =>
    dagScheduler.handleJobGroupCancelled(groupId)
    case AllJobsCancelled =>
    dagScheduler.doCancelAllJobs()
    case ExecutorAdded(execId, host) =>
    dagScheduler.handleExecutorAdded(execId, host)
    case ExecutorLost(execId) =>
    dagScheduler.handleExecutorLost(execId, fetchFailed = false)
    case BeginEvent(task, taskInfo) =>
    dagScheduler.handleBeginEvent(task, taskInfo)
    case GettingResultEvent(taskInfo) =>
    dagScheduler.handleGetTaskResult(taskInfo)
    case completion @ CompletionEvent(task, reason, , , taskInfo, taskMetrics) =>
    dagScheduler.handleTaskCompletion(completion)
    case TaskSetFailed(taskSet, reason, exception) =>
    dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
    case ResubmitFailedStages =>
    dagScheduler.resubmitFailedStages()
    }
    </code>
    3.对于JobSubmitted事件类型,通过 dagScheduler的handleJobSubmitted方法处理,这个方法中关系涉及到Job的Stage、TaskSet(Tasks)的生成,
    <code>
    private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[
    ],
    func: (TaskContext, Iterator[
    ]) => ,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
    var finalStage: ResultStage = null
    try {
    (1)//根据jobId生成finalStage,我们在后面具体介绍
    (2)Job的提交
    //初始化ActiveJob
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    //清除RDD的位置信息
    clearCacheLocs()
    logInfo("Got job %s (%s) with %d output partitions".format(
    job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))
    val jobSubmissionTime = clock.getTimeMillis()
    jobIdToActiveJob(jobId) = job
    activeJobs += job
    finalStage.resultOfJob = Some(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(
    .latestInfo))
    listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    ...
    (3)提交stages,但首先循环提交丢失的父Stage(s),即将丢失的stage加入到waitingStages中
    ...
    ...
    (4)提交Taskset(tasks)
    ...
    }
    </code>
    由代码(2)处我们可以看到SparkListenerJobStart事件加入到了监听器总线LiveListenerBus中,它的父类SparkListenerBus中定义了具体事件及监听器的映射关系,如下所示:
    <code>
    private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkListenerEvent] {
    override def onPostEvent(listener: SparkListener, event: SparkListenerEvent): Unit = {
    event match {
    case stageSubmitted: SparkListenerStageSubmitted =>
    listener.onStageSubmitted(stageSubmitted)
    case stageCompleted: SparkListenerStageCompleted =>
    listener.onStageCompleted(stageCompleted)
    //Job的启动
    case jobStart: SparkListenerJobStart =>
    listener.onJobStart(jobStart)
    case jobEnd: SparkListenerJobEnd =>
    listener.onJobEnd(jobEnd)
    case taskStart: SparkListenerTaskStart =>
    listener.onTaskStart(taskStart)
    case taskGettingResult: SparkListenerTaskGettingResult =>
    listener.onTaskGettingResult(taskGettingResult)
    case taskEnd: SparkListenerTaskEnd =>
    listener.onTaskEnd(taskEnd)
    case environmentUpdate: SparkListenerEnvironmentUpdate =>
    listener.onEnvironmentUpdate(environmentUpdate)
    case blockManagerAdded: SparkListenerBlockManagerAdded =>
    listener.onBlockManagerAdded(blockManagerAdded)
    case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
    listener.onBlockManagerRemoved(blockManagerRemoved)
    case unpersistRDD: SparkListenerUnpersistRDD =>
    listener.onUnpersistRDD(unpersistRDD)
    case applicationStart: SparkListenerApplicationStart =>
    listener.onApplicationStart(applicationStart)
    case applicationEnd: SparkListenerApplicationEnd =>
    listener.onApplicationEnd(applicationEnd)
    case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
    listener.onExecutorMetricsUpdate(metricsUpdate)
    case executorAdded: SparkListenerExecutorAdded =>
    listener.onExecutorAdded(executorAdded)
    case executorRemoved: SparkListenerExecutorRemoved =>
    listener.onExecutorRemoved(executorRemoved)
    case blockUpdated: SparkListenerBlockUpdated =>
    listener.onBlockUpdated(blockUpdated)
    case logStart: SparkListenerLogStart => // ignore event log metadata
    }
    }
    }
    </code>
    4.SparkListenerJobStart 事件最后是由JobProgressListener监听器的onJobStart方法执行的,如下所示:
    <code>
    override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
    val jobGroup = for (
    props <- Option(jobStart.properties);
    group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))//得到属性的值"spark.jobGroup.id"
    ) yield group
    val jobData: JobUIData =
    new JobUIData(
    jobId = jobStart.jobId,
    submissionTime = Option(jobStart.time).filter(_ >= 0),
    stageIds = jobStart.stageIds,
    jobGroup = jobGroup,
    status = JobExecutionStatus.RUNNING)
    // A null jobGroupId is used for jobs that are run without a job group
    jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)
    jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
    //计算将要运行这个job的的tasks数量,这可能是一个低估因为job start event 引用所有的result stages's的依赖
    jobData.numTasks = {
    val allStages = jobStart.stageInfos
    //过滤掉已经完成的或取消的Stage
    val missingStages = allStages.filter(.completionTime.isEmpty)
    missingStages.map(
    .numTasks).sum
    }
    //存放jobid以及相关的jobData
    jobIdToData(jobStart.jobId) = jobData
    //激活的、将要执行的Jobs
    activeJobs(jobStart.jobId) = jobData
    // 遍历stageIds,更新stageId为key,ActiveJobIds为value的stageIdToActiveJobIds集合
    for (stageId <- jobStart.stageIds) {
    stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[StageId]).add(jobStart.jobId)
    }
    //遍历stageInfos
    for (stageInfo <- jobStart.stageInfos) {
    stageIdToInfo.getOrElseUpdate(stageInfo.stageId, stageInfo)
    stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), new StageUIData)
    }
    }
    </code>
    这样我们就启动了Job,WebUI就可以看到该Job的信息了。

    相关文章

      网友评论

        本文标题:Spark中Job的提交源码解读

        本文链接:https://www.haomeiwen.com/subject/yzhjlttx.html