[Spark源码剖析] JobWaiter

作者: 牛肉圆粉不加葱 | 来源:发表于2015-10-20 08:50 被阅读331次

    职责

    • 等待DAGScheduler job完成,一个JobWaiter对象与一个job唯一一一对应
    • 一旦task完成,将该task结果填充到SparkContext.runJob创建的results数组中

    构造函数

    private[spark] class JobWaiter[T](
        dagScheduler: DAGScheduler,
        val jobId: Int,
        totalTasks: Int,
        resultHandler: (Int, T) => Unit)
      extends JobListener {...}
    

    在SparkContext.runJob中,通过

    val results = new Array[U](partitions.size)
    runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
    

    来创建容纳job结果的数据,数组的每个元素对应与之下标相等的partition的计算结果;并将结果处理函数(index, res) => results(index) = res作为参数传入runJob,以使在runJob内部的创建的JobWaiter对象能够在得知taskSucceeded之后,将该task的结果填充到results中

    重要成员及方法

    private var finishedTasks = 0
    

    已经完成的task个数


    private var jobResult: JobResult = if (jobFinished) JobSucceeded else null
    

    如果job完成,jobResult为job的执行结果。对于0个task的job,直接设置job执行结果为JobSucceeded。


      def cancel() {
        
        dagScheduler.cancelJob(jobId)
      }
    

    发送一个信号来取消job。该取消操作本身会被异步执行。在TaskScheduler取消所有属于该job的tasks后,该job会以一个Spark异常结束。


    override def taskSucceeded(index: Int, result: Any): Unit = synchronized { ... }
    
    • 讲该task结果,即参数result,填充到SparkContext.runJob中建立的val results = new Array[U](partitions.size)
    • finishedTasks += 1,判断finishedTasks是否与totalTasks相等,若相等,则_jobFinished = true jobResult = JobSucceeded

    问:什么情况下会 taskSucceeded 方法会被调用?
    答:DAGScheduler收到completion @ CompletionEvent事件后,会调用dagScheduler.handleTaskCompletion(completion),该函数会最终调用job.listener.taskSucceeded(rt.outputId, event.result),job.listener为trait JobListener对象,具体实现为JobWaiter


    def awaitResult(): JobResult = synchronized { ... }
    等待job结束,并返回jobResult


    欢迎关注我的微信公众号:FunnyBigData

    FunnyBigData

    相关文章

      网友评论

        本文标题:[Spark源码剖析] JobWaiter

        本文链接:https://www.haomeiwen.com/subject/vrgzcttx.html