1. Overview
Most of us have used Spark RDDs. If you have read the official documentation, you know an RDD is essentially a task specification handed over by a client: by applying methods such as map, reduce, count, combine, union... it eventually produces one or more outputs the user expects.
If you are not yet familiar with RDDs, read the official documentation first and write a few example programs.
An RDD records a chain of operations to be carried out. The "product manager" draws these operations up into a plan, and that plan is a DAG graph.
![](https://img.haomeiwen.com/i13676247/69bae01a6e2d5cf6.png)
From the official documentation we know RDD operations fall into two categories: transformations, which produce new RDDs, and actions, which produce non-RDD results. When an action is triggered, SparkContext hands the logical dependency chain recorded in the RDD to the DAGScheduler, which splits it into stages and generates the tasks to run within each stage. The tasks of a stage are packaged into a TaskSet and submitted for execution.
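A minimal sketch of the distinction, assuming a running SparkContext `sc` and a hypothetical input path: the transformations below are lazy and only extend the lineage, while the final action is what actually submits a job to the DAGScheduler.

```scala
// Transformations (lazy): they only record lineage, no job is submitted yet.
val words = sc.textFile("input.txt")      // hypothetical input path
  .flatMap(_.split("\\s+"))               // narrow dependency: pipelined in one stage
  .map(word => (word, 1))                 // still no job submitted
val counts = words.reduceByKey(_ + _)     // shuffle dependency: stage boundary

// Action: count() triggers SparkContext.runJob, which hands the lineage
// to the DAGScheduler to be turned into stages and TaskSets.
val total = counts.count()
```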
![](https://img.haomeiwen.com/i13676247/6309db520c0e9c54.png)
The most basic units of execution in the DAGScheduler are jobs and stages. Internally it maintains a set of registries that track them (a declaration sketch follows the table below):
Name | Description |
---|---|
activeJobs | The set of ActiveJob instances |
cacheLocs | For each RDD, which executor each of its blocks is cached on; a block can be located by the executor's host and port plus the block id |
failedEpoch | Failed executors and the epoch in which they were lost |
failedStages | As the name suggests, stages that have failed |
jobIdToActiveJob | Mapping from job id to ActiveJob |
jobIdToStageIds | Mapping from job id to its stage ids |
nextJobId | Every job needs an id when it is launched; this is a globally increasing integer |
nextStageId | Same as above, but for stages |
runningStages | The set of stages currently running; in optimized newer versions, similarly to Flink, multiple stages can run at the same time |
shuffleIdToMapStage | Mapping from shuffle id to the ShuffleMapStage that produces its data |
stageIdToStage | Mapping from stage id to the Stage object; used, for example, when the DAGScheduler launches a shuffle map stage |
waitingStages | Stages waiting for their upstream (parent) stages to be computed |
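For reference, these registries appear in the DAGScheduler class body roughly as the following declarations. This is a paraphrased sketch, not a verbatim copy; exact types and visibility may differ between Spark versions, so check the source of the version you are reading.

```scala
// Inside the DAGScheduler class body (HashMap/HashSet are scala.collection.mutable,
// AtomicInteger is java.util.concurrent.atomic.AtomicInteger).
private[scheduler] val nextJobId = new AtomicInteger(0)
private[scheduler] val nextStageId = new AtomicInteger(0)

private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleIdToMapStage = new HashMap[Int, ShuffleMapStage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

private[scheduler] val waitingStages = new HashSet[Stage]   // parents not yet computed
private[scheduler] val runningStages = new HashSet[Stage]   // currently executing
private[scheduler] val failedStages = new HashSet[Stage]    // awaiting resubmission
private[scheduler] val activeJobs = new HashSet[ActiveJob]

// RDD id -> per-partition cache locations (executor host/port based)
private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
// executor id -> epoch in which it was marked as lost
private val failedEpoch = new HashMap[String, Long]
```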
2. Responsibilities
The DAGScheduler is created after SparkContext has started the TaskScheduler and SchedulerBackend. Think of it as the product manager and all of the staff assigned to him being introduced to each other before the real project planning begins.
![](https://img.haomeiwen.com/i13676247/05d368d94695d310.png)
The DAGScheduler focuses on three things:
- Computing the execution DAG, i.e. drawing up the plan.
- Computing the [preferred locations] to run each task on, pursuing data locality just like Hadoop does (see the sketch after this list). Think of it this way: the guy who just finished the user-registration module should ideally also write the user-login module, so the relevant knowledge does not have to be transferred to a successor; that is the fastest arrangement.
- Recomputing when shuffle data is lost. Imagine the guy who wrote the login module dropped the database and ran off; someone else has to be assigned to rewrite it from scratch. That is what happens when a hand-off is lost during a shuffle: there is no choice but to have another worker redo the work.
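As a hedged illustration of the locality idea (assuming a running SparkContext `sc` and a hypothetical HDFS path): each partition of an RDD can report, through the public RDD API, which hosts already hold its data, and this is the kind of information the DAGScheduler's placement decisions derive from.

```scala
// For an HDFS-backed (or cached) RDD, preferredLocations reports the hosts
// that already hold each partition's data; the scheduler tries to run the
// corresponding task there to avoid moving data over the network.
val events = sc.textFile("hdfs:///data/events")   // hypothetical path
events.partitions.take(3).foreach { p =>
  println(s"partition ${p.index} prefers: ${events.preferredLocations(p)}")
}
```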
It depends on SparkContext, TaskScheduler, LiveListenerBus, MapOutputTracker, and BlockManagerMaster.
3. Source code: initialization
The class comment describes the DAGScheduler as the high-level planner: it is responsible for turning the user's desired RDD transformations into an execution blueprint, and then handing that blueprint to the services and schedulers that actually run it.
```scala
/**
* The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of
* stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a
* minimal schedule to run the job. It then submits stages as TaskSets to an underlying
* TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent
* tasks that can run right away based on the data that's already on the cluster (e.g. map output
* files from previous stages), though it may fail if this data becomes unavailable.
*
* Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with
* "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks
* in each stage, but operations with shuffle dependencies require multiple stages (one to write a
* set of map output files, and another to read those files after a barrier). In the end, every
* stage will have only shuffle dependencies on other stages, and may compute multiple operations
* inside it. The actual pipelining of these operations happens in the RDD.compute() functions of
* various RDDs (MappedRDD, FilteredRDD, etc).
*
* In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred
* locations to run each task on, based on the current cache status, and passes these to the
* low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being
* lost, in which case old stages may need to be resubmitted. Failures *within* a stage that are
* not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task
* a small number of times before cancelling the whole stage.
*
* When looking through this code, there are several key concepts:
*
* - Jobs (represented by [[ActiveJob]]) are the top-level work items submitted to the scheduler.
* For example, when the user calls an action, like count(), a job will be submitted through
* submitJob. Each Job may require the execution of multiple stages to build intermediate data.
*
* - Stages ([[Stage]]) are sets of tasks that compute intermediate results in jobs, where each
* task computes the same function on partitions of the same RDD. Stages are separated at shuffle
* boundaries, which introduce a barrier (where we must wait for the previous stage to finish to
* fetch outputs). There are two types of stages: [[ResultStage]], for the final stage that
* executes an action, and [[ShuffleMapStage]], which writes map output files for a shuffle.
* Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
*
* - Tasks are individual units of work, each sent to one machine.
*
* - Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them
* and likewise remembers which shuffle map stages have already produced output files to avoid
* redoing the map side of a shuffle.
*
* - Preferred locations: the DAGScheduler also computes where to run each task in a stage based
* on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
*
* - Cleanup: all data structures are cleared when the running jobs that depend on them finish,
* to prevent memory leaks in a long-running application.
*
* To recover from failures, the same stage might need to run multiple times, which are called
* "attempts". If the TaskScheduler reports that a task failed because a map output file from a
* previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a
* CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small
* amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost
* stage(s) that compute the missing tasks. As part of this process, we might also have to create
* Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since
* tasks from the old attempt of a stage could still be running, care must be taken to map any
* events received in the correct Stage object.
*
* Here's a checklist to use when making or reviewing changes to this class:
*
* - All data structures should be cleared when the jobs involving them end to avoid indefinite
* accumulation of state in long-running programs.
*
* - When adding a new data structure, update `DAGSchedulerSuite.assertDataStructuresEmpty` to
* include the new structure. This will help to catch memory leaks.
*/
private[spark]
class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging
```
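For context, SparkContext wires the DAGScheduler up roughly as follows once the TaskScheduler and its backend exist. This is a paraphrased sketch of the driver-side initialization, not a verbatim copy of SparkContext's code; `taskScheduler`, `listenerBus`, and `env` are `private[spark]`, so it only compiles inside the `org.apache.spark` package.

```scala
// Paraphrased sketch: constructing the DAGScheduler with its collaborators,
// mirroring the constructor parameters shown above.
val dagScheduler = new DAGScheduler(
  sc,                                                            // the SparkContext itself
  sc.taskScheduler,                                              // low-level task scheduler
  sc.listenerBus,                                                // event bus for SparkListeners
  sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],  // shuffle output registry
  sc.env.blockManager.master,                                    // block/cache location master
  sc.env)                                                        // clock defaults to SystemClock
```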