【Spark Core】Task Execution Mechanism and Task Source Walkthrough (1)

Author: JasonDing | Published 2015-07-20 21:17

    Introduction

    The previous article, 《TaskScheduler源码与任务提交原理浅析2》, described how the Driver side splits the job into stages, hands tasks out according to which Executors are free, and finally has the DriverActor send task messages to the executor actors.
    To understand the Executor's execution mechanism, we first need to understand how Executors are registered on the Driver side, so this article walks through how the Application and its Executors get registered.

    1. The Task Class and Related Classes

    1.1 The Task class

    Spark divides the tasks executed by Executors into two kinds, ShuffleMapTask and ResultTask; their sources live in the scheduler package.
    Task is the interface between the DAGScheduler and the TaskScheduler: the DAGScheduler wraps each partition of each stage of the DAG into a task, and finally submits each stage's tasks to the TaskScheduler as a TaskSet.

    /**
     * A unit of execution. We have two kinds of Tasks in Spark:
     * - [[org.apache.spark.scheduler.ShuffleMapTask]]
     * - [[org.apache.spark.scheduler.ResultTask]]
     *
     * A Spark job consists of one or more stages. The very last stage in a job consists of multiple
     * ResultTasks, while earlier stages consist of ShuffleMapTasks. A ResultTask executes the task
     * and sends the task output back to the driver application. A ShuffleMapTask executes the task
     * and divides the task output to multiple buckets (based on the task's partitioner).
     *
     * @param stageId id of the stage this task belongs to
     * @param partitionId index of the partition in the RDD that this task computes
     */
    private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable 
    

    A Task corresponds to one stageId and one partitionId.
    It provides the runTask() and kill() interfaces, among others (see the sketch below).
    It carries the killed flag, a TaskMetrics field, a TaskContext field, and so on.
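
    Putting those pieces together, the shape of the class is roughly the following. This is a condensed sketch, not the full source, and details vary across versions:

    private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int)
      extends Serializable {

      // Implemented by ShuffleMapTask / ResultTask: compute this partition, return the result
      def runTask(context: TaskContext): T

      // Filled in while the task runs; shipped back to the driver together with the result
      var metrics: Option[TaskMetrics] = None

      // Cooperative cancellation: the executor sets the flag (optionally interrupting the
      // task's thread), and the running task is expected to check for it between records
      @volatile @transient private var _killed = false
      def killed: Boolean = _killed
      def kill(interruptThread: Boolean): Unit = { _killed = true }
    }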

    Besides these basic interfaces and fields, Task's companion object provides methods for serializing and deserializing the application's jar and file dependencies along with the task itself. The reason is that a Task has to guarantee that the worker node has every dependency this run needs, as registered with the SparkContext, so methods are provided that write those dependencies out to a stream and read them back in.
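
    The helpers in question are Task.serializeWithDependencies and Task.deserializeWithDependencies. Below is a standalone sketch of the wire-format idea only (simplified and hypothetical in its details; the Long values stand in for the modification timestamps the real code carries for cache invalidation):

    import java.nio.ByteBuffer
    import scala.collection.mutable.HashMap

    object TaskWireFormatSketch {
      // Dependencies go first, the serialized Task object itself goes last
      def serialize(taskBytes: Array[Byte],
                    files: HashMap[String, Long],
                    jars: HashMap[String, Long]): ByteBuffer = {
        val out = new java.io.ByteArrayOutputStream()
        val data = new java.io.DataOutputStream(out)
        def writeMap(m: HashMap[String, Long]): Unit = {
          data.writeInt(m.size)
          for ((name, timestamp) <- m) { data.writeUTF(name); data.writeLong(timestamp) }
        }
        writeMap(files)       // files the worker must fetch before running the task
        writeMap(jars)        // jars to fetch and add to the task's classloader
        data.write(taskBytes) // the serialized Task object
        data.flush()
        ByteBuffer.wrap(out.toByteArray)
      }
    }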

    1.2 ShuffleMapTask

    A ShuffleMapTask corresponds to a shuffle map stage; its output becomes the input of other stages.
    ShuffleMapTask overrides the Externalizable-style read/write methods, because what gets written out includes stageId, rdd, dep, partitionId, epoch and split (one particular partition). Of these, stageId, rdd and dep go through a shared serialize/deserialize path whose result is cached in memory and then written into the ObjectOutput. Serialization uses Gzip, and the serialized bytes are kept in serializedInfoCache = new HashMap[Int, Array[Byte]]. This part is serialized once and cached because stageId, rdd and dep are what really describe the shuffle task and are the same for every partition of the stage, so caching the serialized form lightens the load on the master node. (In later releases this scheme was replaced by the broadcast taskBinary visible in the constructor below.)

    /**
    * A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitioner
    * specified in the ShuffleDependency).
    *
    * See [[org.apache.spark.scheduler.Task]] for more information.
    *
     * @param stageId id of the stage this task belongs to
     * @param taskBinary broadcast version of the RDD and the ShuffleDependency. Once deserialized,
     *                   the type should be (RDD[_], ShuffleDependency[_, _, _]).
     * @param partition partition of the RDD this task is associated with
     * @param locs preferred task execution locations for locality scheduling
     */
    private[spark] class ShuffleMapTask(
        stageId: Int,
        taskBinary: Broadcast[Array[Byte]],
        partition: Partition,
        @transient private var locs: Seq[TaskLocation])
      extends Task[MapStatus](stageId, partition.index) with Logging {
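
    A minimal, self-contained sketch of the caching scheme described above (hypothetical helper and object names; the real logic lived in the ShuffleMapTask companion object of pre-1.0 releases):

    import java.io._
    import java.util.zip.{GZIPInputStream, GZIPOutputStream}
    import scala.collection.mutable

    object SerializedInfoCacheSketch {
      // stageId -> gzipped, serialized (rdd, dep); every task of the stage reuses this entry
      private val serializedInfoCache = new mutable.HashMap[Int, Array[Byte]]

      def serializeInfo(stageId: Int, info: AnyRef): Array[Byte] = synchronized {
        serializedInfoCache.getOrElseUpdate(stageId, {
          val bytes = new ByteArrayOutputStream()
          val out = new ObjectOutputStream(new GZIPOutputStream(bytes))
          out.writeObject(info) // the real code writes the rdd and the dep here
          out.close()
          bytes.toByteArray
        })
      }

      def deserializeInfo(bytes: Array[Byte]): AnyRef = {
        val in = new ObjectInputStream(new GZIPInputStream(new ByteArrayInputStream(bytes)))
        try in.readObject() finally in.close()
      }
    }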
    

    1.3 ResultTask

    A ResultTask corresponds to the result stage and directly produces the final result.

    /**
     * A task that sends back the output to the driver application.
     *
     * See [[Task]] for more information.
     *
     * @param stageId id of the stage this task belongs to
     * @param taskBinary broadcasted version of the serialized RDD and the function to apply on each
     *                   partition of the given RDD. Once deserialized, the type should be
     *                   (RDD[T], (TaskContext, Iterator[T]) => U).
     * @param partition partition of the RDD this task is associated with
     * @param locs preferred task execution locations for locality scheduling
     * @param outputId index of the task in this job (a job can launch tasks on only a subset of the
     *                 input RDD's partitions).
     */
    private[spark] class ResultTask[T, U](
        stageId: Int,
        taskBinary: Broadcast[Array[Byte]],
        partition: Partition,
        @transient locs: Seq[TaskLocation],
        val outputId: Int)
      extends Task[U](stageId, partition.index) with Serializable {
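
    For reference, runTask is where "sends back the output to the driver application" actually happens; abridged from the 1.x source, with bookkeeping and metrics omitted:

      override def runTask(context: TaskContext): U = {
        // Deserialize the RDD and the user function from the broadcast task binary
        val ser = SparkEnv.get.closureSerializer.newInstance()
        val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
          ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
        // Run the user function over this partition's iterator; the return value is
        // what gets serialized and sent back to the driver
        func(context, rdd.iterator(partition, context))
      }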
    

    1.4 TaskSet

    TaskSet is a data structure that wraps all the tasks of one stage for submission to the TaskScheduler.
    A TaskSet is a group of identical, pipeline-able tasks: every task runs exactly the same processing logic, and they differ only in the data they process, each task handling one partition. Pipelining can fairly be called the cornerstone of big-data processing; only by processing data as a pipeline can the computation be run across a cluster. From a single task's point of view, it takes its input from the data source and executes the stage's logic in topological order (in practice, by calling each RDD's compute).

    /**
     * A set of tasks submitted together to the low-level TaskScheduler, usually representing
     * missing partitions of a particular stage.
     */
    private[spark] class TaskSet(
        val tasks: Array[Task[_]],
        val stageId: Int,
        val attempt: Int,
        val priority: Int,
        val properties: Properties) {
        val id: String = stageId + "." + attempt
    
      override def toString: String = "TaskSet " + id
    }
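
    For context, this is roughly how the DAGScheduler hands a stage over, simplified from submitMissingTasks in the 1.x code base:

      // one Task per missing partition; they all share the stage's broadcast task binary
      taskScheduler.submitTasks(
        new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))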
    

    2. Registering the Executor with the Driver

    The LaunchTask message sent by the Driver is received by the Executor side, which handles it with launchTask.
    Before that can happen, though, note that an Executor that has not registered with the Driver will not process tasks even if it receives a LaunchTask command. So we first need to work out how an Executor gets registered on the Driver side.

    2.1 Application registration

    Executor registration happens as part of registering the Application. Taking Standalone mode as the example, the chain of events is:

    1. SparkContext creates the schedulerBackend and the taskScheduler; the schedulerBackend lives on as a member of the TaskScheduler object.
    2. When the TaskScheduler object's start function is called, it actually calls backend.start().
    3. backend.start() starts an AppClient; one of the AppClient's parameters, an ApplicationDescription, wraps the command for running CoarseGrainedExecutorBackend.
    4. The AppClient internally starts a ClientActor; once up, the ClientActor tries to register an Application with the Master by sending actor ! RegisterApplication(appDescription).

    Below is the part of SparkDeploySchedulerBackend's start function that registers the Application:

        // Start executors with a few necessary configs for registering with the scheduler
        val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
        val javaOpts = sparkJavaOpts ++ extraJavaOpts
        val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
          args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
        val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
        val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
          appUIAddress, sc.eventLogDir, sc.eventLogCodec)
    
        client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
        client.start()
    

    AppClient submits the Application to the Master.
    AppClient is the interface through which the Application interacts with the Master. It holds a member variable actor of type org.apache.spark.deploy.client.AppClient.ClientActor, which is responsible for all communication with the Master. The calls involved in submitting an Application are:
    ClientActor's preStart() --> registerWithMaster() --> tryRegisterAllMasters() --> actor ! RegisterApplication(appDescription) --> the Master's receiveWithLogging function handles the RegisterApplication message.

    Below is the code that handles the RegisterApplication(description) message (part of receiveWithLogging in Master.scala):

        case RegisterApplication(description) => {
          if (state == RecoveryState.STANDBY) {
            // ignore, don't send response
          } else {
            logInfo("Registering app " + description.name)
            val app = createApplication(description, sender)
            registerApplication(app)
            logInfo("Registered app " + description.name + " with ID " + app.id)
            persistenceEngine.addApplication(app)
            sender ! RegisteredApplication(app.id, masterUrl)
        schedule() // allocate resources to applications still waiting for them; invoked whenever a new application joins or new resources become available
          }
        }
    

    This code does the following:

    1. createApplication builds an ApplicationInfo, the data structure describing this app.
    2. registerApplication registers the Application, updates the relevant mappings, and adds it to the waiting queue.
    3. persistenceEngine persists the Application's information. By default nothing is saved; the other two modes keep it in files or in ZooKeeper.
    4. The sender is told that registration succeeded (RegisteredApplication).
    5. Job scheduling begins: schedule() allocates resources to applications still waiting for them, and is invoked whenever a new application joins or new resources become available.

    2.2 The schedule function in Master

    schedule() allocates resources to the applications waiting for them; it is called every time a new application joins or new resources become available. When picking workers (executors) for an Application there are currently two strategies:

    1. Spread out as much as possible: distribute the Application over as many different nodes as possible. This is controlled by spark.deploy.spreadOut, whose default value is true.
    2. Concentrate as much as possible: place the Application on as few nodes as possible.

    For a given Application, a worker can host at most one of its executors (that executor may, of course, own more than one core). Under strategy 1, task deployment is slower than under strategy 2, but GC pauses tend to be shorter.

    The source of the schedule function, with the explanations inline as comments:

      /*
       * Schedule the currently available resources among waiting apps. This method will be called
       * every time a new app joins or resource availability changes.
       */
      private def schedule() {
        if (state != RecoveryState.ALIVE) { return }
    
        // First schedule drivers, they take strict precedence over applications
        // Randomization helps balance drivers
        val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
        val numWorkersAlive = shuffledAliveWorkers.size
        var curPos = 0
    
        for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
          // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
          // start from the last worker that was assigned a driver, and continue onwards until we have
          // explored all alive workers.
          var launched = false
          var numWorkersVisited = 0
          while (numWorkersVisited < numWorkersAlive && !launched) {
            val worker = shuffledAliveWorkers(curPos)
            numWorkersVisited += 1
            if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
              launchDriver(worker, driver)
              waitingDrivers -= driver
              launched = true
            }
            curPos = (curPos + 1) % numWorkersAlive
          }
        }
    
        // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
        // in the queue, then the second app, etc.
    if (spreadOutApps) { // spread the load out; where possible, give each executor just one core
          // Try to spread out each app among all the nodes, until it has all its cores
          for (app <- waitingApps if app.coresLeft > 0) {
        // A usable worker: its state is ALIVE, it is not already running an executor for
        // this application, and its free memory meets the requirement.
        // Among usable workers, prefer the ones with more free cores.
            val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
              .filter(canUse(app, _)).sortBy(_.coresFree).reverse
            val numUsable = usableWorkers.length
            val assigned = new Array[Int](numUsable) // Number of cores to give on each node
            var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
            var pos = 0
            while (toAssign > 0) {
              if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
                toAssign -= 1
                assigned(pos) += 1
              }
              pos = (pos + 1) % numUsable
            }
            // Now that we've decided how many cores to give on each node, let's actually give them
            for (pos <- 0 until numUsable) {
              if (assigned(pos) > 0) {
                val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
                launchExecutor(usableWorkers(pos), exec)
                app.state = ApplicationState.RUNNING
              }
            }
          }
    } else { // pack: use as many of each worker's cores as possible
          // Pack each app into as few nodes as possible until we've assigned all its cores
          for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
            for (app <- waitingApps if app.coresLeft > 0) {
              if (canUse(app, worker)) {
                val coresToUse = math.min(worker.coresFree, app.coresLeft)
                if (coresToUse > 0) {
                  val exec = app.addExecutor(worker, coresToUse)
                  launchExecutor(worker, exec)
                  app.state = ApplicationState.RUNNING
                }
              }
            }
          }
        }
      }
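
    To make the round-robin assignment in the spread-out branch concrete, here is a small self-contained toy run of the same loop (hypothetical numbers; plain Scala, not Spark code):

    object SpreadOutDemo {
      def main(args: Array[String]): Unit = {
        val coresFree = Array(6, 4, 2)  // free cores on three usable workers
        val assigned  = new Array[Int](coresFree.length)
        var toAssign  = math.min(10, coresFree.sum) // the app still wants 10 cores
        var pos = 0
        while (toAssign > 0) {
          if (coresFree(pos) - assigned(pos) > 0) { // this worker still has a spare core
            toAssign -= 1
            assigned(pos) += 1
          }
          pos = (pos + 1) % coresFree.length
        }
        println(assigned.mkString(", ")) // prints "4, 4, 2": one core per worker per round
      }
    }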
    

    2.3 The launchExecutor function

    After choosing the workers and fixing the number of CPU cores each executor on them gets, the Master calls launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) to send the launch request to the Worker and to notify the AppClient that an executor has been added. It also updates its own bookkeeping for that worker: the executor is recorded, and the worker's available CPU cores and memory are decremented. The Master does not wait for the executor to actually start successfully on the worker before updating this information; if the worker fails to start the executor, it sends a FAILED message to the Master, which simply updates the worker's information again on receipt.

      def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
        logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
        worker.addExecutor(exec)
        worker.actor ! LaunchExecutor(masterUrl,
          exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
        exec.application.driver ! ExecutorAdded(
          exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
      }
    

    2.4 Creating the Executor

    Below is the chain of calls after the Worker receives the LaunchExecutor message from the Master:

    1. The LaunchExecutor message handler creates an ExecutorRunner.
    2. The ExecutorRunner starts, as a separate process, the ApplicationDescription prepared in SparkDeploySchedulerBackend, i.e. the CoarseGrainedExecutorBackend command it carries.
    3. Once CoarseGrainedExecutorBackend is up, it first uses the driverUrl parameter passed to it to send a RegisterExecutor message to the CoarseGrainedSchedulerBackend::DriverActor.
    4. The DriverActor replies with RegisteredExecutor.
    5. CoarseGrainedExecutorBackend then creates an Executor, and the Executor is ready.

    After CoarseGrainedExecutorBackend starts, its preStart function performs these operations:

      override def preStart() {
        logInfo("Connecting to driver: " + driverUrl)
        driver = context.actorSelection(driverUrl)
        driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }
    

    On receiving the RegisteredExecutor message, CoarseGrainedExecutorBackend creates the Executor:

      override def receiveWithLogging = {
        case RegisteredExecutor =>
          logInfo("Successfully registered with driver")
          val (hostname, _) = Utils.parseHostPort(hostPort)
          executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    
        ......
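
    Once the Executor exists, the backend can act on LaunchTask messages; the corresponding case in the same handler looks roughly like this (abridged from the 1.x source):

      case LaunchTask(data) =>
        if (executor == null) {
          logError("Received LaunchTask command but executor was null")
          System.exit(1)
        } else {
          // The TaskDescription carries the serialized Task produced on the driver side
          val ser = env.closureSerializer.newInstance()
          val taskDesc = ser.deserialize[TaskDescription](data.value)
          logInfo("Got assigned task " + taskDesc.taskId)
          executor.launchTask(this, taskId = taskDesc.taskId,
            attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)
        }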
    


    Please credit the author, Jason Ding, and the source when reposting.
    GitCafe blog (http://jasonding1354.gitcafe.io/)
    Github blog (http://jasonding1354.github.io/)
    CSDN blog (http://blog.csdn.net/jasonding1354)
    Jianshu homepage (http://www.jianshu.com/users/2bd9b48f6ea8/latest_articles)
    Search "jasonding1354" on Google to reach my blog homepage.
