美文网首页
Spark DAG之划分Stage

Spark DAG之划分Stage

作者: 博弈史密斯 | 来源:发表于2018-05-15 08:44 被阅读0次

    概要

    介绍Stage的定义,DAGScheduler划分Stage流程。

    Stage

    查看Stage定义


    Stage中有两个重要属性,rddparents,分别记录的是切分处的RDD和父Stage信息,这一点结合我后面的例子更好理解。Stage有两个子类,ShuffleMapStage、ResultStage,两者分别增加了一个重要属性信息,如下

    stage 差异属性 作用
    ShuffleMapStage shuffleDep: ShuffleDependency 保存Dependency信息
    ResultStage func: (TaskContext, Iterator[_]) => _ 保存action对应的处理函数

    处理JobSubmitted事件

    上一篇最后讲到调用DAGScheduler的handleJobSubmitted方法处理JobSubmitted事件,查看该方法

      private[scheduler] def handleJobSubmitted(jobId: Int,
          finalRDD: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          callSite: CallSite,
          listener: JobListener,
          properties: Properties) {
        var finalStage: ResultStage = null
        //划分Stage,返回ResultStage,Stage使用 parents 属性保存父 Stage
        finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    
        //创建ActiveJob,并添加到对应集合管理
        val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
        jobIdToActiveJob(jobId) = job
        activeJobs += job
        finalStage.setActiveJob(job)
        
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        //提交 Stage
        submitStage(finalStage)
      }
    

    如上处,handleJobSubmitted方法主要职责如下:

    1. 调用 createResultStage 方法,划分DAG生成Stage。
    2. 创建ActiveJob,并添加到对应集合管理。
    3. 调用submitStage 提交Stage。

    划分Stage

    DAGScheduler的 createResultStage方法负责划分DAG生成Stage,createResultStage方法:1 调用 getOrCreateParentStages方法创建父Stage,2 创建 ResultStage。
    getOrCreateParentStages :

      private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        getShuffleDependencies(rdd).map { shuffleDep =>
          getOrCreateShuffleMapStage(shuffleDep, firstJobId)
        }.toList
      }
    

    首先 getShuffleDependencies 获取 所有的 ShuffleDependency

      private[scheduler] def getShuffleDependencies(
          rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
          
        //记录 所有的 ShuffleDependency
        val parents = new HashSet[ShuffleDependency[_, _, _]]
        
        //记录所有已经处理的 RDD
        val visited = new HashSet[RDD[_]]
        
        //记录所有待处理的 RDD
        val waitingForVisit = new Stack[RDD[_]]
        
        //把当前的 ResultRdd,也就是最后一个RDD,放到 waitingForVisit
        waitingForVisit.push(rdd)
        
        while (waitingForVisit.nonEmpty) {
          //从 waitingForVisit 取出一个 RDD 去处理
          val toVisit = waitingForVisit.pop()
          
          //已经处理的RDD列表中 不包含 要处理的这个RDD
          //保证下面的流程都是针对要处理的RDD
          if (!visited(toVisit)) {
            visited += toVisit
            
            //RDD的 dependencies 方法,保存了所有RDD的 dependency
            toVisit.dependencies.foreach {
              //如果是宽依赖,则添加到 parents
              case shuffleDep: ShuffleDependency[_, _, _] =>
                parents += shuffleDep
                
              //如果是窄依赖,则把这个依赖的 RDD,添加到 waitingForVisit
              //一直往上找,直到找到下一个宽依赖
              case dependency =>
                waitingForVisit.push(dependency.rdd)
            }
          }
        }
        parents
      }
    

    如上面代码注释,getShuffleDependencies里主要逻辑为:通过action操作后的RDD,往上遍历所有RDD,寻找所有的 ShuffleDependency 列表,并返回

    然后根据 每个 Shuffle 划分 Stage,看下 getOrCreateShuffleMapStage 代码:

      private def getOrCreateShuffleMapStage(
          shuffleDep: ShuffleDependency[_, _, _],
          firstJobId: Int): ShuffleMapStage = {
        createShuffleMapStage(shuffleDep, firstJobId)
      }
      
      def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
        val rdd = shuffleDep.rdd
        val numTasks = rdd.partitions.length
        
        //再次调用 getOrCreateParentStages 创建 parents
        val parents = getOrCreateParentStages(rdd, jobId)
        val id = nextStageId.getAndIncrement()
        //根据 parents 创建 ShuffleMapStage
        val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
    
        //添加到 Map
        stageIdToStage(id) = stage
        shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    
        //返回创建的 stage
        stage
      }
    

    例子

    val sc = new SparkContext("local","wordcount")
    val data = sc.parallelize(List("a c", "a b", "b c", "b d", "c d"), 2)
    val wordcount = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).reduceByKey(_ + _)
    
    val data2 = sc.parallelize(List("a c", "a b", "b c", "b d", "c d"), 2)
    val wordcount2 = data2.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).map(x => (x._2, x._1)).reduceByKey(_ + _)
    
    wordcount.join(wordcount2).collect()
    

    RDD的依赖关系:


    1. 最左一列的parallelize、map等表示实例代码中的transformation。
    2. 圆角矩形表示transformation操作生成的RDD和该RDD的Dependency,其中ShuffleDependency使用蓝色标注。

    在上图ShuffleDependency处切分DAG生成Stage,结果如下 :


    1. 圆角矩形代表Stage,结果为四个ShuffleMapStage ,一个ResultStage。
    2. 圆角矩形内为Stage的两个属性。ShuffleMapStage和ResultStage有差别。

    到这里,Stage就划分完成了,最后贴张spark webUI的图片


    总结

    会在 Shuffle 处划分 Stage。

    相关文章

      网友评论

          本文标题:Spark DAG之划分Stage

          本文链接:https://www.haomeiwen.com/subject/bqejdftx.html