5.1 TaskScheduler Overview

Author: GongMeng | Published 2018-11-15 14:50

    1. Overview

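    As the previous section showed, the Driver uses the DAGScheduler and the RDD dependency graph to work out an execution plan that turns a job into multiple Stages. These Stages are then actually executed through the TaskScheduler. It hands the tasks of each Stage to the Executors that hold the relevant Partition information, so the work is done in a distributed fashion, and it records and tracks the execution results.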
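    To make the hand-off concrete, here is a minimal sketch of the idea: a stage becomes a set of tasks, one per partition, and each task goes to the executor that holds its partition. The names Task, TaskSet, toTaskSet, and assign are hypothetical stand-ins invented for this illustration; they are not Spark's real classes, and real placement also weighs locality levels and free cores.

```scala
object TaskSetSketch {
  // Hypothetical, simplified stand-ins; not Spark's real Task/TaskSet classes.
  final case class Task(stageId: Int, partition: Int)
  final case class TaskSet(stageId: Int, tasks: Seq[Task])

  // Build one task per partition of the stage.
  def toTaskSet(stageId: Int, numPartitions: Int): TaskSet =
    TaskSet(stageId, (0 until numPartitions).map(p => Task(stageId, p)))

  // Group tasks by the executor that holds their partition.
  // Tasks whose partition has no known holder are left out of the result.
  def assign(taskSet: TaskSet, partitionOwner: Map[Int, String]): Map[String, Seq[Task]] =
    taskSet.tasks
      .flatMap(t => partitionOwner.get(t.partition).map(exec => exec -> t))
      .groupBy(_._1)
      .map { case (exec, pairs) => exec -> pairs.map(_._2) }
}
```

    For example, with four partitions and owners known for partitions 0, 1, and 2, the task for partition 3 stays unassigned until some executor can take it.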

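    In the outsourcing-company analogy used throughout this series, the TaskScheduler is the product manager who has finally settled on the design and stopped changing it. He now creates a Sprint in JIRA and fills it with tasks to hand out to the team.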


    TaskScheduler

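    The TaskScheduler uses the information passed from the heartbeat receiver through executorHeartbeatReceived to know how many workers are currently at their desks. It also registers events such as executorLost and executorAdd on the liveListener to track which workers have just arrived and which have gone absent.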
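    The bookkeeping described above can be sketched as a small registry. Everything here (Registry, heartbeatReceived, expire, the timeout value) is a hypothetical toy for illustration, not Spark's implementation. It only mirrors the contract stated in the interface: a heartbeat from a known executor returns true, and one from an unknown executor returns false so that the executor re-registers.

```scala
import scala.collection.mutable

object ExecutorRegistrySketch {
  // Hypothetical toy registry; it only mirrors the idea, not Spark's code.
  final class Registry {
    // Executor id -> timestamp (ms) of the last heartbeat seen.
    private val lastSeen = mutable.Map.empty[String, Long]

    def executorAdded(execId: String, now: Long): Unit =
      lastSeen.update(execId, now)

    def executorLost(execId: String): Unit = {
      lastSeen.remove(execId)
    }

    // True if the executor is known; false tells the caller to re-register.
    def heartbeatReceived(execId: String, now: Long): Boolean =
      if (lastSeen.contains(execId)) { lastSeen.update(execId, now); true }
      else false

    def liveExecutors: Set[String] = lastSeen.keySet.toSet

    // Drop and return executors whose last heartbeat is older than timeoutMs.
    def expire(now: Long, timeoutMs: Long): Set[String] = {
      val dead = lastSeen.collect { case (id, seen) if now - seen > timeoutMs => id }.toSet
      dead.foreach(id => lastSeen.remove(id))
      dead
    }
  }
}
```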

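    As in real life, product managers come in several implementations:

    • TaskSchedulerImpl: the default TaskScheduler
    • YarnScheduler: used for the yarn-client deploy mode, where the Driver runs on the client that submits the job
    • YarnClusterScheduler: used for the yarn-cluster deploy mode, where the Driver runs inside the Yarn cluster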

    2. Interface code

    
    /**
     * Low-level task scheduler interface, currently implemented exclusively by
     * [[org.apache.spark.scheduler.TaskSchedulerImpl]].
     * This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
     * for a single SparkContext. These schedulers get sets of tasks submitted to them from the
     * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
     * them, retrying if there are failures, and mitigating stragglers. They return events to the
     * DAGScheduler.
     */
    private[spark] trait TaskScheduler {
    
      private val appId = "spark-application-" + System.currentTimeMillis
    
      def rootPool: Pool
    
      def schedulingMode: SchedulingMode
    
      def start(): Unit
    
      // Invoked after system has successfully initialized (typically in spark context).
      // Yarn uses this to bootstrap allocation of resources based on preferred locations,
      // wait for slave registrations, etc.
      def postStartHook() { }
    
      // Disconnect from the cluster.
      def stop(): Unit
    
      // Submit a sequence of tasks to run.
      def submitTasks(taskSet: TaskSet): Unit
    
      // Cancel a stage.
      def cancelTasks(stageId: Int, interruptThread: Boolean)
    
      // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
      def setDAGScheduler(dagScheduler: DAGScheduler): Unit
    
      // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
      def defaultParallelism(): Int
    
      /**
       * Update metrics for in-progress tasks and let the master know that the BlockManager is still
       * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
       * indicating that the block manager should re-register.
       */
      def executorHeartbeatReceived(execId: String, taskMetrics: Array[(Long, TaskMetrics)],
        blockManagerId: BlockManagerId): Boolean
    
      /**
       * Get an application ID associated with the job.
       *
       * @return An application ID
       */
      def applicationId(): String = appId
    
      /**
       * Process a lost executor
       */
      def executorLost(executorId: String, reason: ExecutorLossReason): Unit
    
      /**
       * Get an application's attempt ID associated with the job.
       *
       * @return An application's Attempt ID
       */
      def applicationAttemptId(): Option[String]
    
    }
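
    One detail in the interface is easy to miss: the comment on setDAGScheduler says it is guaranteed to be set before submitTasks is called. The sketch below enforces that ordering in a heavily reduced, hypothetical trait. MiniTaskScheduler and RecordingScheduler are made up for this example, a plain Int stands in for a TaskSet, and a callback stands in for the DAGScheduler upcall.

```scala
import scala.collection.mutable.ArrayBuffer

object LifecycleSketch {
  // Hypothetical, reduced version of the trait above; not Spark's API.
  trait MiniTaskScheduler {
    def setDAGScheduler(onStageDone: Int => Unit): Unit
    def start(): Unit
    def submitTasks(stageId: Int): Unit
    def stop(): Unit
  }

  final class RecordingScheduler extends MiniTaskScheduler {
    private var upcall: Option[Int => Unit] = None
    private var running = false
    val submitted: ArrayBuffer[Int] = ArrayBuffer.empty[Int]

    def setDAGScheduler(onStageDone: Int => Unit): Unit = {
      upcall = Some(onStageDone)
    }

    def start(): Unit = { running = true }

    def stop(): Unit = { running = false }

    def submitTasks(stageId: Int): Unit = {
      require(running, "start() must be called before submitTasks")
      val report = upcall.getOrElse(
        throw new IllegalStateException("DAGScheduler must be set before submitTasks"))
      submitted += stageId
      report(stageId) // toy behaviour: pretend the stage finished immediately
    }
  }
}
```

    A caller would wire it up in the order setDAGScheduler, start, submitTasks, stop. Submitting after stop fails fast instead of silently dropping work.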
    

    3. Lifecycle

    [Figure: TaskScheduler lifecycle]

    4. Maintained structures

    The TaskScheduler mainly maintains a structure called org.apache.spark.scheduler.Schedulable, which is an interface (a Scala trait).

    As the code shows, it has two implementations: Pool and TaskSetManager.

    /**
     * An interface for schedulable entities.
     * there are two type of Schedulable entities(Pools and TaskSetManagers)
     */
    private[spark] trait Schedulable {
      var parent: Pool
      // child queues
      def schedulableQueue: ConcurrentLinkedQueue[Schedulable]
      def schedulingMode: SchedulingMode
      def weight: Int
      def minShare: Int
      def runningTasks: Int
      def priority: Int
      def stageId: Int
      def name: String
    
      def addSchedulable(schedulable: Schedulable): Unit
      def removeSchedulable(schedulable: Schedulable): Unit
      def getSchedulableByName(name: String): Schedulable
      def executorLost(executorId: String, host: String, reason: ExecutorLossReason): Unit
      def checkSpeculatableTasks(): Boolean
      def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager]
    }
    

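    In practice, a Pool acts as a non-leaf node of a tree, while a TaskSetManager is a leaf node of that tree.
    When the TaskScheduler is initialized, it must be told which scheduling strategy to use (FIFO or FAIR).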

    Later sections will walk through some of the structure of TaskSetManager and Pool.

    def initialize(backend: SchedulerBackend) {
      this.backend = backend
      // temporarily set rootPool name to empty
      rootPool = new Pool("", schedulingMode, 0, 0)
      schedulableBuilder = {
        schedulingMode match {
          case SchedulingMode.FIFO =>
            new FIFOSchedulableBuilder(rootPool)
          case SchedulingMode.FAIR =>
            new FairSchedulableBuilder(rootPool, conf)
        }
      }
      schedulableBuilder.buildPools()
    }
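
    The tree of Pools and TaskSetManagers can be sketched with a small composite structure. This is a hypothetical model, not Spark's classes: Node, Leaf, and PoolNode are invented names, and the ordering (smaller priority first, then smaller stageId) is an assumption chosen to illustrate FIFO-style behaviour. Each pool sorts its direct children and then flattens them, which loosely mirrors what getSortedTaskSetQueue in the Schedulable trait is for.

```scala
object PoolTreeSketch {
  // Hypothetical, simplified model; not Spark's Pool/TaskSetManager classes.
  sealed trait Node {
    def name: String
    def priority: Int
    def stageId: Int
  }

  // Leaf of the tree: plays the role of a TaskSetManager.
  final case class Leaf(name: String, priority: Int, stageId: Int) extends Node

  // Inner node of the tree: plays the role of a Pool.
  final case class PoolNode(
      name: String,
      children: List[Node],
      priority: Int = 0,
      stageId: Int = -1) extends Node

  // Assumed FIFO-style ordering: smaller priority first, then smaller stageId.
  private val fifo: Ordering[Node] =
    Ordering.by((n: Node) => (n.priority, n.stageId))

  // Sort the children of each pool, then flatten depth-first into leaves.
  def sortedTaskSetQueue(node: Node): List[Leaf] = node match {
    case l: Leaf     => List(l)
    case p: PoolNode => p.children.sorted(fifo).flatMap(sortedTaskSetQueue)
  }
}
```

    A FAIR-style variant would swap the ordering for one based on weight, minShare, and runningTasks, which is why those fields appear on the Schedulable trait.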
    
