美文网首页
spark job 执行逻辑

spark job 执行逻辑

作者: zachary_Luo | 来源:发表于2019-12-22 20:52 被阅读0次
    • 概述
      说明:代码为tag v3.0.0 preview2时master分支
      可以了解:
      1.依赖的构建。
      2.分区的计算。
      3.作业提交模型
      4.stage划分细节
      5.task分配细节
      6.spark处理数据模型
      ....
      最初目的是想了解spark具体是怎样读取数据。但是单一的去看textFile算子代码,发现只是相继new出两个对象hadoopRDD和MapPartitionsRDD,没有调用读数据的具体操作。
      然后为了一探究竟,以没有shuffle的简单3行代码为例, 分析其执行以及读数逻辑。
    val rddFile = sc.textFile("...")
    val rddMap = rdd.map(_.split(","))
    print(rddMap.count())
    
    • 代码概述:
      textFile会创建HadoopRDD,并初始化它的父类RDD类,然后调用父类的map方法并传入一个只取tuple._2 的函数f,map方法里将用一个新匿名函数来包装 函数f,用来构建MapPartitionsRDD,构建同时也会初始化其父类RDD类,且传入当前rdd用以标识依赖。然后会触发第二行代码map(_.split(","))。此时就会直接调用父类的map方法,传入函数自然就是我们写的split的函数,同样新匿名函数进行包装,构建新的MapPartitionsRDD,初始化父类Rdd,传入依赖rdd。
    • 关键代码:
    //RDD类
    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext,
        @transient private var deps: Seq[Dependency[_]]  
        //初始化时传入,还有一辅助构造函数用来将rdd转Seq[Dependency[_]]。
      ) 
    //RDD类中的map方法
    def map[U: ClassTag](f: T => U): RDD[U] = withScope {
        // clean方法实际上调用了ClosureCleaner的clean方法,旨在清除闭包中的不能序列化的变量,防止RDD在网络传输过程中反序列化失败[1]
        val cleanF = sc.clean(f)
        new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
      }
    //MapPartitionsRDD类
    private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
        var prev: RDD[T],
        f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
        preservesPartitioning: Boolean = false,
        isFromBarrier: Boolean = false,
        isOrderSensitive: Boolean = false)
    extends RDD[U](prev)
    
    

    说明:第一行代码执行完返回的是MapPartitionsRDD,内部其实还构建了一个HadoopRDD,MapPartitionsRDD的操作其实取value.toString。

    • print(rddMap.count())代码
      概述:
      1.count内部调用sc.runJob(this, Utils.getIteratorSize _).sum runJob函数会触发DagScheduler去分解任务并提交到TaskScheduler.
      2.参数this就是最后调用count操作的rdd,Utils.getIteratorSize一个工具类方法,该方法传入参数为迭代器,方法逻辑 进行迭代器累加,这里其实就是一个partition数据的累加。后面.sum就是每个分区数据sum。
      3.注意第二个参数后面有一下划线,是将工具类中定义的方法转为函数,因为函数才能作为参数传递,而方法是不行的。


      textFile.png
    • 分析runJob:
      1.计算分区。
      2.提交 job到DAGscheduler.

    • 1.计算分区,从当前rdd的检查点获取没有则从getPartitions计算,getPartitions会先获取到rdd的依赖,然后再从依赖的rdd中获取partition,也是从检查点或getPartitions计算。递归到第一个HadoopRdd没有依赖时,根据conf,最小分区数,底层数据文件个数以及大小等。获取到InputSplit,代表 hdfs 中的一份数据 ,就是一个 HadoopPartition。其定义了分割的长度及位置。分割长度 是指分割数据的大小(以字节为单位),而分割位置 是分割所在的机器结点名称组成的列表, 分割位置中就能获取到 数据所在的 host 和 rack。
      代码:这里获取到分区然后取一个range

    def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
        runJob(rdd, func, 0 until rdd.partitions.length)
      }
    
    rdd_partitions.png
    • 2.submit 到 DAGScheduler,先构建JobSubmitted(包括rdd,分区,分区处理函数等信息),提交到eventProcessLoop内的BlockingQueue。DAGScheduler在初始化时start了一个线程,用以死循环从loop中取事件,进行分事件类型处。并构建一个waiter,监听作业task完成执行resultHandler(将每个分区结果映射到数组)。
      postJob.png

    代码:

    sparkcontext 类
    def runJob[T, U: ClassTag](
          rdd: RDD[T],//map操作rdd
          func: (TaskContext, Iterator[T]) => U,//迭代器累加的函数
          partitions: Seq[Int],//partition数量
          //将分区结果res,映射到数组index位置匿名函数,
          //val results = new Array[U](partitions.size)
          //(index, res) => results(index) = res)
          resultHandler: (Int, U) => Unit): Unit = {
        val callSite = getCallSite//包含最靠近栈顶的用户类及最靠近栈底的Scala或Spark核心类信息
        val cleanedFunc = clean(func)
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
      }
    //DAGScheduler 类  submitJob在返回waiter
    def runJob[T, U](
          rdd: RDD[T],
          func: (TaskContext, Iterator[T]) => U,
          partitions: Seq[Int],
          callSite: CallSite,
          resultHandler: (Int, U) => Unit,
          properties: Properties): Unit = {
        val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
        ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
      }
    //构建JobSubmitted,并提交到事件循环处理器
    def submitJob[T, U](
          rdd: RDD[T],//split操作rdd
          func: (TaskContext, Iterator[T]) => U,//迭代器累加函数
          partitions: Seq[Int],//partition数量
          callSite: CallSite,
          resultHandler: (Int, U) => Unit,//结果处理句柄函数
          properties: Properties): JobWaiter[U] = {
        val waiter = new JobWaiter[U](this, jobId, partitions.size, resultHandler)
        eventProcessLoop.post(JobSubmitted(
          jobId, rdd, func2, partitions.toArray, callSite, waiter,
          Utils.cloneProperties(properties)))
        waiter
      }
    
    • 根据JobSubmitted事件,用handleJobSubmitted处理先划分stage,再按stage顺序依次提交到taskScheduler.

    划分stage,
    1.createResultStage创建finalStage,在创建finalStage时,调用getorCreate获取父stage,首先依次遍历当前rdd依赖,先找到rdd所有宽依赖,再遍历这些宽依赖。
    2.对宽依赖里的rdd继续深度遍历,找到当前rdd所有祖宗的宽依赖。
    3.遍历2中所有宽依赖准备创建stage,创建stage时会传入上一个stage。所以会根据当前rdd重新调用getorCreate。递归终止就是父stage为空返回,创建第一个stage.然后第一个stage返回创建第二个...。
    4.递归创建完后,会返回一个stage,然后根据最后一个rdd创建finalstage.

    说明:每个stage包含其父stage,包含宽依赖信息,分区信息。

    createStage.png

    代码

    //开始创建stage
     finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    //创建stage前,先获取父stage.
    private def createResultStage(...): ResultStage = {
        val parents = getOrCreateParentStages(rdd, jobId)
        val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
        stage
      }
    //1.先获取当前rdd直系父宽依赖,也就是他的爷爷宽依赖是获取不到的getShuffleDependencies。
    //2.遍历依赖获取stagegetOrCreateShuffleMapStage
    private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
        getShuffleDependencies(rdd).map { shuffleDep =>
          getOrCreateShuffleMapStage(shuffleDep, firstJobId)
        }.toList
      }
    //create依赖or Get。
    //1.create时先拿到所有祖宗依赖,指深度遍历,所有宽依赖子节点,都会拿到。并遍历所有宽依赖,创建stage.
    //2.根据当前宽依赖的rdd的所有祖宗依赖创建完后,会g根据当前宽依赖创建stage
    private def getOrCreateShuffleMapStage(...)ShuffleMapStage={
        shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
          case Some(stage) =>
            stage
          case None =>
    //拿到所有祖宗依赖,然后遍历
     getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
              if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
                createShuffleMapStage(dep, firstJobId)
              }
            }
            // Finally, create a stage for the given shuffle 
            //根据当前宽依赖创建stage
            dependency.createShuffleMapStage(shuffleDep, firstJobId)
        }
      }
    //创建stage,
    //1.拿到需要宽依赖中的rdd,调用getOrCreateParentStages获取父stage。然后创建ShuffleMapStage。
     def createShuffleMapStage(...): ShuffleMapStage = {
        val rdd = shuffleDep.rdd
        //先获取父stage,获取不到时,为空(第一个stage)
        val parents = getOrCreateParentStages(rdd, jobId)
     
        val stage = new ShuffleMapStage(
          id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
        stage
      }
    
    • 在handleJobSubmitted中创建完stage,然后更新各辅助变量,最后submitStage。提交过程也是递归提交,先递归到底找到没有父宽依赖的stage,进行提交。
    private def submitStage(stage: Stage): Unit = {
          //更具最后一个stage找打所有stage,并按id排序
          val missing = getMissingParentStages(stage).sortBy(_.id)
            //如果没有父stage,真正提交到taskScheduler.
            if (missing.isEmpty) {
              submitMissingTasks(stage, jobId.get)
            } else {
              //找到的父stage,再次递归调用submitStage,进行查找再提交
              for (parent <- missing) {
                submitStage(parent)
              }
            }
      }
    
    • submitMissingTasks 提交stage,会在DAGscheduler中将stage转为task,提交到TaskScheduler处理。
      1.计算task偏好位置,为stage内的每个分区计算.计算好的位置,将在后面用来分配task在哪执行。
      偏好位置为TaskLocation,一个trait,有三个实现类,代表数据存储在不同的位置:
      数据存储在Executor内存中,即Partition被cache到了内存中(返回executorId+host)
      数据存储在HDFS上(返回host)
      数据存储在host这个节点的磁盘上(返回“hdfs_cache”+host)
      rdd顺着窄依赖, 往上找父依赖, 直到找到第一个窄依赖, 也就是找数据读取源头。这里是HadoopRDD, 那么每个 task 处理的数据就是一个 HadoopPartition, 其代表 hdfs 中的一份数据 InputSplit, 定义了分割的长度及位置。
      读取的是 shuffle 的数据,那么会根据其shufflePartition去查找上个stage写出数据的位置。map写阶段根据分区器生成多少个分区。shufflePartition根据这多少个分区生成一个分区数组。
      2.先序列化stage和依赖信息,再构建task,最后用taskSet封装其stage所有task提交到taskScheduler
    //计算每个stage里每个分区的位置偏好
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
          stage match {
            case s: ShuffleMapStage =>
              partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
            case s: ResultStage =>
              partitionsToCompute.map { id =>
                val p = s.partitions(id)
                (id, getPreferredLocs(stage.rdd, p))
              }.toMap
          }
    //序列化stage中最后一个rdd,和依赖信息
    
    //构建task,为每个分区分别构建不同类型的task
    val tasks: Seq[Task[_]] = try {
          stage match {
            case stage: ShuffleMapStage =>
              partitionsToCompute.map { id =>           
                new ShuffleMapTask(stage.id, 
                  taskBinary, part, locs, properties,...)
              }
            case stage: ResultStage =>
              partitionsToCompute.map { id =>
                new ResultTask(stage.id,
                   taskBinary, part, locs, ....)
              }
          }
        } 
    //最后用taskSet封装提交到taskScheduler
    taskScheduler.submitTasks(new TaskSet(
            tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
    
    • 此时,已经到了taskScheduler.
      1.为当前TaskSet创建TaskSetManager,TaskSetManager负责管理一个taskSet的每一个task,管理task分配,重试,延迟调度等。(创建)
      2.当前taskSet添加到调度池中 ,调度池有两个实现FIFOSchedulerBuilder,FairSchedulerBuilder,并且默认是FIFO。


      taskManager.png

    关键代码:

    //TaskSchedulerImpl类
    override def submitTasks(taskSet: TaskSet): Unit = {
        val tasks = taskSet.tasks
         //创建TaskSet的Manager
          val manager = createTaskSetManager(taskSet, maxTaskFailures)
          //将Manager加入调度池。
          schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
        //调用CoarseGrainedSchedulerBackend的reviveOffers方法向driveEndPoint发送消息获取资源并计算task执行位置,最后LaunchTask
        backend.reviveOffers()
      }
    
    
    //TaskManager类
    //创建时会将taskSet里的task按偏好位置加入各pendingTask
      addPendingTasks()
      private def addPendingTasks(): Unit = {
          for (i <- (0 until numTasks).reverse) {
            addPendingTask(i, resolveRacks = false)
          }
      }
    
      private[spark] def addPendingTask(
          index: Int,
          resolveRacks: Boolean = true,
          speculatable: Boolean = false): Unit = {
        val pendingTaskSetToAddTo = pendingTask(包含各本地性级别的pendingtask)
        for (loc <- tasks(index).preferredLocations) {
          loc match {
            case e: ExecutorCacheTaskLocation =>
              //task的偏好位置有execId,将task的index加入forExecutor的pendingtask
              pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
            case e: HDFSCacheTaskLocation =>
             //HDFS里也会判断task位置偏好的host是否更封装的资源有一样的host,有则拿出host主机里的executor,去查看task里是否有对应execId,有则加入forExecutor的pendingtask
              val exe = sched.getExecutorsAliveOnHost(loc.host)
              exe match {
                case Some(set) =>
                  for (e <- set) {
                    pendingTaskSetToAddTo.forExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
            case _ =>
          }
          //遍历完task的偏好位置,会将所有task加入forHost的pendingtask。表示每个task都会有host本地性级别
          pendingTaskSetToAddTo.forHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
    
          //解析机架默认为false,将机架加入pendingtask
          if (resolveRacks) {
            sched.getRackForHost(loc.host).foreach { rack =>
              pendingTaskSetToAddTo.forRack.getOrElseUpdate(rack, new ArrayBuffer) += index
            }
          }
        }
        //task偏好位置为Nil在会加入noPrefs的pendingtask
        if (tasks(index).preferredLocations == Nil) {
          pendingTaskSetToAddTo.noPrefs += index
        }
       //会将所有task加入all的pendingtask。
        pendingTaskSetToAddTo.all += index
      }
    

    3.加入调度池后 backend.reviveOffers()调用了CoarseGrainedSchedulerBackend的reviveOffers方法向driveEndPoint发送消息获取资源(基于事件模型的调用,reviveOffers事件有两种触发模式:1.周期性触发的,默认1秒一次。2.被TaskSchedulerImpl里用backend.reviveOffers()调用)。触发后调用makeOffers(),a.先过滤出活跃的executor并封装成WorkerOffer(cores,host,execId,..)。b.然后根据resourceOffers按资源和task本地性找出最佳执行策略,返回Seq[TaskDescription]task的描述信息。最后交给SchedulerBackend发送task的描述信息到描述里的executor上执行

    //CoarseGrainedSchedulerBackend类
     private def makeOffers(): Unit = {
         val taskDescs = withLock {
            //过滤资源
            val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
           //整理成workOffers
            val workOffers = activeExecutors.map {
              case (id, executorData) =>
                new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
                  Some(executorData.executorAddress.hostPort),
                  executorData.resourcesInfo.map { case (rName, rInfo) =>
                    (rName, rInfo.availableAddrs.toBuffer)
                  })
            }.toIndexedSeq
           //找出要在哪些Worker上启动哪些task
            scheduler.resourceOffers(workOffers)
          }
          //对返回的taskDesc发送到对应Executor执行task
          if (taskDescs.nonEmpty) {
            launchTasks(taskDescs)
          }
        }
    
    //TaskScheduler类,查找task最佳资源
    //代码非常长,套用方法非常多,只显示核心逻辑,避免文章过长
    //提示:伪代码 
    遍历排序好的TaskSet,这里其实就是taskManager.
    for (taskSet <- sortedTaskSets) {
      再遍历taskSet里拥有的级别,从最优级别开始
      for (currentMaxLocality <- taskSet.myLocalityLevels) {
            var  launchedTask =false
            do {
                //找不到task执行资源为false
                //查找资源
                launchedTask = resourceOfferSingleTaskSet(taskSet,
                  currentMaxLocality, shuffledOffers,tasks,...)
              } while (launchedTask)
    }
    
    private def resourceOfferSingleTaskSet(....){
        //遍历每个workOffer(资源)
        for (i <- 0 until shuffledOffers.size) {
          //空闲资源大于task执行需要的资源
          if (availableCpus(i) >= CPUS_PER_TASK){
              //resourceOffer方法为去查找最佳task执行位置,返回类型Option[taskDesc]
              for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
                 //各种信息更新
              }
          }
       }
     return launchedTask
    }
    
    def resourceOffer(
          execId: String,
          host: String,
          maxLocality: TaskLocality.TaskLocality,
          availableResources: Map[String, Seq[String]] = Map.empty)
        : Option[TaskDescription] =
      {
          //遍历的最优数据本地性不为NO_PREF,计算一个允许的最低本地性级别
          if (maxLocality != TaskLocality.NO_PREF) {
            allowedLocality = getAllowedLocalityLevel(curTime)
            if (allowedLocality > maxLocality) {
              // We're not allowed to search for farther-away tasks
              allowedLocality = maxLocality
            }
          }
      //dequeueTask查找task的index,返回类型Option[int]
      dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
      //找到的task,进行封装taskDesc,将资源的地址等等信息封装。并返回
    return TaskDesc
      }
    }
    
     private def dequeueTask(
          execId: String,
          host: String,
          maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] = {
        //pendingTask
        val pendingTaskSetToUse = if (speculative) pendingSpeculatableTasks else pendingTasks
        //dequeue方法主要是dequeueTaskFromList从  pendingTask取出task的Index,返回类型option[int]
        def dequeue(list: ArrayBuffer[Int]): Option[Int] = {
          val task = dequeueTaskFromList(execId, host, list, speculative)
          if (speculative && task.isDefined) {
            speculatableTasks -= task.get
          }
          task
        }
      //最先默认从forExecutor根据资源的execId查找task
     dequeue(pendingTaskSetToUse.forExecutor.getOrElse(execId, ArrayBuffer())).foreach { index =>
          return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
        }
        //比较允许的最低级别大于Node_local级别,通过主机名找到相应的Task
        if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
          dequeue(pendingTaskSetToUse.forHost.getOrElse(host, ArrayBuffer())).foreach { index =>
            return Some((index, TaskLocality.NODE_LOCAL, speculative))
          }
        }
    
        //node_local之后,会比较允许的最低级别大于NO_PREF级别,通过noPrefs去pendingTask查找
        if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
          dequeue(pendingTaskSetToUse.noPrefs).foreach { index =>
            return Some((index, TaskLocality.PROCESS_LOCAL, speculative))
          }
        }
        //NO_PREF之后,会比较允许的最低级别大于RACK_LOCAL级别,通过rack去forRack pendingTask查找
        if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
          for {
            rack <- sched.getRackForHost(host)
            index <- dequeue(pendingTaskSetToUse.forRack.getOrElse(rack, ArrayBuffer()))
          } {
            return Some((index, TaskLocality.RACK_LOCAL, speculative))
          }
        }
        //最后 去ANY pendingTask查找
        if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
          dequeue(pendingTaskSetToUse.all).foreach { index =>
            return Some((index, TaskLocality.ANY, speculative))
          }
        }
        None
    }
    
    task_arrange.png
    • launchTasks
    • makeOffers方法将task分配资源后,调用launchTasks,发送到指定的executor执行task,先对taskDesc消息序列化,可以在网络间进行传输。再获取exector信息,然后发送LaunchTask消息执行task
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
      //首先对每个executor需要执行的task消息序列化一下,可以在网络间进行传输
        val serializedTask = TaskDescription.encode(task)
        if (serializedTask.limit >= maxRpcMessageSize) {
        //根据task消息中的executorId找到运行的executor
          val executorData = executorDataMap(task.executorId)
          //并将executor空余的core数减去自身需要的core数
          executorData.freeCores -= scheduler.CPUS_PER_TASK
           //向executor发送LaunchTask消息,用于在对应executor上启动task
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }
    
    • Executor端接收LaunchTask事件
      driver端向executor发送任务需要通过后台辅助进程CoarseGrainedSchedulerBackend,那么自然而然executor接收任务也有对应的后台辅助进程CoarseGrainedExecutorBackend,该进程与executor一一对应,提供了executor和driver通讯的功能。下面看CoarseGrainedExecutorBackend接收到事件后是如何处理的:
      1.将TaskDescription反序列化
      2.调用executor的launchTask执行task
    case LaunchTask(data) =>
          if (executor == null) {
            exitExecutor(1, "Received LaunchTask command but executor was null")
          } else {
            // 将TaskDescription反序列化
            val taskDesc = ser.deserialize[TaskDescription](data.value)
            // 调用executor的launchTask来加载该task
            executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
              taskDesc.name, taskDesc.serializedTask)
          }
    
    • executor接受task,创建了一个TaskRunner(继承于 Runnable)并加入到线程池中执行。其run方法:反序列化得到task各信息,然后调用task的run方法,根据task的类型(shuffle,result)真正执行task,执行完后。清除分配内存,然后序列化task的结果,包装成directResult,再次序列化,根据其结果大小将结果以不同的方式返回给driver
    def launchTask(
         context: ExecutorBackend,
         taskId: Long,
         attemptNumber: Int,
         taskName: String,
         serializedTask: ByteBuffer): Unit = {
       // 创建一个TaskRunner
       val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
         serializedTask)
       runningTasks.put(taskId, tr)
       // 将tr放到线程池中执行
       threadPool.execute(tr)
     }
    
    • ShuffleMapTask:
      shuffleMapTask的输出直接通过Shuffle write写磁盘,为下游的stage的Shuffle Read准备数据
      反序列化出rdd和ShuffleDependency,获取ShuffleManager的writer将一个rdd的某个分区写入到磁盘,通过rdd的iterator方法能遍历对应分区的数据。
    override def runTask(context: TaskContext): MapStatus = {
        // 反序列化出rdd和ShuffleDependency
        val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
          ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
        _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
    
        var writer: ShuffleWriter[Any, Any] = null
        try {
          // 获取shuffleManager
          val manager = SparkEnv.get.shuffleManager
          // 通过shuffleManager的getWriter()方法,获得shuffle的writer  
          writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
          // 通过rdd指定分区的迭代器iterator方法来遍历每一条数据,再之上再调用writer方法以写数据
          writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
        } 
      }
    
    • ResultTask:
      1.反序列化得到rdd和func(count操作传进来的迭代器累加匿名函数),执行func,传入rdd调用iterator方法获取到数据的迭代器。func对迭代器累计。
    override def runTask(context: TaskContext): U = { 
       val deserializeStartTime = System.currentTimeMillis()
       val ser = SparkEnv.get.closureSerializer.newInstance()
       // 反序列化
       val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
         ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
       _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
       // 对rdd的指定分区的迭代器执行func函数,并返回结果
       func(context, rdd.iterator(partition, context))
     }
    
    • shuffleTask 后序再将shuffle的时候再补充,这里主要讲ResultMaptask。关键一行代码 func(context, rdd.iterator(partition, context))
      rdd调用iterator获取该分区的迭代器,以用来执行最后的func。
      如何获取的迭代器:
      判断rdd是否缓存,checkPoint,没有则依次从最后一个rdd向上调用compute获取依赖。在我们的测试例子中,最后一个是split操作的MapPartitionRDD,所以先调用此rdd的compute
    //MapPartitionRDD
    override def compute(split: Partition, context: TaskContext): Iterator[U] =
       f(context, split.index, firstParent[T].iterator(split, context))
    

    看代码:compute函数是调用f函数获取迭代器,f函数是一个匿名函数。在这就是split操作。也就是说获取的迭代器在此做一个split操作,在返回。
    然后继续看 firstParent[T].iterator。调用上一个rdd的iterator方法获取迭代器,其实就是跟刚刚一样了。直到获取到第一个HadoopRDD调用compute方法计算当前partition的Iterator.。

    • HadoopRDD compute
      创建一个迭代器,内部用inputFormat和分区构造一个reader,利用reader重写迭代器next方法用以读取数据。hadoopRDD默认的inputFormat是FileinputFormat将数据读成分割成一行.reader是LineRecordReader,key为偏移量,一行为value。
      最后返回数据的迭代器。
    override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
        val iter = new NextIterator[(K, V)] {
           //将compute的输入theSplit,转换为HadoopPartition
          private val split = theSplit.asInstanceOf[HadoopPartition]
          logInfo("Input split: " + split.inputSplit)
          private val jobConf = getJobConf()
          ...
          //创建reader
          private var reader: RecordReader[K, V] = null
          //先根据conf拿到InputFormat,
          private val inputFormat = getInputFormat(jobConf)
          //从InputFormat中getRecordReader,传入HadoopPartition,conf。
          reader =
            try {
              inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
            } catch {...  }
          ...
          private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
          private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()
          //重写next方法,用以遍历数据
          override def getNext(): (K, V) = {
            try {
              finished = !reader.next(key, value)
            } catch {...        }
            if (!finished) {
              inputMetrics.incRecordsRead(1)
            }
            (key, value)
          }
        //最后构建一个包装好的迭代器,传入根据reader读数,并重写了next方法的迭代器。
        new InterruptibleIterator[(K, V)](context, iter)
      }
    
    • 迭代器,返回至task执行时func(context, rdd.iterator(partition, context)),执行func函数,对迭代器里的数据遍历累加,再从第一个rdd的迭代器遍历出数据时,也会作用在后面rdd的f函数上,也就是用户编写的操作。
      func 代码:在count提交作业时,传入
    //count提交作业,传入Utils.getIteratorSize _ 对没每个分区的计算。
    def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
    
    //func函数 就是累加迭代器,触发整个数据操作。
    def getIteratorSize(iterator: Iterator[_]): Long = {
       var count = 0L
       while (iterator.hasNext) {
         count += 1L
         iterator.next()
       }
       count
     }
    

    总流程:概括


    all_operator.png

    相关文章

      网友评论

          本文标题:spark job 执行逻辑

          本文链接:https://www.haomeiwen.com/subject/axwzgctx.html