美文网首页
Spark Core解析 2:Scheduler 调度体系

Spark Core解析 2:Scheduler 调度体系

作者: Liam666 | 来源:发表于2019-12-12 23:42 被阅读0次

    Spark Core解析 2:Scheduler 调度体系

    Overview

    调度系统,是贯穿整个Spark应用的主心骨,从调度系统开始入手了解Spark Core,比较容易理清头绪。

    Spark的资源调度采用的是常见的两层调度,底层资源的管理和分配是第一层调度,交给YARN、Mesos或者Spark的Standalone集群处理,Application从第一层调度拿到资源后,还要进行内部的任务和资源调度,将任务和资源进行匹配,这是第二层调度,本文讲的就是这第二层调度

    Spark的调度体系涉及的任务包括3个粒度,分别是Job、Stage、Task。
    Job代表用户提交的一系列操作的总体,一个具体的计算任务,有明确的输入输出,一个Job由多个Stage组成;
    一个Stage代表Job计算流程的一个组成部分,一个阶段,包含多个Task;
    一个Task代表对一个分区的数据进行计算的具体任务。

    层级关系:Job > Stage > Task

    Spark Core 解析:RDD 弹性分布式数据集中,已经解释了RDD之间的依赖,以及如何组成RDD血缘图。

    所以本文主要目的就是解释清楚:Scheduler将RDD血缘图转变成Stage DAG,然后生成Task,最后提交给Executor去执行的过程。

    20191212230626.png

    Stage

    Job的不同分区的计算通常可以并行,但是有些计算需要将数据进行重新分区,这个过程称作shuffle(混洗)。Shuffle的过程是没法完全并行的,这时候就会出现task之间的等待,task的数量也可能发生变化,所以Spark中以shuffle为边界,对task进行划分,划分出来的每段称为Stage。

    Stage代表一组可以并行的执行相同计算的task,每个任务必须有相同的分区规则,这样一个stage中是没有shuffle的。

    在一个Spark App中,stage有一个全局唯一ID,stage id是自增的。

    20191028171155.png

    Stage分为两种:

    • ResultStage:最后执行的stage,负责Job最终的结果输出,每个Job有且仅有一个ResultStage
    • ShuffleMapStage:该stage的输出不是最终结果,而是其他stage的输入数据,通常涉及一次shuffle计算。

    stage创建流程:

    • 从最终执行action的RDD开始,沿着RDD依赖关系遍历,
      一旦发现某个RDD的dependency是ShuffleDependency,就创建一个ShuffleMapStage。
    • 最后创建ResultStage。

    example 1

    val rg=sc.parallelize(List((1,10),(2,20)))
    rg.reduceByKey(_+_).collect
    
    stages-simple.png

    这里reduceByKey操作引起了一次shuffle,所以job被切分成了2个stage。

    example 2

    val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c")))
    val rddB=sc.parallelize(List((1,"A"),(2,"B"),(3,"C")))
    rddA.join(rddB).collect
    
    stages-join.png

    join操作导致rddA和rddB都进行了一次shuffle,所以有3个stage。

    example 3

    import org.apache.spark.HashPartitioner
    val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c"))).partitionBy(new HashPartitioner(3))
    val rddB=sc.parallelize(List((1,"A"),(2,"B"),(3,"C")))
    rddA.join(rddB).collect
    
    stages-co-join.png

    WHAT ?

    因为rddA已经定义了Partitioner,这里join操作会保留rddA的分区方式,所以对rddA的依赖是OneToOneDepenency,而对于rddB则是ShuffleDependency。

    stage-example-3-2.png

    探索:一个RDD被依赖多次,会如何

    val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c")))
    rddA join rddA collect
    
    rdd use twice.png rdd-used-twice.png

    一个RDD被两个stage使用了。

    小结

    综上,stage的划分一定是依据shuffle即ShuffleDependency,跟算子和RDD变量的定义没有很强的关系,example2和3中的join操作rddA.join(rddB).collect看起来一模一样,但实际产生的stage划分却差别很大。

    Task

    与stage对应,task也分为两种:

    • ShuffleMapTask:即ShuffleMapStage中的task,主要完成map、shuffle计算。
    • ResultTask:ResultStage中的task,主要完成最终结果输出或者返回结果给driver的任务。

    一个stage有多少个partition就会创建多少个task,比如一个ShuffleMapStage有10个partition,那么就会创建10个ShuffleMapTask。

    一个Stage中的所有task组成一个TaskSet。

    Job Submit

    graph TB
    R(RDD.action)-->S(SparkContext.runJob)-- RDD -->D(DAGScheduler.runJob)
    -- TaskSet -->T(TaskScheduler.submitTasks)-- TaskDescription -->E(Executor.launchTask)
    

    RDD在action操作中通过SparkContext.runJob方法触发Job执行流程,该方法将调用DagScheduler.runJob方法,将RDD传入DagScheduler。然后,DAGScheduler创建TaskSet提交给TaskScheduler,TaskScheduler再将TaskSet封装成TaskDescription发送给Executor,最后Executor会将TaskDescription提交给线程池来运行。

    Stage Scheduler(high-level)

    DagScheduler

    Stage级别的调度是DagScheduler负责的,也是Spark调度体系的核心。

    DagScheduler的工作模式

    sequenceDiagram
        participant M as main thread
        participant L as eventProcessLoop
        participant E as event thread
        M-->>L: post event
        E-->>L: handle event
    

    DagScheduler内部维护了一个事件消息总线eventProcessLoop(类型为DAGSchedulerEventProcessLoop),其实就是一个用来存储DAGSchedulerEvent类型数据的队列。

    当DagScheduler的一些方法被调用的时候(如submitJob方法),并不会在主线程中处理该任务,而是post一个event(如JobSubmitted)到eventProcessLoop。eventProcessLoop中有一个守护线程,会不断的依次从队列中取出event,然后调用对应的handle(如handleJobSubmitted)方法来执行具体的任务。

    Stage调度流程

    • 1.submit job

      DagScheduler.runJob方法会调用submitJob方法,向eventProcessLoop发送一个JobSubmitted类型的消息,其中包含了RDD等信息。当eventProcessLoop接收到JobSubmitted类型的消息,会调用DagScheduler.handleJobSubmitted方法来处理消息。

    sequenceDiagram
        participant M as main thread(runJob)
        participant L as eventProcessLoop
        participant E as event thread(handleJobSubmitted)
        M-->>L: post JobSubmitted event
        E-->>L: handle JobSubmitted event
    
    • 2.create stage

      • DagScheduler在它的handleJobSubmitted方法中开始创建ResultStage。ResultStage中包含了最终执行action的finalRDD,以及计算函数func。

      • ResultStage有个parents属性,这个属性是个列表,也就是说可以有多个parent stage。创建ResultStage时需要先创建它的parent stage来填充这个属性,也就是说要创建ResultStage直接依赖的所有ShuffleMapStage。

      • 通过stage.rdd.dependencies属性,采用宽度优先遍历,一旦发现某个RDD(假设叫rddA)的dependency是ShuffleDependency,就创建一个ShuffleMapStage,ShuffleMapStage中包含的关键信息与ResultStage不同,是rddA的ShuffleDependency和rddA的ShuffleDependency.rdd,也就是说新创建的ShuffleMapStage持有的信息是他自身的最后一个RDD和该RDD的子RDD的dependency。

      • 创建一个ShuffleMapStage的过程同理会需要创建它的parent stage,也是若干ShuffleMapStage。如此递归下去,直到创建完所有的ShuffleMapStage,最后才完成ResultStage的创建。最后创建出来的这些Stage(若干ShuffleMapStage加一个ResultStage),通过parent属性串起来,就像这样

        graph TD
        A[ResultStage]-- parent -->B[ShuffleMapStage 1]
        A-- parent -->C[ShuffleMapStage 2]
        B-- parent -->D[ShuffleMapStage 3]
        

        这就生成了所谓的DAG图,但是这个图的指向跟执行顺序是反过来的,如果按执行顺序来画DAG图,就是常见的形式了:

        graph TD
        D[ShuffleMapStage 3]-->C[ShuffleMapStage 2]
        C[ShuffleMapStage 2]-->A[ResultStage]
        B[ShuffleMapStage 1]-->A[ResultStage]
        
    • 3.submit stage

      DagScheduler.handleJobSubmitted方法创建好ResultStage后会提交这个stage(submitStage方法),在提交一个stage的时候,会要先提交它的parent stage,也是通过递归的形式,直到一个stage的所有parent stage都被提交了,它自己才能被提交,如果一个stage的parent还没有完成,则会把这个stage加入waitingStages。也就是说,DAG图中前面的stage会被先提交。当一个stage的parent都准备好了,也就是执行完了,它才会进入submitMissingTasks的环节。

    • 4.submit task

      Task是在DagScheduler(不是TaskScheduler)的submitMissingTasks方法中创建的,包括ShuffleMapTask和ResultTask,与Stage对应。归属于同一个stage的这批Task组成一个TaskSet集合,最后提交给TaskScheduler的就是这个TaskSet集合。

    20191029095005.png

    Task Scheduler(low-level)

    Task的调度工作是由TaskScheduler与SchedulerBackend紧密合作,共同完成的。

    TaskScheduler是task级别的调度器,主要作用是管理task的调度和提交,是Spark底层的调度器。

    SchedulerBackend是TaskScheduler的后端服务,有独立的线程,所有的Executor都会注册到SchedulerBackend,主要作用是进行资源分配、将task分配给executor等。

    Task调度流程

    spark task scheduler.png

    第一个线程是DAGScheduler的事件处理线程,在其中,Task先经过DAGScheduler(蓝色箭头表示)封装成TaskSet,再由TaskScheduler(绿色箭头)封装成TaskSetManager,并加入调度队列中。

    SchedulerBackend在收到ReviveOffers消息时,会从线程池取一个线程进行makeOffers操作,WorkerOffer创建后传递给TaskScheduler进行分配。

    图中第二个线程就是SchedulerBackend的一个事件分发线程,从Pool中取出最优先的TaskSetManager,然后将WorkerOffer与其中的Task进行配对,生成TaskDescription,发送给WorkerOffer指定的Executor去执行。

    工作流程

    TaskScheduler.png
    • 1 DAGScheduler(submitMissingTasks方法中)调用TaskScheduler.submitTasks()创建并提交TaskSet给TaskScheduler;
    • 2 TaskScheduler拿到TaskSet后会创建一个TaskSetManager来管理它,并且把TaskSetManager添加到rootPool调度池中;
    • 3 调用SchedulerBackend.reviveOffers()方法;
    • 4 SchedulerBackend发送ReviveOffers消息给DriverEndpoint;
    • 5 DriverEndpoint收到ReviveOffers消息后,会调用makeOffers()方法创建WorkerOffer,并通过TaskScheduler.resourceOffers()返回offer;
    • 6 TaskScheduler从rootPool获取按调度算法排序后的TaskSetManager列表,取第一个TaskSetManager,逐个给TaskSet的Task分配WorkerOffer,生成TaskDescription(包含offer信息);
    • 7 调用SchedulerBackend.DriverEndpoint的launchTasks方法,将TaskDescription序列化并封装在LaunchTask消息中,发送给offer指定的executor。LaunchTask消息被ExecutorBackend收到后,会将Task信息反序列化,传给Executor.launchTask(),最后使用Executor的线程池中的线程来执行这个Task。

    梳理

    Stage,TaskSet,TaskSetManager是一一对应的,数量相等,都是只存在driver上的。
    Parition,Task,TaskDescription是一一对应,数量相同,Task和TaskDescription是会被发到executor上的。

    TaskScheduler的调度池

    与DAGScheduler不同的是TaskScheduler有调度池,有两种调度实体,Pool和TaskSetManager。
    与YARN的调度队列类似,采用了层级队列的方式,Pool是TaskSetManager的容器,起到将TaskSetManager分组的作用。

    Schedulable

    Schedulable是调度实体的基类,有两个子类Pool和TaskSetManager。

    要理解调度规则,必须知道下面几个属性:

    • parent:所属调度池,顶层的调度池为root pool;
    • schedulableQueue:包含的调度对象组成的队列;
    • schedulingMode:调度模式,FIFO or FAIR;
    • weight:权重
    • minShare:最小分配额(CPU核数)
    • runningTasks:运行中task数
    • priority:优先级
    • stageId:就是stageId
    • name:名称

    Pool和TaskSetManager对于这些属性的取值有所不同,从而导致了他们的调度行为也不一样。

    properties Pool TaskSetManager
    weight config 1
    minShare config 0
    priority 0 jobId
    stageId -1 stageId
    name config TaskSet_{taskSet.id}
    runningTasks Pool所含TaskSetManager的runningTasks和 TaskSetManager运行中task数

    Pools创建流程

    TaskScheduler有个属性schedulingMode,值取决于配置项spark.scheduler.mode,默认为FIFO。这个属性会导致TaskScheduler使用不同的SchedulableBuilder,即FIFOSchedulableBuilder和FairSchedulableBuilder。

    TaskScheduler在初始化的时候,就会创建root pool,根调度池,是所有pool的祖先。
    它的属性取值为:

    name: "" (空字符串)
    schedulingMode: 同TaskScheduler的schedulingMode属性
    weight: 0
    minShare: 0
    

    注意root pool的调度模式确定了。

    接下来会执行schedulableBuilder.buildPools()方法,

    • 如果是FIFOSchedulableBuilder,则什么都不会发生。

    • 若是FairSchedulableBuilder

      • 1 依据scheduler配置文件(后面会说),开始创建pool(可以是多个pool,FIFO,FAIR都有可能,取决于配置文件),并都加入root pool中。
      • 2 如果现在root pool中没有名为"default"的pool(即配置文件中没有定义一个叫default的pool),创建default pool,并加入root pool中。
        这时default pool它的属性取值是固定的:
      name: "default"
      schedulingMode: FIFO
      weight: 1
      minShare: 0
      

    Task加入pool流程

    当TaskScheduler提交task的时候,会先创建TaskSetManager,然后通过schedulableBuilder添加到pool中。

    • 如果是FIFOSchedulableBuilder,则会直接把TaskSetManager加入root pool队列中。

    • 若是FairSchedulableBuilder

      • 1 从spark.scheduler.pool配置获取pool name,没有定义则用'default';
      • 2 从root pool遍历找到对应名称的pool,把TaskSetManager加入pool的队列。如果没有找到,则创建一个该名称的pool,采用与default pool相同的属性配置,并加入root pool。

    调度池结构

    经过上面两部分,最终得到的调度池结构如下:

    spark.scheduler.mode=FIFO

    20191128210416.png

    spark.scheduler.mode=FAIR

    20191128210432.png

    Fair Scheduler pools配置

    Fair Scheduler Pool的划分依赖于配置文件,默认的配置文件为'fairscheduler.xml',也可以通过配置项"spark.scheduler.allocation.file"指定配置文件。

    煮个栗子,文件内容如下:

    <?xml version="1.0"?>
    <allocations>
      <pool name="prod">
        <schedulingMode>FAIR</schedulingMode>
        <weight>1</weight>
        <minShare>2</minShare>
      </pool>
      <pool name="test">
        <schedulingMode>FIFO</schedulingMode>
        <weight>2</weight>
        <minShare>3</minShare>
      </pool>
    </allocations>
    

    这里配置了两个pool,prod和test,并且配置了相关属性,这两个pool都会添加到root pool中

    调度算法

    以SchedulingAlgorithm为基类,内置实现的调度算法有两种FIFOSchedulingAlgorithm和FairSchedulingAlgorithm,其逻辑如下:

    • FIFO: 先进先出,优先级比较算法如下,

      • 1.比较priority,小的优先;
      • 2.priority相同则比较StageId,小的优先。
    • FAIR:公平调度,优先级比较算法如下,

      • 1.runningTasks小于minShare的优先级比不小于的优先级要高。
      • 2.若两者运行的runningTasks都比minShare小,则比较minShare使用率(runningTasks/max(minShare,1)),使用率越低优先级越高。
      • 3.若两者的minShare使用率相同,则比较权重使用率(runningTasks/weight),使用率越低优先级越高。
      • 4.若权重也相同,则比较name,小的优先。
    Pool为FIFO模式下的几种情形

    TaskSetManager之间的比较,其实就是先比较jobId再比较stageId,谁小谁优先,意味着就是谁先提交谁优先。

    Pool之间的比较,不存在!FIFO的pool队列中是不会有pool的。

    Pool为FAIR模式下的几种情形

    TaskSetManager之间的比较,因为minShare=0,weight=1,FAIR算法变成了:

    • 1 runningTasks小的优先
    • 2 runningTasks相同则比较name

    Pool之间的比较,就是标准的FAIR算法。

    当root pool为FAIR模式,先取最优先的pool,再从pool中,按pool的调度模式取优先的TaskSetManager。

    开始使用FAIR mode

    启用FAIR模式:

    • 1 准备好fairscheduler.xml文件
    • 2 启动参数添加 --conf spark.scheduler.mode=FAIR
    • 3 运行启动命令,如spark-shell --master yarn --deploy-mode client --conf spark.scheode=FAIR
    ui-fair.png

    启动后如果直接运行Job会自动提交到default pool,那么如何提交Job到指定pool?
    SparkContext.setLocalProperty("spark.scheduler.pool","poolName")

    如果每次只运行一个Job,开启FAIR模式的意义不大,那么如何同时运行多个Job?
    要异步提交Job,需要用到RDD的async action,目前有如下几个:

    countAsync
    collectAsync
    takeAsync
    foreachAsync
    foreachPartitionAsync
    

    举个例子:

    sc.setLocalProperty("spark.scheduler.pool","test")
    b.foreachAsync(_=>Thread.sleep(100))
    sc.setLocalProperty("spark.scheduler.pool","production")
    b.foreachAsync(_=>Thread.sleep(100))
    

    这样就会有两个任务在不同的pool同时运行:

    pools.png

    FAIR mode应用场景

    场景1:Spark SQL thrift server
    作用:让离线任务和交互式查询任务分配到不同的pool,给交互式查询任务更高的优先级,这样长时间运行的离线任务就不会一直占用所有资源,阻塞交互式查询任务。

    场景2:Streaming job与Batch job同时运行
    作用:比如用Streaming接数据写入HDFS,可能产生很多小文件,可以在低优先级的pool定时运行batch job合并小文件。

    另外可以参考Spark Summit 2017的分享:Continuous Application with FAIR Scheduler

    参考

    Spark内核设计的艺术

    spark任务调度FIFO和FAIR的详解

    Job Scheduling

    转载请注明原文地址:
    https://liam-blog.ml/2019/11/07/spark-core-scheduler/

    查看更多博主文章

    相关文章

      网友评论

          本文标题:Spark Core解析 2:Scheduler 调度体系

          本文链接:https://www.haomeiwen.com/subject/ttuxnctx.html