Spark Source Code Analysis (2)

Author: mainroot | Published 2018-11-14 17:06

    II. The structure of a Spark program

    When a user submits a Spark program, the creation of the SparkContext instance ultimately, in every deployment mode, starts a JVM on the cluster nodes. Each JVM launches an executor through the main method of the CoarseGrainedExecutorBackend class; the executor starts a thread pool, listens on a network port, receives serialized code, and runs it on threads from the pool.

    So a Spark program consists of two parts:

    • the user's Spark program, delivered as a user-provided jar (a minimal sketch follows this list)
    • the executor launcher, whose main function lives in CoarseGrainedExecutorBackend.scala and is packaged in spark-assembly.jar.
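
    For orientation, a minimal user program of the kind packaged into that jar might look like the sketch below; the object name and the trivial job are made up for illustration, and only SparkConf/SparkContext are real Spark APIs.

      import org.apache.spark.{SparkConf, SparkContext}

      // Hypothetical example application; creating the SparkContext triggers the scheduling machinery analyzed below.
      object WordCountLikeApp {
        def main(args: Array[String]): Unit = {
          val conf = new SparkConf().setAppName("WordCountLikeApp")
          val sc   = new SparkContext(conf)
          val sum  = sc.parallelize(1 to 100).map(_ * 2).reduce(_ + _)
          println(s"sum = $sum")
          sc.stop()
        }
      }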

    1. Startup analysis of the CoarseGrainedExecutorBackend class

    We start with the CoarseGrainedExecutorBackend class. The code is straightforward, so we simply trace through it; a brief walkthrough follows.

    1. main parses the arguments and calls run
    2. run creates the executor environment via SparkEnv.createExecutorEnv
    3. rpcEnv.setupEndpoint registers the endpoint:
        env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(...))
    4. env.rpcEnv.awaitTermination() is called to block the process
    5. createExecutorEnv calls RpcEnv.create to build the rpcEnv
    6. RpcEnv supports two RPC implementations, akka and netty, configured as:
       "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory"
       "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory"  (the default)
    7. NettyRpcEnvFactory creates the NettyRpcEnv
    8. setupEndpoint installs the CoarseGrainedExecutorBackend endpoint, which overrides receive
    9. receive handles the task-launching messages:
        create the Executor:       case RegisteredExecutor(hostname) => executor = new Executor(...) 
        receive and launch a task: case LaunchTask(data) => executor.launchTask(...)
    10. Executor.launchTask
      def launchTask(
          context: ExecutorBackend,
          taskId: Long,
          attemptNumber: Int,
          taskName: String,
          serializedTask: ByteBuffer): Unit = {
        val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName, serializedTask)
        runningTasks.put(taskId, tr)
        threadPool.execute(tr)
      }                             
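
    The pattern above amounts to: wrap the serialized task in a Runnable (TaskRunner), register it in runningTasks, and hand it to a thread pool. A standalone sketch of the same pattern, using plain java.util.concurrent and made-up names rather than Spark's classes:

      import java.nio.ByteBuffer
      import java.util.concurrent.{ConcurrentHashMap, Executors}

      // Simplified stand-in for Executor.launchTask; none of these names are Spark's.
      object MiniExecutor {
        private val threadPool   = Executors.newCachedThreadPool()
        private val runningTasks = new ConcurrentHashMap[Long, Runnable]()

        def launchTask(taskId: Long, serializedTask: ByteBuffer): Unit = {
          val runner = new Runnable {
            override def run(): Unit = {
              // a real TaskRunner deserializes the task here and reports status back to the driver
              println(s"running task $taskId (${serializedTask.remaining()} bytes)")
              runningTasks.remove(taskId)
            }
          }
          runningTasks.put(taskId, runner)   // mirrors runningTasks.put(taskId, tr)
          threadPool.execute(runner)         // mirrors threadPool.execute(tr)
        }
      }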
    

    2. Task scheduling in the user application

    This part walks through the source for the YARN cluster deployment mode.

    The classes involved in task scheduling are mainly CoarseGrainedSchedulerBackend and YarnClusterScheduler.

    1. While the SparkContext is being initialized, the task scheduler is created:
    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
    
    2. Inside createTaskScheduler, the branch for the YARN cluster modes:
      case "yarn-standalone" | "yarn-cluster" =>
            val scheduler = try {
              val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
              val cons = clazz.getConstructor(classOf[SparkContext])
              cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]
            } catch {...}
            val backend = try {
              val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend")
              val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
              cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
            } catch {...}
            scheduler.initialize(backend)
            (backend, scheduler)
    

    Both instances are created via reflection, and finally initialize is called on the scheduler to wire in the backend.
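
    The reflection idiom itself is plain JVM reflection; a self-contained sketch with hypothetical classes (standing in for SparkContext and YarnClusterScheduler) shows the same three steps: load the class, look up the constructor, instantiate.

      // Hypothetical classes used only for this demo.
      class Context
      class Scheduler(ctx: Context) { def initialize(): Unit = println("scheduler initialized") }

      object ReflectionDemo {
        def main(args: Array[String]): Unit = {
          val clazz = Class.forName("Scheduler")              // Utils.classForName adds Spark's classloader handling on top of this
          val cons  = clazz.getConstructor(classOf[Context])  // pick the constructor taking a Context
          val sched = cons.newInstance(new Context).asInstanceOf[Scheduler]
          sched.initialize()
        }
      }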

    3. YarnClusterScheduler extends TaskSchedulerImpl, which defines initialize; it stores the backend and sets up a task pool:
     def initialize(backend: SchedulerBackend) {
        this.backend = backend                                                                                                                                                   
        rootPool = new Pool("", schedulingMode, 0, 0)
        schedulableBuilder = {
          schedulingMode match {
            case SchedulingMode.FIFO =>
              new FIFOSchedulableBuilder(rootPool)
            case SchedulingMode.FAIR =>
              new FairSchedulableBuilder(rootPool, conf)
          }
        }
        schedulableBuilder.buildPools()
      }
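
    The two builders differ in how rootPool orders its TaskSetManagers when getSortedTaskSetQueue is called later. For FIFO the comparison is, roughly, job priority first and stage id second; the sketch below reproduces that ordering rule with a made-up TaskSetInfo class rather than Spark's Schedulable.

      // Hypothetical stand-in for a schedulable task set; only priority and stageId matter for FIFO.
      case class TaskSetInfo(name: String, priority: Int, stageId: Int)

      object FifoOrderingDemo {
        // lower priority value (earlier job) first, then lower stage id
        def fifoLessThan(a: TaskSetInfo, b: TaskSetInfo): Boolean =
          if (a.priority != b.priority) a.priority < b.priority else a.stageId < b.stageId

        def main(args: Array[String]): Unit = {
          val queue = Seq(TaskSetInfo("b", 2, 5), TaskSetInfo("a", 1, 7), TaskSetInfo("c", 1, 3))
          println(queue.sortWith(fifoLessThan))  // c (1,3), a (1,7), b (2,5)
        }
      }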
    
    4. SparkContext then calls start on the YarnClusterScheduler instance:
      _taskScheduler.start() 
    
    5. YarnClusterScheduler inherits its start method from TaskSchedulerImpl:
     override def start() {
        // this starts the YarnClusterSchedulerBackend
        backend.start()
     }
    
    6. YarnClusterSchedulerBackend extends YarnSchedulerBackend:
    override def start() {
        super.start()                                                                                                                                                            
        totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
      }
    
    7. YarnSchedulerBackend extends CoarseGrainedSchedulerBackend:
        val ENDPOINT_NAME = "CoarseGrainedScheduler"
        override def start() {
            val properties = new ArrayBuffer[(String, String)]
            for ((key, value) <- scheduler.sc.conf.getAll) {
                if (key.startsWith("spark.")) {                                                                                                                                        
                    properties += ((key, value))
                }
            }
            // create a driverEndpoint used to send and receive messages
            driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
                           createDriverEndpoint(properties))
        }
        protected def createDriverEndpoint(
                    properties: Seq[(String, String)]
            ) : DriverEndpoint = {
            new DriverEndpoint(rpcEnv, properties)                                                                                                                                   
        }
    
    8. The DriverEndpoint class has a receive method for incoming messages:
        override def receive: PartialFunction[Any, Unit] = {
    
            case StatusUpdate(executorId, taskId, state, data) =>
            scheduler.statusUpdate(taskId, state, data.value)
            if (TaskState.isFinished(state)) {
              executorDataMap.get(executorId) match {
                case Some(executorInfo) =>
                  executorInfo.freeCores += scheduler.CPUS_PER_TASK
                  makeOffers(executorId)            
                }
            }
          case ReviveOffers =>
            makeOffers()
          case KillTask(taskId, executorId, interruptThread) =>
            executorDataMap.get(executorId) match {
              case Some(executorInfo) =>
                executorInfo.executorEndpoint.send(
                        KillTask(taskId, executorId, interruptThread))
            }
        }
    

    executorDataMap holds an entry for every registered, usable executor.
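
    Each entry records, roughly, where the executor runs and how many cores it still has free. A simplified, hypothetical version of that bookkeeping (Spark's actual ExecutorData also keeps the executor's RPC endpoint reference, its address, and its total cores):

      import scala.collection.mutable

      // Hypothetical, trimmed-down ExecutorData.
      case class MiniExecutorData(host: String, var freeCores: Int)

      object ExecutorRegistryDemo {
        val executorDataMap = mutable.HashMap[String, MiniExecutorData]()

        def main(args: Array[String]): Unit = {
          executorDataMap("exec-1") = MiniExecutorData("node-a", 3)  // one of its 4 cores is busy
          executorDataMap("exec-2") = MiniExecutorData("node-b", 2)

          // a finished task returns its core, just like freeCores += scheduler.CPUS_PER_TASK above
          executorDataMap("exec-1").freeCores += 1
          println(executorDataMap)
        }
      }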

    9. makeOffers 
        private def makeOffers() {
          val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
          val workOffers = activeExecutors.map { case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
          }.toSeq
          launchTasks(scheduler.resourceOffers(workOffers))
        }   
        private def makeOffers(executorId: String) {
          if (executorIsAlive(executorId)) {
            val executorData = executorDataMap(executorId)
            val workOffers = Seq(
              new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))
            launchTasks(scheduler.resourceOffers(workOffers))
          }
        }
    

    The following functions are the focus of the rest of this section:

    • scheduler.resourceOffers – binds tasks to resources
    • WorkerOffer – a thin wrapper around a unit of compute resource
    • launchTasks – sends the selected tasks to the executors
    10. WorkerOffer is a simple wrapper:
      case class WorkerOffer(executorId: String, host: String, cores: Int)
    
    11. scheduler.resourceOffers(workOffers)
     // defined in core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
      def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
          for (o <- offers) {
          executorIdToHost(o.executorId) = o.host
          executorIdToRunningTaskIds.getOrElseUpdate(o.executorId, HashSet[Long]())
    
        }
        // shuffle the executors so that tasks are spread across them randomly
        val shuffledOffers = Random.shuffle(offers)
        // one TaskDescription buffer per executor, pre-sized to its core count; task descriptions are appended below
        val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
        // array of currently available cores, one entry per executor
        val availableCpus = shuffledOffers.map(o => o.cores).toArray 
        val sortedTaskSets = rootPool.getSortedTaskSetQueue // fetch the task sets, sorted by the scheduling policy
        for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
          var launchedTask = false
          do {
            launchedTask = resourceOfferSingleTaskSet(taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
          } while (launchedTask)
        }
        if (tasks.size > 0) {
          hasLaunchedTask = true
        }                             
        return tasks
      }
    
    12. resourceOfferSingleTaskSet binds tasks from one task set to the offers:
     private def resourceOfferSingleTaskSet(
          taskSet: TaskSetManager,
          maxLocality: TaskLocality,
          shuffledOffers: Seq[WorkerOffer],
          availableCpus: Array[Int],
          tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
        var launchedTask = false
        for (i <- 0 until shuffledOffers.size) {
          val execId = shuffledOffers(i).executorId
          val host = shuffledOffers(i).host
          // check whether this executor still has enough free cores for one task
          if (availableCpus(i) >= CPUS_PER_TASK) { 
            try {
              for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
                tasks(i) += task  // append the task to this executor's buffer
                val tid = task.taskId
                taskIdToTaskSetManager(tid) = taskSet
                taskIdToExecutorId(tid) = execId
                executorIdToRunningTaskIds(execId).add(tid)
                executorsByHost(host) += execId
                availableCpus(i) -= CPUS_PER_TASK  // this executor now has fewer free cores
                launchedTask = true
              }
            } catch {... }
          }
        }
        return launchedTask
      }
    

    This function shows that, with CPUS_PER_TASK at its default of 1, --executor-cores is exactly the number of tasks an executor can run in parallel.
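
    A tiny standalone simulation of this round-robin allocation makes the point concrete. Everything here (MiniOffer, the fake pending-task list) is made up for illustration; only the core-accounting rule mirrors the code above.

      import scala.collection.mutable.ArrayBuffer
      import scala.util.Random

      object OfferSimulationDemo {
        val CPUS_PER_TASK = 1
        case class MiniOffer(executorId: String, cores: Int)  // stand-in for WorkerOffer

        def main(args: Array[String]): Unit = {
          val offers        = Random.shuffle(Seq(MiniOffer("exec-1", 4), MiniOffer("exec-2", 2)))
          val availableCpus = offers.map(_.cores).toArray
          val assigned      = offers.map(_ => ArrayBuffer[Int]())
          var pendingTasks  = (1 to 10).toList

          // keep sweeping over the shuffled executors until no more tasks can be placed
          var launchedTask = true
          while (launchedTask && pendingTasks.nonEmpty) {
            launchedTask = false
            for (i <- offers.indices if pendingTasks.nonEmpty && availableCpus(i) >= CPUS_PER_TASK) {
              assigned(i) += pendingTasks.head
              pendingTasks = pendingTasks.tail
              availableCpus(i) -= CPUS_PER_TASK
              launchedTask = true
            }
          }
          // with 4 and 2 cores, exec-1 ends up with 4 concurrent tasks and exec-2 with 2
          offers.zip(assigned).foreach { case (o, ts) => println(s"${o.executorId}: ${ts.size} tasks") }
        }
      }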

    13. This function (TaskSetManager.resourceOffer) takes a task from the task set's pending queue and builds its task description:
     def resourceOffer(  execId: String,   host: String,    maxLocality: TaskLocality.TaskLocality)
        : Option[TaskDescription] =
      {
        if (!isZombie) {
          val curTime = clock.getTimeMillis()
          var allowedLocality = maxLocality
          if (maxLocality != TaskLocality.NO_PREF) {
            allowedLocality = getAllowedLocalityLevel(curTime)
            if (allowedLocality > maxLocality) {
              allowedLocality = maxLocality
            }
          }
          dequeueTask(execId, host, allowedLocality) match {
            case Some((index, taskLocality, speculative)) => {
              val task = tasks(index)
              val taskId = sched.newTaskId()
              copiesRunning(index) += 1
              val attemptNum = taskAttempts(index).size
              val info = new TaskInfo(taskId, index, attemptNum, curTime,
                execId, host, taskLocality, speculative)
              taskInfos(taskId) = info
              taskAttempts(index) = info :: taskAttempts(index)
              if (maxLocality != TaskLocality.NO_PREF) {
                currentLocalityIndex = getLocalityIndex(taskLocality)
                lastLaunchTime = curTime
              }
              val startTime = clock.getTimeMillis()
              val serializedTask: ByteBuffer = try {
                Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
              } catch {... }
              if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
                  !emittedTaskSizeWarning) {
                emittedTaskSizeWarning = true
              }
              addRunningTask(taskId)
              sched.dagScheduler.taskStarted(task, info)
              return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
                taskName, index, serializedTask))
            }
            case _ =>
          }
        }
        None
      }
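
    One detail worth noting is the locality clamp at the top of the method. TaskLocality levels are ordered from most to least strict (PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY), so a "greater" level means a more relaxed one, and the level allowed by the delay-scheduling timer is never relaxed beyond the maxLocality of the current offer round. A small sketch with a copy of that ordering (hypothetical, not Spark's own class) shows the clamping rule:

      // Copy of the locality ordering for demo purposes; in Spark this is the TaskLocality enumeration.
      object Locality extends Enumeration {
        val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
      }

      object LocalityClampDemo {
        // mirrors: if (allowedLocality > maxLocality) allowedLocality = maxLocality
        def clamp(allowedByWait: Locality.Value, maxLocality: Locality.Value): Locality.Value =
          if (allowedByWait > maxLocality) maxLocality else allowedByWait

        def main(args: Array[String]): Unit = {
          // the wait timer may already allow ANY, but this round only offers NODE_LOCAL
          println(clamp(Locality.ANY, Locality.NODE_LOCAL))            // NODE_LOCAL
          println(clamp(Locality.PROCESS_LOCAL, Locality.NODE_LOCAL))  // PROCESS_LOCAL
        }
      }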
    
    14. launchTasks sends the tasks to the executors:
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
          for (task <- tasks.flatten) {
            val serializedTask = ser.serialize(task)
            val executorData = executorDataMap(task.executorId)
            executorData.freeCores -= scheduler.CPUS_PER_TASK
            executorData.executorEndpoint.send(
                  LaunchTask(
                        new SerializableBuffer(serializedTask)
                   )
            )
          }
    }
    

    The code above uses executorData.executorEndpoint.send to deliver the serialized task to the executor side, where the LaunchTask handler from section 1 picks it up.
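
    To tie the two halves together, here is a toy in-process round trip: serialize a (hypothetical, simplified) task description, hand the bytes over, then deserialize and "launch" them. Conceptually this is what launchTasks plus the executor's LaunchTask handler do, except that Spark uses its own serializer and the netty RPC transport between two JVMs; plain Java serialization stands in for both here.

      import java.io._
      import java.nio.ByteBuffer

      // Hypothetical, trimmed-down TaskDescription; Spark's real class carries more fields.
      case class MiniTaskDescription(taskId: Long, name: String, payload: Array[Byte]) extends Serializable

      object SendReceiveDemo {
        // driver side: roughly what ser.serialize(task) + LaunchTask(new SerializableBuffer(...)) achieve
        def serialize(task: MiniTaskDescription): ByteBuffer = {
          val bytes = new ByteArrayOutputStream()
          val out   = new ObjectOutputStream(bytes)
          out.writeObject(task)
          out.close()
          ByteBuffer.wrap(bytes.toByteArray)
        }

        // executor side: roughly what the LaunchTask handler does before calling executor.launchTask
        def deserializeAndLaunch(buf: ByteBuffer): Unit = {
          val in   = new ObjectInputStream(new ByteArrayInputStream(buf.array()))
          val task = in.readObject().asInstanceOf[MiniTaskDescription]
          println(s"launching task ${task.taskId} (${task.name}), payload ${task.payload.length} bytes")
        }

        def main(args: Array[String]): Unit = {
          val buf = serialize(MiniTaskDescription(42L, "task 42 in stage 0.0", Array.fill(16)(0: Byte)))
          deserializeAndLaunch(buf)  // in Spark this happens in another JVM, over the RPC layer
        }
      }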
