Spark Low-Level Source Code Analysis

Author: Yellow_0ce3 | Published 2019-01-09 16:38

    This is my first technical post. It draws on many excellent articles by others, plus some modest thoughts of my own; if anything here is incorrect, please point it out so we can all improve together.

    Spark's low-level RPC communication: keep in mind that it is event-driven.
    Three main classes:

    RpcEndpoint: a communication endpoint. For example, the Master and each Worker in a Spark cluster are RpcEndpoints.

    RpcEndpointRef: a reference to an RpcEndpoint. To communicate with an RpcEndpoint, we must first obtain its reference.

    RpcEnv: the RPC framework and environment. It has methods to start, stop, and shut down RPC, and a setupEndpoint method that registers an RpcEndpoint and stores the RpcEndpointRef/RpcEndpoint pair in a thread-safe ConcurrentHashMap.
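
    To make these three classes concrete, here is a minimal sketch of defining and registering an endpoint. This is illustrative only: the org.apache.spark.rpc package is private[spark], so code like this only compiles inside Spark's own source tree, and the names "echo-system" and "echo" are made up for the example.

     import org.apache.spark.{SparkConf, SecurityManager}
     import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}

     class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
       // Fire-and-forget messages land here; pattern matching on messages is the usual style
       override def receive: PartialFunction[Any, Unit] = {
         case msg: String => println(s"got: $msg")
       }
     }

     val conf = new SparkConf()
     val rpcEnv = RpcEnv.create("echo-system", "localhost", 9999, conf, new SecurityManager(conf))
     val ref = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv)) // returns the RpcEndpointRef
     ref.send("hello") // always talk to an endpoint through its ref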

    Spark startup message communication

    (The code is not analyzed in fine detail; only the more important methods and the main functionality they implement are mentioned.)

    Main components:

    • Master
    • Worker
    1. Master side:
    def startRpcEnvAndEndpoint(
          host: String,
          port: Int,
          webUiPort: Int,
          conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
        val securityMgr = new SecurityManager(conf)
        val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
        val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
        (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
      }
    

    Master's main method starts by calling the method above to build the communication environment, and obtains its own RpcEndpointRef through the RpcEnv. That is how the Master starts up.

    2. Worker side:
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
        masterRpcAddresses.map { masterAddress =>
          registerMasterThreadPool.submit(new Runnable {
            override def run(): Unit = {
              try {
                logInfo("Connecting to master " + masterAddress + "...")
            //Obtain the Master's RpcEndpointRef from its address, then send it a registration message
                val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            //This is where the registration message is sent
                sendRegisterMessageToMaster(masterEndpoint)
              } catch {
                case ie: InterruptedException => // Cancelled
                case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
              }
            }
          })
        }
      }
    

    On the Worker side, tryRegisterAllMasters() tries to register with every Master (there may be more than one when HA is configured).
    As mentioned earlier, to communicate with an RpcEndpoint we must first obtain its RpcEndpointRef, so
    val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME) obtains the Master's RpcEndpointRef, and sendRegisterMessageToMaster(masterEndpoint) then sends the registration message. Spark's low-level communication relies heavily on pattern matching over message case classes.
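
    For reference, the registration message is one of those case classes (defined in DeployMessages). A sketch of what the send likely looks like, with the field order inferred from the Master-side pattern match shown below (exact parameters vary slightly across Spark versions):

     //Sketch: sending the RegisterWorker case class to the Master's endpoint
     private def sendRegisterMessageToMaster(masterEndpoint: RpcEndpointRef): Unit = {
       masterEndpoint.send(RegisterWorker(
         workerId, host, port, self, cores, memory, workerWebUiUrl, masterEndpoint.address))
     }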

    From here on, methods that are already well commented in the code will not be discussed separately; only the points worth noting are mentioned. The numbers 1, 2, 3... mark the order in which the code executes, so next we move back to the Master side.

    3. Master side

    case RegisterWorker(
          id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
          logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
            workerHost, workerPort, cores, Utils.megabytesToString(memory)))
          //Check the Master's current state
          if (state == RecoveryState.STANDBY) {
            workerRef.send(MasterInStandby)
          } else if (idToWorker.contains(id)) {
            //Send a registration-failed message
            workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
          } else {
            val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
              workerRef, workerWebUiUrl)
            //Save the worker's information internally; returns a Boolean so duplicate registrations are avoided
            if (registerWorker(worker)) {
              persistenceEngine.addWorker(worker)
              //Reply to the worker with a registration-success message; self is the Master's own RpcEndpointRef
              workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
              schedule()
            } else {
              val workerAddress = worker.endpoint.address
              //Log a warning
              logWarning("Worker registration failed. Attempted to re-register worker at same " +
                "address: " + workerAddress)
              //Send a failure message
              workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
                + workerAddress))
            }
          }
    

    This is the Master-side logic for handling the registration message sent by the Worker. registerWorker(worker) adds the worker's information to the Master's internal collections, which are later used for cluster task scheduling.
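
    As a rough idea of what registerWorker does (a simplified sketch, not the exact source; details such as recovery handling are omitted): it drops dead workers previously registered at the same address, refuses a duplicate registration for a live address, and otherwise records the worker in the Master's bookkeeping maps.

      //Simplified sketch of Master.registerWorker
      private def registerWorker(worker: WorkerInfo): Boolean = {
        //Drop stale entries for a worker that previously ran on this host/port and is now DEAD
        workers.filter { w =>
          (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
        }.foreach(workers -= _)

        val workerAddress = worker.endpoint.address
        if (addressToWorker.contains(workerAddress)) {
          //A live worker is already registered at this address: refuse the duplicate
          return false
        }
        workers += worker
        idToWorker(worker.id) = worker
        addressToWorker(workerAddress) = worker
        true
      }
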
    4. Back on the Worker side

     case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
            if (preferConfiguredMasterAddress) {
              logInfo("Successfully registered with master " + masterAddress.toSparkURL)
            } else {
              logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
            }
            //Mark the worker as registered
            registered = true
            //Update the Master info the Worker maintains internally
            changeMaster(masterRef, masterWebUiUrl, masterAddress)
            forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
              override def run(): Unit = Utils.tryLogNonFatalError {
                //The Worker sends a message to itself, asking itself to send a heartbeat to the Master
                self.send(SendHeartbeat)
              }
            }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
            //Schedule another task that periodically cleans up old application directories
            if (CLEANUP_ENABLED) {
              logInfo(
                s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
              forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
                override def run(): Unit = Utils.tryLogNonFatalError {
                  self.send(WorkDirCleanup)
                }
              }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
            }
           //Build descriptions of the executors currently running on this worker
            val execs = executors.values.map { e =>
              new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
            }
            //Report the latest executor and driver state to the Master
            masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
    

    The methods we are not concerned with for now have short comments above and will not be explained further. What we care about here is self.send(SendHeartbeat), so let's follow it in.

     case SendHeartbeat =>
          if (connected) { sendToMaster(Heartbeat(workerId, self)) }
    

    This is where the Worker actually sends a "heartbeat" to the Master. The so-called heartbeat mechanism simply means that, at a fixed and configurable interval, the Worker reports its latest state to the Master (a quick note on how that interval is derived follows below); the Master-side handling is shown right after.
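
    For reference, the interval comes from the worker timeout. A sketch of the constant as it appears in Worker.scala in Spark 2.x (verify against your version):

     //Heartbeat every quarter of the configured worker timeout (default 60s timeout => 15s heartbeats)
     private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
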
    5. Master side

        case Heartbeat(workerId, worker) =>
          idToWorker.get(workerId) match {
            case Some(workerInfo) =>
              //Update the last heartbeat timestamp
              workerInfo.lastHeartbeat = System.currentTimeMillis()
            case None =>
              //Below: warn about heartbeats from unknown workers
              if (workers.map(_.id).contains(workerId)) {
                logWarning(s"Got heartbeat from unregistered worker $workerId." +
                  " Asking it to re-register.")
                worker.send(ReconnectWorker(masterUrl))
              } else {
                logWarning(s"Got heartbeat from unregistered worker $workerId." +
                  " This worker was never registered, so ignoring the heartbeat.")
              }
          }
    

    The line workerInfo.lastHeartbeat = System.currentTimeMillis() is important! Using the workerId that was sent over, it updates lastHeartbeat in the WorkerInfo stored at registration time. How is this used for failure detection? The Master has its own timeout check: at a configured interval it scans the registered WorkerInfo list and compares each worker's last heartbeat time against the timeout; any worker that has not reported within that window is considered dead. The timeout-check code itself is not walked through here, but interested readers can look it up (a rough sketch follows). With that, the analysis of Spark's startup message communication is complete. This is only a brief analysis that draws on many other writers' material, so please point out any problems.
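
    For the curious, a rough sketch of that timeout check (simplified and abbreviated from memory; CheckForWorkerTimeOut and timeOutDeadWorkers do exist in Master.scala, but the bodies here are condensed):

      //In Master.onStart(): schedule a self-message at the worker-timeout interval
      checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(CheckForWorkerTimeOut)
        }
      }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)

      //In receive: remove workers whose last heartbeat is too old
      case CheckForWorkerTimeOut =>
        timeOutDeadWorkers()

      private def timeOutDeadWorkers(): Unit = {
        val currentTime = System.currentTimeMillis()
        val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
        for (worker <- toRemove if worker.state != WorkerState.DEAD) {
          logWarning(s"Removing ${worker.id} because we got no heartbeat in ${WORKER_TIMEOUT_MS / 1000} seconds")
          removeWorker(worker)
        }
      }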

    Spark runtime message communication

    The main RpcEndpoints involved:

    1. Worker (and the Executors it launches)
    2. Master

    The following two are created inside SparkContext:

    1. DriverEndpoint: responsible for communicating with the executors. What actually gets registered is the endpoint named CoarseGrainedScheduler.
       It is created in CoarseGrainedSchedulerBackend, where ENDPOINT_NAME is "CoarseGrainedScheduler":
       rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))

    2. ClientEndpoint: responsible for communicating with the Master. What is actually created is the endpoint inside StandaloneAppClient, yet the corresponding variable on the Master side is named driver (a confusing naming choice).
       It is created in StandaloneAppClient:
         // Just launch an rpcEndpoint; it will call back into the listener.
         endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
    

    The naming in this part of Spark is genuinely confusing.

    The flow in brief:
    When a user submits an application, SparkContext sends a registration message to the Master, and the Master tells the Workers to launch Executors for that app. Once an executor starts successfully, it contacts SparkContext's DriverEndpoint and registers itself. When an RDD action is triggered, the RDD DAG is built, the DAGScheduler splits it into stages and produces TaskSets, and the TaskScheduler sends the tasks to the Executors for execution.

    First, one point to be clear about: what does SparkContext do when we submit an app? (At this stage the components below are only initialized; they do real work only once an action is executed. Keep that in mind!)

    1. Create SparkEnv: used to create the DriverEndpoint and ClientEndpoint.
    2. Create the TaskScheduler.
    3. Create the DAGScheduler.

    1. SparkContext side:

    Part 1: Creating SparkEnv

    This is where the RpcEnv is created:
      private[spark] def createSparkEnv(
          conf: SparkConf,
          isLocal: Boolean,
          listenerBus: LiveListenerBus): SparkEnv = {
        SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
      }
    

    This code creates the SparkEnv (which holds the field private[spark] val rpcEnv: RpcEnv). With it in place, we can create the two SparkContext-side RpcEndpoints mentioned above.

    Part 2: Creating the TaskScheduler, DAGScheduler, and backend scheduler

      // Create and start the scheduler
     //Returns a backend scheduler and a TaskScheduler
        val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
        _schedulerBackend = sched
        _taskScheduler = ts
        _dagScheduler = new DAGScheduler(this)
        _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
    
        // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor
        _taskScheduler.start()
    

    SparkContext.createTaskScheduler(this, master, deployMode) is the entry point for creating the TaskScheduler; it returns a backend scheduler and a TaskScheduler.
    _dagScheduler = new DAGScheduler(this) creates the DAGScheduler from the SparkContext.
    Our focus is communication, so what we really want to know is where the DriverEndpoint and ClientEndpoint are created: both are created when the backend scheduler starts.
    Stepping into SparkContext.createTaskScheduler(this, master, deployMode):

     //Create the backend scheduler; constructing and starting it leads to the creation of the DriverEndpoint and ClientEndpoint
            val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
            scheduler.initialize(backend)
            backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
              localCluster.stop()
            }
            (backend, scheduler)
    

    Looking inside the backend scheduler, we arrive at StandaloneSchedulerBackend, which extends CoarseGrainedSchedulerBackend. Its start() method calls the parent's start(), and inside CoarseGrainedSchedulerBackend.start():

      // TODO (prashant) send conf instead of properties
      //Create the DriverEndpoint
        driverEndpoint = createDriverEndpointRef(properties)
    

    Next, StandaloneSchedulerBackend creates the StandaloneAppClient client:

        /**
          * Create the StandaloneAppClient client and start it
          */
        client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
        client.start()
    
    

    Because a StandaloneAppClient is created here, our ClientEndpoint is the standalone-mode one.

    Part 3: StandaloneAppClient communicating with the Master

    1. SparkContext side, inside the StandaloneAppClient class:
        private def tryRegisterAllMasters(): Array[JFuture[_]] = {
          for (masterAddress <- masterRpcAddresses) yield {
            //Submit the registration attempt to the thread pool
            registerMasterThreadPool.submit(new Runnable {
              override def run(): Unit = try {
                if (registered.get) {
                  return
                }
                logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
                val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
                masterRef.send(RegisterApplication(appDescription, self))
              } catch {
                case ie: InterruptedException => // Cancelled
                case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
              }
            })
          }
        }
    

    This code is analogous to the Worker registering with the Master, so it will not be repeated here.

    2. Master side
        case RegisterApplication(description, driver) =>
          // TODO Prevent repeated registrations from some driver
          if (state == RecoveryState.STANDBY) {
            // ignore, don't send response
          } else {
            logInfo("Registering app " + description.name)
            //Save the client application's information
            val app = createApplication(description, driver)
            registerApplication(app)
            logInfo("Registered app " + description.name + " with ID " + app.id)
            persistenceEngine.addApplication(app)
            driver.send(RegisteredApplication(app.id, self))
            /**
              *  This method is important: resource scheduling starts here
              */
            schedule()
          }
    

    After receiving the message, the Master saves the application's information and then calls a very important method, schedule(), which kicks off resource scheduling. For now we only look at one of its main methods, startExecutorsOnWorkers(), which tells the Workers to launch executors.

      private def startExecutorsOnWorkers(): Unit = {
        // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
        // in the queue, then the second app, etc.
        for (app <- waitingApps if app.coresLeft > 0) {
      //Number of cores to allocate per executor for this app
          val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
          // Filter out workers that don't have enough resources to launch an executor
      //Find the workers with enough resources to run the app, then sort them
          val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
            .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
              worker.coresFree >= coresPerExecutor.getOrElse(1))
            .sortBy(_.coresFree).reverse
    
      //Decide how many cores to assign on each usable worker
          val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
    
      // Now that we've decided how many cores to allocate on each worker, let's allocate them
          for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
        //Tell the worker to launch executors
            allocateWorkerResourceToExecutors(
              app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
          }
        }
      }
    

    allocateWorkerResourceToExecutors(app, assignedCores(pos), coresPerExecutor, usableWorkers(pos)) in turn calls launchExecutor(worker, exec) and switches the application's state to RUNNING (a sketch of it follows).
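
    A simplified sketch of allocateWorkerResourceToExecutors (condensed, not the exact source):

      private def allocateWorkerResourceToExecutors(
          app: ApplicationInfo,
          assignedCores: Int,
          coresPerExecutor: Option[Int],
          worker: WorkerInfo): Unit = {
        //If coresPerExecutor is unset, all assigned cores go to a single executor on this worker
        val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
        val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
        for (i <- 1 to numExecutors) {
          val exec = app.addExecutor(worker, coresToAssign)
          launchExecutor(worker, exec)
          app.state = ApplicationState.RUNNING
        }
      }
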
    The launchExecutor method:

     private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
        logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
        worker.addExecutor(exec)
    //Send the launch request to the worker
        worker.endpoint.send(LaunchExecutor(masterUrl,
          exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
    //Notify the driver that an executor has been added
        exec.application.driver.send(
          ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
      }
    

    Two things happen here:

    1. Tell the Worker to launch an executor, passing along the executor's basic information.
    2. Send that same basic information back to the driver.
    What does the Worker then do?
    1. Create the working directory.
    2. Instantiate an ExecutorRunner.
    3. Inside the ExecutorRunner, use the command (the Command built earlier in the scheduler backend and passed via StandaloneAppClient to the Master, then on to the Worker)
      to create a CoarseGrainedExecutorBackend. This is the container the executor runs in, and it is also responsible for communicating with the DriverEndpoint.
    4. Send an ExecutorStateChanged message to the Master to report that the executor has been created.
    Now let's look at the code:
      case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
          if (masterUrl != activeMasterUrl) {
            logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
          } else {
            try {
              logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
    
              // Create the executor's working directory
          //Create the executor's working directory
              val executorDir = new File(workDir, appId + "/" + execId)
              if (!executorDir.mkdirs()) {
                throw new IOException("Failed to create directory " + executorDir)
              }
    
              // Create local dirs for the executor. These are passed to the executor via the
              // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
              // application finishes.
          //These local dirs are deleted by the Worker once the application finishes
              val appLocalDirs = appDirectories.getOrElse(appId, {
                val localRootDirs = Utils.getOrCreateLocalRootDirs(conf)
                val dirs = localRootDirs.flatMap { dir =>
                  try {
                    val appDir = Utils.createDirectory(dir, namePrefix = "executor")
                    Utils.chmod700(appDir)
                    Some(appDir.getAbsolutePath())
                  } catch {
                    case e: IOException =>
                      logWarning(s"${e.getMessage}. Ignoring this directory.")
                      None
                  }
                }.toSeq
                if (dirs.isEmpty) {
                  throw new IOException("No subfolder can be created in " +
                    s"${localRootDirs.mkString(",")}.")
                }
                dirs
              })
              appDirectories(appId) = appLocalDirs
          //Instantiate the ExecutorRunner
              val manager = new ExecutorRunner(
                appId,
                execId,
                appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
                cores_,
                memory_,
                self,
                workerId,
                host,
                webUi.boundPort,
                publicAddress,
                sparkHome,
                executorDir,
                workerUri,
                conf,
                appLocalDirs, ExecutorState.RUNNING)
              executors(appId + "/" + execId) = manager
    
          //Start it
              manager.start()
    
              coresUsed += cores_
              memoryUsed += memory_
          //Tell the Master that the executor has been created
              sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
            } catch {
              case e: Exception =>
                logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
                if (executors.contains(appId + "/" + execId)) {
                  executors(appId + "/" + execId).kill()
                  executors -= appId + "/" + execId
                }
                sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
                  Some(e.toString), None))
            }
          }
    
    

    manager.start() is ExecutorRunner's start() method; internally it ends up calling ExecutorRunner's fetchAndRunExecutor(), which is the method we really need to focus on.

     private def fetchAndRunExecutor() {
        try {
          // Launch the process
      /** Each ProcessBuilder instance manages a set of process attributes.
         Its start() method uses these attributes to create a new Process instance.
         start() can be invoked repeatedly on the same instance to create new child
         processes with the same or related attributes.
      */
          val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
            memory, sparkHome.getAbsolutePath, substituteVariables)
          val command = builder.command()
          val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
          logInfo(s"Launch command: $formattedCommand")
    
          builder.directory(executorDir)
          builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
          // In case we are running this from within the Spark Shell, avoid creating a "scala"
          // parent process for the executor command
          builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
    
          // Add webUI log urls
          val baseUrl =
            if (conf.getBoolean("spark.ui.reverseProxy", false)) {
              s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
            } else {
              s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
            }
          builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
          builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
    
      /**
        *  This is the real launch: the attribute set assembled by buildProcessBuilder above is used
        *  to start CoarseGrainedExecutorBackend's main method in a new process.
        *  That process constructs the CoarseGrainedExecutorBackend instance, the container the executor runs in.
        */
          process = builder.start()
          val header = "Spark Executor Command: %s\n%s\n\n".format(
            formattedCommand, "=" * 40)
    
      // Capture the CoarseGrainedExecutorBackend process's output
          val stdout = new File(executorDir, "stdout")
          stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
    
          val stderr = new File(executorDir, "stderr")
          Files.write(header, stderr, StandardCharsets.UTF_8)
          stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
    
          // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
          // or with nonzero exit code
          val exitCode = process.waitFor()
          state = ExecutorState.EXITED
          val message = "Command exited with code " + exitCode
      //Notify the Worker of the exit state
          worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
        } 
    

    You might be puzzled at this point: didn't we say a CoarseGrainedExecutorBackend would be created here? Yet nowhere do we see it being instantiated. I was completely lost at first and could not find where it was created, even though people told me it was already being invoked. After some digging it turned out that everything hinges on the command. Recall that the StandaloneAppClient was created inside the backend scheduler (StandaloneSchedulerBackend); the command is built there as well:

        val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
          args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    

    Here we can see that the basic information about CoarseGrainedExecutorBackend (its main class, arguments, classpath, and so on) has been packaged into a Command. Back in fetchAndRunExecutor, val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf), ...) turns that Command into a ProcessBuilder, the JDK class for spawning operating-system processes; starting it launches CoarseGrainedExecutorBackend's main method in a new JVM.
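
    Conceptually, what buildProcessBuilder produces boils down to something like the following. This is an illustrative sketch only, not Spark's actual command line (which also carries the full classpath, memory settings, and arguments such as --driver-url and --executor-id); paths and values here are placeholders.

      import scala.collection.JavaConverters._

      //Launching a main class in a child JVM via java.lang.ProcessBuilder
      val cmd = Seq(
        "java", "-cp", "/path/to/spark/jars/*",   //classpath (placeholder)
        "-Xmx1g",                                  //executor memory (placeholder)
        "org.apache.spark.executor.CoarseGrainedExecutorBackend"
        // ...followed by --driver-url, --executor-id, --cores, etc.
      )
      val builder = new ProcessBuilder(cmd.asJava)
      builder.directory(new java.io.File("/tmp/executor-dir"))        //working dir (placeholder)
      builder.environment.put("SPARK_EXECUTOR_DIRS", "/tmp/app-dirs") //placeholder value
      val process = builder.start()  //child JVM now runs CoarseGrainedExecutorBackend.main
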
    In CoarseGrainedExecutorBackend's onStart() method, a registration message is sent to the DriverEndpoint:

      override def onStart() {
        logInfo("Connecting to driver: " + driverUrl)
        rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
          // This is a very fast action so we can use "ThreadUtils.sameThread"
          driver = Some(ref)
          
      //Contact the driver and send the executor's information so it can register
          ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
        }(ThreadUtils.sameThread).onComplete {
          // This is a very fast action so we can use "ThreadUtils.sameThread"
          case Success(msg) =>
            // Always receive `true`. Just ignore it
          case Failure(e) =>
            exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
        }(ThreadUtils.sameThread)
      }
    

    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls)) sends a request to the SparkContext side and waits for an acknowledgement; it goes through the DriverEndpoint's ref. Remember where the DriverEndpoint was created? Right, in CoarseGrainedSchedulerBackend!
    Back to that class, in the handler for this message:

      case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
        //Various validity checks
            if (executorDataMap.contains(executorId)) {
              executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
              context.reply(true)
            } else if (scheduler.nodeBlacklist != null &&
              scheduler.nodeBlacklist.contains(hostname)) {
              // If the cluster manager gives us an executor on a blacklisted node (because it
              // already started allocating those resources before we informed it of our blacklist,
              // or if it ignored our blacklist), then we reject that executor immediately.
              logInfo(s"Rejecting $executorId as it has been blacklisted.")
              executorRef.send(RegisterExecutorFailed(s"Executor is blacklisted: $executorId"))
              context.reply(true)
            } else {
              // If the executor's rpc env is not listening for incoming connections, `hostPort`
              // will be null, and the client connection should be used to contact the executor.
              val executorAddress = if (executorRef.address != null) {
                  executorRef.address
                } else {
                  context.senderAddress
                }
              logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
              addressToExecutorId(executorAddress) = executorId
              totalCoreCount.addAndGet(cores)
              totalRegisteredExecutors.addAndGet(1)
              val data = new ExecutorData(executorRef, executorRef.address, hostname,
                cores, cores, logUrls)
              // This must be synchronized because variables mutated
              // in this block are read when requesting executors
              CoarseGrainedSchedulerBackend.this.synchronized {
                executorDataMap.put(executorId, data)
                if (currentExecutorIdCounter < executorId.toInt) {
                  currentExecutorIdCounter = executorId.toInt
                }
                if (numPendingExecutors > 0) {
                  numPendingExecutors -= 1
                  logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
                }
              }
          //Send a registration-success message back to the executor
              executorRef.send(RegisteredExecutor)
              // Note: some tests expect the reply to come after we put the executor in the map
              context.reply(true)
          //Post an executor-added event to the listener bus (similar in spirit to how the Master tracks workers)
              listenerBus.post(
                SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          //Get ready to offer tasks to the executors
              makeOffers()
            }
    

    What does this handler do?

    1. Sends a registration-success message back to the executor.
    2. Posts an executor-added event to the listener bus.
    3. Calls makeOffers(), which in turn calls launchTasks(); this is crucial, because it is what sends LaunchTask messages to the executors so that tasks actually run (see the sketch below).
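
    For context, a condensed sketch of makeOffers in CoarseGrainedSchedulerBackend's DriverEndpoint (simplified from memory; the real method also synchronizes on the backend and skips executors that are being killed):

      //Condensed sketch of DriverEndpoint.makeOffers
      private def makeOffers(): Unit = {
        //Describe the free resources of every alive executor as WorkerOffers
        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
        val workOffers = activeExecutors.map { case (id, executorData) =>
          new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
        }.toIndexedSeq
        //Ask the TaskScheduler which tasks to run where, then send LaunchTask to those executors
        launchTasks(scheduler.resourceOffers(workOffers))
      }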

    After the executor receives the registration-success message:

     case RegisteredExecutor =>
          logInfo("Successfully registered with driver")
          try {
        //Instantiate the Executor object; in Spark this is what actually runs tasks
            executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
          } catch {
            case NonFatal(e) =>
              exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
          }
    
     
    

    The main things that happen:

    1. executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) creates the Executor, the component that actually executes task code.
    2. While the Executor is being initialized, startDriverHeartbeater() is called so that heartbeats are sent to the Driver while the executor waits for the Driver's messages:
      private def startDriverHeartbeater(): Unit = {
        val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
    
        // Wait a random interval so the heartbeats don't end up in sync
        val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
    
        val heartbeatTask = new Runnable() {
          override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
        }
        //Send heartbeats to the Driver at a fixed rate
        heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
      }
    

    Remember makeOffers() and launchTasks() from above? The executor now runs launchTask(), which wraps the task in a new TaskRunner(context, taskDescription) and processes it. When the task is done, a StatusUpdate message is sent to the CoarseGrainedExecutorBackend, which then forwards it to the DriverEndpoint, as in the code below:

    override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
        
        //Forward the executor's status update to the Driver
        val msg = StatusUpdate(executorId, taskId, state, data)
        
        driver match {
          case Some(driverRef) => driverRef.send(msg)
          case None => logWarning(s"Drop $msg because has not yet connected to driver")
        }
      }
    

    On the Driver side, the freed resources are reclaimed and new tasks are assigned to the executor; this again goes through the makeOffers() path (a sketch of the handler follows).
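
    A condensed sketch of how the DriverEndpoint handles that StatusUpdate (simplified; treat it as approximate rather than the exact source):

      //Condensed sketch of DriverEndpoint.receive handling StatusUpdate
      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        if (TaskState.isFinished(state)) {
          executorDataMap.get(executorId) match {
            case Some(executorInfo) =>
              //Return the finished task's cores to the executor, then offer resources on it again
              executorInfo.freeCores += scheduler.CPUS_PER_TASK
              makeOffers(executorId)
            case None =>
              logWarning(s"Ignored task status update ($taskId) from unknown executor with ID $executorId.")
          }
        }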

    That concludes this brief analysis of Spark core's communication.
    Source-code analyses of Spark's memory management and more will follow.
