美文网首页
Spark 源码浅析之 Master 和 Worker 部分

Spark 源码浅析之 Master 和 Worker 部分

作者: 越过山丘xyz | 来源:发表于2019-02-07 16:31 被阅读0次

    Master 与 Worker

    在 Standalone 模式下 Spark 使用 Master 来进行资源的分配与管理,Worker 为具体执行任务(提供计算服务)的节点。

    流程概览

    Master 在 SparkCore 的 org.apache.spark.deploy.master 包下,Worker 在 org.apache.spark.deploy.worker 包下。

    Master

    Master 维护着 Application 和 Worker 的信息,当有新的 Application 注册过来的时候为其找到合适的 Worker 并为其在 Worker 上启动 Executor。

    流程概览

    Master 的 main() 方法 (执行启动 Spark 命令):

    private[deploy] object Master extends Logging {
    
      def main(argStrings: Array[String]) {
        Utils.initDaemon(log)
        val conf = new SparkConf
        val args = new MasterArguments(argStrings, conf)
        val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
        rpcEnv.awaitTermination()
      }
    
      def startRpcEnvAndEndpoint(
          host: String,
          port: Int,
          webUiPort: Int,
          conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
       
        // RPC 环境
        val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
        val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
          // 实例化 Master
          new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
        val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
        (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
      }
    }
    

    接下来,我们先看几个 Master 的成员变量,看看 Master 维护着哪些信息:

    // Workers 信息
    val workers = new HashSet[WorkerInfo]
    private val idToWorker = new HashMap[String, WorkerInfo]
    private val addressToWorker = new HashMap[RpcAddress, WorkerInfo]
    
    // Applications 信息
    val apps = new HashSet[ApplicationInfo]
    val idToApp = new HashMap[String, ApplicationInfo]
    private val waitingApps = new ArrayBuffer[ApplicationInfo]
    private val endpointToApp = new HashMap[RpcEndpointRef, ApplicationInfo]
    private val addressToApp = new HashMap[RpcAddress, ApplicationInfo]
    private val completedApps = new ArrayBuffer[ApplicationInfo]
    

    Master 的初始化工作是在 onStart() 方法中完成的:

    override def onStart(): Unit = {
      // Web UI 配置
      webUi = new MasterWebUI(this, webUiPort)
      webUi.bind()
      masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
    
      // 设置定期检查 Worker 是否超时任务
      checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          self.send(CheckForWorkerTimeOut)
        }
      }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    
      // 其它操作
    
      // 启动指标系统
      masterMetricsSystem.registerSource(masterSource)
      masterMetricsSystem.start()
      applicationMetricsSystem.start()
      // 指标系统绑定到 Web UI
      masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
      applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
    
      // 根据模式的不同,选择不同的持久化机制
      val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
          
        // 使用 Zookeeper
        case "ZOOKEEPER" =>
          val zkFactory =
            new ZooKeeperRecoveryModeFactory(conf, serializer)
          (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
    
        // 其它的模式
          
      }
      persistenceEngine = persistenceEngine_
      // leader 选举
      leaderElectionAgent = leaderElectionAgent_
    }
    

    Master 在初始化的时候主要做了两件事,一是启动 Web UI,二是持久化和选举模式的选择。

    当 Worker 启动的时候,会向 Master 发送一条注册信息,然后 Master 将其加入到 Worker 集合中(成员变量 workers),响应 Worker 注册请求的代码可以在 Master.receive() 方法中找到:

    override def receive: PartialFunction[Any, Unit] = {
      // 选举机制
      case ElectedLeader =>
        // 略略略
    
      // Master 主备切换
      // 有兴趣的可以看看
      case CompleteRecovery => completeRecovery()
    
      // Worker 注册
      case RegisterWorker(
        id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
        
        if (state == RecoveryState.STANDBY) {
          workerRef.send(MasterInStandby)
        } else if (idToWorker.contains(id)) {
          workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
        } else {
          // 主要看这里
          // 将 Worker 发送过来的信息进行了封装
          val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
            workerRef, workerWebUiUrl)
          // 注册 Worker
          if (registerWorker(worker)) {
            // 持久化 Worker 信息
            // 比如使用 Zookeepeer
            persistenceEngine.addWorker(worker)
            // 向 Worker 发送一条确认信息
            workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
            // 编制执行计划
            // 因为有新 Worker 的到达,所以可以唤醒一些等待执行的 Application
            // 等会看内部源码
            schedule()
          } else {
              // 略略略...
          }
        }
    
      // Application 注册
      case RegisterApplication(description, driver) =>
        // 等会看
    
      // Executor 状态改变
      case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
        // 略略略...
    
      // Driver 状态改变
      case DriverStateChanged(driverId, state, exception) =>
        // 略略略...
    
      // 心跳机制
      case Heartbeat(workerId, worker) =>
        // 略略略...
    
      // Worker 超时检查
      case CheckForWorkerTimeOut =>
        timeOutDeadWorkers()
    
    }
    

    Master.registerWorker() 的实现细节:

    private def registerWorker(worker: WorkerInfo): Boolean = {
    
      // Worker 集合中可能已经注册过这个 Worker (发生了故障)
      // 将其移出
      workers.filter { w =>
        (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
      }.foreach { w =>
        workers -= w
      }
    
      // Worker RPC Address
      // 补充 addressToWorker 信息 (成员变量)
      val workerAddress = worker.endpoint.address
      if (addressToWorker.contains(workerAddress)) {
        // 略略略
      }
        
      // 将 Worker 放入到 Worker 集合
      workers += worker
      
      // 补充 idToWorker 信息 (成员变量)
      idToWorker(worker.id) = worker
      addressToWorker(workerAddress) = worker
      if (reverseProxy) {
         // Web UI 添加
         webUi.addProxyTargets(worker.id, worker.webUiAddress)
      }
      true
    }
    

    在收到 Worker 发送过来的注册信息后,Master 会将 Worker 的信息添加到自身的成员变量中,并且还会将 Worker 的信息持久化一份 (当 Master 挂了之后可以恢复)。

    接下来我们看看,在 Master 收到 Application 发送过来的注册消息后会做哪些工作,同样可以在 Master.receive() 方法中可以找到:

    override def receive: PartialFunction[Any, Unit] = {
    
      // Application 注册
      case RegisterApplication(description, driver) =>
    
        if (state == RecoveryState.STANDBY) {
          // ignore, don't send response
        } else {
          // 构建 Application
          val app = createApplication(description, driver)
          // 注册 App
          // 主要是添加到等待队列,并记录 App 信息,上文提到的成员变量
          // 与注册 Worker 类似
          registerApplication(app)
          // 持久化
          persistenceEngine.addApplication(app)
          // 向 Driver 发送一条确认信息
          driver.send(RegisteredApplication(app.id, self))
          // 有新的 App 的加入
          // 所以需要重新编排下执行计划
          // 如果等待执行队列为空,那么这个 App 可能就直接被 Master 调度
          schedule()
        }
    
    }
    

    在注册完 Worker 和 Application 后,最后都调用了 Master.schedule() 方法来编排执行计划,接下来我们看看其实现细节:

    private def schedule(): Unit = {
        
      // 对活跃的 Worker 进行随机打散
      // 尽量负载均衡
      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
      // 活跃的 Worker 数
      val numWorkersAlive = shuffledAliveWorkers.size
      var curPos = 0
        
      // 针对 Driver 的集群启动模式
      for (driver <- waitingDrivers.toList) {
        var launched = false
        var numWorkersVisited = 0
        // 循环遍历活跃的 Worker,直到某个 Worker 满足 Driver 的资源申请
        while (numWorkersVisited < numWorkersAlive && !launched) {
          val worker = shuffledAliveWorkers(curPos)
          numWorkersVisited += 1
          // 当前的 Worker 可以满足 Driver 申请的资源数
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            // 启动 Driver
            launchDriver(worker, driver)
            waitingDrivers -= driver
            launched = true
          }
          curPos = (curPos + 1) % numWorkersAlive
        }
      }
        
      // Application 的 Executor 的分配机制
      startExecutorsOnWorkers()
    }
    

    我这里关注的重点是 Master 如何对 Application 的进行资源分配的,所以我们继续看下 startExecutorsOnWorkers() 的实现细节:

    private def startExecutorsOnWorkers(): Unit = {
    
      // Master 使用先进先出的调度方式
      for (app <- waitingApps if app.coresLeft > 0) {
        // App 需要每个 Executor CPU 的核数
        // 在提交任务的时候由我们指定的
        val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
        // 在活跃的 Workers 中找出满足 App 资源需求的 Worker
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
            worker.coresFree >= coresPerExecutor.getOrElse(1))
          .sortBy(_.coresFree).reverse
          
        // scheduleExecutorsOnWorkers() 这个方法有点复杂
        // 计算在哪些 Worker 上启动 Executor
        // 返回可执行的 Worker 数组及内核数
        // 内部有两种分配方式:
        //   1. 多启动 Worker,让 Worker 均摊提供给 Executor 的 Core.
        //   2. 少启动 Worker,让 Worker 提供最大的 Core 给 Executor(最大的 Core 为
        //      每个 Executor 需要的Core).
        // 有兴趣的可以自己看下
        val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
    
        // 对 Worker 进行资源分配
        for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
          allocateWorkerResourceToExecutors(
            app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
        }
      }
    }
    

    我们还是看点简单的,看看 allocateWorkerResourceToExecutors() 的内部实现细节:

    private def allocateWorkerResourceToExecutors(
        app: ApplicationInfo,
        assignedCores: Int,
        coresPerExecutor: Option[Int],
        worker: WorkerInfo): Unit = {
      
      // 在这个 Worker 上需要启动的 Executor 数
      val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
      // 可分配的 Core 数量
      val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
      for (i <- 1 to numExecutors) {
        // Executor 信息
        val exec = app.addExecutor(worker, coresToAssign)
        // 在 Worker 上启动 Executor
        launchExecutor(worker, exec)
        app.state = ApplicationState.RUNNING
      }
    }
    

    launchExecutor() 的实现细节:

    private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
      worker.addExecutor(exec)
      // 向 Worker 发送启动 Executor 的消息
      worker.endpoint.send(LaunchExecutor(masterUrl,
        exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
      // 向 Driver 发送分配完成消息
      // Executor 在启动完成后会主动联系 Driver
      exec.application.driver.send(
        ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
    }
    

    简单的总结一下,Master 会维护着 Application 和 Worker 的信息,当有新的 Application 过来注册时,Master 会从 Worker 集合中找到合适的 Worker 并为其分配并启动 Executor。

    Worker

    Worker 是具体提供计算节点,当收到 Master 的启动 Executor 启动后,就会启动 Executor 进程。

    流程概览

    我们还是先看看 Worker 维护着哪些信息:

    private val workerId = generateWorkerId()
    val drivers = new HashMap[String, DriverRunner]
    val executors = new HashMap[String, ExecutorRunner]
    val finishedDrivers = new LinkedHashMap[String, DriverRunner]
    val appDirectories = new HashMap[String, Seq[String]]
    val finishedApps = new HashSet[String]
    

    接下来我们从 Worker 的 onStart() 方法入手,看看 Worker 做了哪些工作:

    override def onStart() {
      // 创建工作目录
      createWorkDir()
      // Web UI
      shuffleService.startIfEnabled()
      webUi = new WorkerWebUI(this, workDir, webUiPort)
      webUi.bind()
    
      workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
        
      // 向 Master 进行注册
      // 内部实现会调用 tryRegisterAllMasters(),与 SchedulerBackend 有点像
      registerWithMaster()
    
      // 指标信息绑定界面
      metricsSystem.registerSource(workerSource)
      metricsSystem.start()
      metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
    }
    

    接下来我们看看,当 Worker 收到 Master 的启动 Executor 请求后会做哪些工作,可以在 Worker.receive() 方法中找到:

    override def receive: PartialFunction[Any, Unit] = synchronized {
      case msg: RegisterWorkerResponse =>
        handleRegisterResponse(msg)
    
      // 发送心跳
      case SendHeartbeat =>
        if (connected) { sendToMaster(Heartbeat(workerId, self)) }
    
      // 其它的都忽略
    
      // 启动 Executor
      case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
        if (masterUrl != activeMasterUrl) {
          logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
        } else {
          try {
            // 创建 Executor 工作目录
            val executorDir = new File(workDir, appId + "/" + execId)
            if (!executorDir.mkdirs()) {
              throw new IOException("Failed to create directory " + executorDir)
            }
    
            // 创建本地工作目录
            val appLocalDirs = appDirectories.getOrElse(...)
            
            // 创建一个 ExecutorRunner 负责管理
            val manager = new ExecutorRunner(...)
            
            // 启动 Executor
            // 内部使用 ProcessBuilder 去启动 Executor (通过启动命令)
            manager.start()
            
            // 更新自身的资源使用情况
            coresUsed += cores_
            memoryUsed += memory_
            
            // 给 Master 一条确认信息
            sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
          } catch {
            // 略略略...
          }
        }
    
    
      // 这个是针对集群模式 Driver 的
      case LaunchDriver(driverId, driverDesc) =>
     
        val driver = new DriverRunner(...)
        // 启动 Driver
        // 与 Executor 类似,内部使用 ProcessBuilder 去启动
        driver.start()
    
        coresUsed += driverDesc.cores
        memoryUsed += driverDesc.mem
    
    }
    

    ExecutorRunner.start() 的实现细节:

    private[worker] def start() {
      // 启动一个线程去处理
      workerThread = new Thread("ExecutorRunner for " + fullId) {
        override def run() { fetchAndRunExecutor() }
      }
      workerThread.start()
    
      // 略略略
    }
    

    fetchAndRunExecutor() 的实现细节:

    // 下载并运行 Executor
    private def fetchAndRunExecutor() {
      try {
        // 启动一个进程
        val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
          memory, sparkHome.getAbsolutePath, substituteVariables)
        
        // 略略略
        
        // 启动进程
        process = builder.start()
    
        // 略略略
          
      } catch {
        // 略略略
      }
    }
    

    ProcessBuilder.start() 主要就是通过命令启动一个新的进程,内部实现太复杂了,这里就不看了。

    简单的总结一下,我们只看了很少的 Worker 源码,也就是接收到 Master 的启动 Executor 请求,然后启动 Executor (新进程)。

    相关文章

      网友评论

          本文标题:Spark 源码浅析之 Master 和 Worker 部分

          本文链接:https://www.haomeiwen.com/subject/iwqgsqtx.html