美文网首页
4.5 DAGScheduler Stage的简要说明

4.5 DAGScheduler Stage的简要说明

作者: GongMeng | 来源:发表于2018-11-15 00:35 被阅读0次

    1. 概要

    从上文中我们可以看到JOB内部是分成多个Stage, 这些Stage之间以Action操作来划分.


    stage

    继而我们可以从WebUI上看到每个stage内部是划分成多个Task
    实际上每个Task就是对Parition的各种操作, 这些Task是可以并行运行的, 这也是分布式计算的最重要的特点, 让那些能并发计算的任务在大量的机器上一起运行, 从而加速计算.

    每个stage仅仅针对一个RDD进行操作, 但它可能依赖于多个上游的parent stage. 像rdd.union操作这种就是把两个RDD合并成一个RDD

    stage的依赖

    前文中也提到了, 有两种stage, 中间计算结果的ShuffleMapStage, 和没有任何后续依赖的ResultStage


    两种stage

    在JOB内部, 把RDD的shuffle操作当成篱笆, 篱笆与篱笆之间的transform操作合并到一个stage里面去. 最终实现了spark的高效计算的框架


    stage生成

    每个Stage对上游的stage的依赖一定是源于某个shuffle操作, 不得不从其它节点上拉取数据, Executor自身维护的parition不足以进行计算.如果发现它们依赖的那些数据没有计算完, 就会向上游发信息要求重新进行计算.

    INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 36 (ShuffledRDD[86] at reduceByKey at <console>:24)

    这些缺失的信息需要计算的Task会发送给TaskScheduler, 由它进行具体的安排.


    缺失信息计算

    2. ShuffleMapStage

    前文提到, 本质上这里就是经过一系列的Map 运算, 生成中间结果的shuffle文件, 等待后续的Reduce 过程拉取这些文件, 进行下一轮计算.
    一个ShuffleMapStage内部可能包含多个本地 pipeline 计算, 比如像map, filter这些narrowdependency过程是尽可能的在本地完成的, 并且是以Task的形式在所有的parition上并发跑完的.

    如果做了缓存, 或者正好这些shuffle文件没有过期, 一个中间结果, 可以被多个JOB的stage使用. 如果恰好中间有一些文件过期被删除了, 就要从新计算.

    /**
     * ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
     * They occur right before each shuffle operation, and might contain multiple pipelined operations
     * before that (e.g. map and filter). When executed, they save map output files that can later be
     * fetched by reduce tasks. The `shuffleDep` field describes the shuffle each stage is part of,
     * and variables like `outputLocs` and `numAvailableOutputs` track how many map outputs are ready.
     *
     * ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
     * For such stages, the ActiveJobs that submitted them are tracked in `mapStageJobs`. Note that
     * there can be multiple ActiveJobs trying to compute the same shuffle map stage.
     */
    

    2.1 初始化需要

    1. id - - identifier
    2. rdd — 涉及到的RDD
    3. numTasks — 并发数, 也就是task数, 和parition应该是1:1的
    4. parents — 上游依赖的多个stage
    5. firstJobId — 归属到哪个JOB里, 第一个触发这个stage创建的job的id. 需要注意由于这个RDD在多个JOB里被用到, 这里仅仅用第一个触发的来记录它.
    6. shuffleDep — 依赖的上游的shuffle的一些信息的句柄. 沿着句柄找到它依赖的上游的那些shuffle过程的数据

    2.2 Registering MapStatus For Partition

    addOutputLoc(partition: Int, status: MapStatus): Unit

    对Parition执行的MAP过程, 也就是task过程进行状态跟踪
    这个时候将numAvailableOutputs喜加一

    2.3 Removing MapStatus For Partition And BlockManager

    removeOutputLoc(partition: Int, bmAddress: BlockManagerId): Unit

    把MapStatus从outPutLoc列表里删除, 同时也意味着这个存储块试下了, 所以也要通知管理存储的BlockManager.
    这个时候将numAvailableOutputs减一

    2.4 ShuffleMapStage的共享

    上文中我们也到一个中间状态的RDD可能出现在多个JOB里, 类似我们生成了一个PairRDD<Person, Salary>. 我们可能需要知道每个人的工资是多少reduceByKey, 也可能想知道一共需要发多少工资reduce, 还可能想知道有多少个人count

    这些Action会触发各自的Job, 但他们都依赖同一个中间结果

    2.5 Registering Job -- Deregistering Job

    addActiveJob(job: ActiveJob): Unit
    removeActiveJob(job: ActiveJob): Unit
    像上文说的, 我们需要维护这个stage和那些Activejob有关系

    2.6 Preparing Shuffle Map Outputs in MapOutputTracker Format

    Preparing Shuffle Map Outputs in MapOutputTracker Format

    TODO 内容优势疏漏, 待完善

    当这个stage内部计算完毕的时候, 所有的MapStatus标记为Finished. 这个时候可以通知DAGScheduler进行下一步依赖这个中间Stage的操作了.

    对应的Shuffle结果会发送给MapOutputTracker来管理, shuffle结果以每个parition的第一个元素的地址来标记. 理解为这里分布式的产生一大堆的结果在每个服务器上, 现在我告诉你每个结果的入口(链表头), 你就可以来找这些数据了.

      /**
       * Returns an array of [[MapStatus]] (index by partition id). For each partition, the returned
       * value contains only one (i.e. the first) [[MapStatus]]. If there is no entry for the partition,
       * that position is filled with null.
       */
      def outputLocInMapOutputTrackerFormat(): Array[MapStatus] = {
        outputLocs.map(_.headOption.orNull)
      }
    

    3. ResultStage

    最终大结局,DAG的末尾.
    由于DAG图是从后向前推算的, 所以ResultStage事实上是这个JOB内部创建出来的第一个Stage

    创建过程
    计算过程和ShuffleMapStage一样
    计算过程

    `

    相关文章

      网友评论

          本文标题:4.5 DAGScheduler Stage的简要说明

          本文链接:https://www.haomeiwen.com/subject/imfefqtx.html