Original - Spark Source Code Analysis II: Standalone Mode Maste


Author: 无色的叶 | Published: 2018-11-01 17:15

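    Continuing from the previous post (https://www.jianshu.com/p/c9aa62460e43):
    After the Master is elected leader, it sends an ElectedLeader message, which is handled by the ElectedLeader case in the receive method.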

    case ElectedLeader =>
          // Read the persisted app, driver, and worker info
          val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
          // If all three are empty, the master state is ALIVE; otherwise it is RECOVERING
          state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
            RecoveryState.ALIVE
          } else {
            RecoveryState.RECOVERING
          }
          logInfo("I have been elected leader! New state: " + state)
          if (state == RecoveryState.RECOVERING) {
            // Restore the master's state
            beginRecovery(storedApps, storedDrivers, storedWorkers)
            recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
              override def run(): Unit = Utils.tryLogNonFatalError {
                self.send(CompleteRecovery)
              }
            }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
          }
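
The state changes this article walks through can be summarized as a small transition table. This is only a sketch: the state names follow the `RecoveryState` values used in the snippets here, `STANDBY` is assumed to be the state before election (it does not appear in the excerpts), and `afterElection`, `transitions`, and `canMove` are hypothetical helpers, not part of Spark.

```scala
object RecoveryStateSketch {
  // State names follow the article's snippets; STANDBY is an assumed pre-election state.
  object RecoveryState extends Enumeration {
    val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
  }
  import RecoveryState._

  // Hypothetical helper: the state picked right after election,
  // depending on whether any persisted apps, drivers, or workers were found.
  def afterElection(hasPersistedData: Boolean): RecoveryState.Value =
    if (hasPersistedData) RECOVERING else ALIVE

  // Transitions covered by the ElectedLeader and CompleteRecovery handlers.
  val transitions: Map[RecoveryState.Value, Set[RecoveryState.Value]] = Map(
    STANDBY -> Set(ALIVE, RECOVERING),
    RECOVERING -> Set(COMPLETING_RECOVERY),
    COMPLETING_RECOVERY -> Set(ALIVE)
  )

  // True if the table above allows moving from `from` to `to`.
  def canMove(from: RecoveryState.Value, to: RecoveryState.Value): Boolean =
    transitions.getOrElse(from, Set.empty[RecoveryState.Value]).contains(to)
}
```

For example, `RecoveryStateSketch.afterElection(hasPersistedData = false)` yields `ALIVE` directly, which matches the branch where nothing was persisted and no recovery is needed.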
    
    

    If the master state is RECOVERING, beginRecovery() is then called to restore the master's state.

    private def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo],
                                storedWorkers: Seq[WorkerInfo]) {
        for (app <- storedApps) {
          logInfo("Trying to recover app: " + app.id)
          try {
            // Re-register the app
            registerApplication(app)
            app.state = ApplicationState.UNKNOWN
            // Notify the driver that the master has changed
            app.driver.send(MasterChanged(self, masterWebUiUrl))
          } catch {
            case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
          }
        }
    
        for (driver <- storedDrivers) {
          // Here we just read in the list of drivers. Any drivers associated with now-lost workers
          // will be re-launched when we detect that the worker is missing.
          drivers += driver
        }
    
        for (worker <- storedWorkers) {
          logInfo("Trying to recover worker: " + worker.id)
          try {
            // Re-register the worker
            registerWorker(worker)
            worker.state = WorkerState.UNKNOWN
            // Notify the worker that the master has changed
            worker.endpoint.send(MasterChanged(self, masterWebUiUrl))
          } catch {
            case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
          }
        }
      }
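
The bookkeeping in beginRecovery() can be modeled in a few lines. The sketch below assumes, beyond what the excerpt shows, that a reply from a notified worker or app is what moves it out of the UNKNOWN state. `Node`, `MasterModel`, `acknowledge`, and `unresponsive` are hypothetical names for illustration, not Spark's API.

```scala
import scala.collection.mutable

object BeginRecoverySketch {
  sealed trait State
  case object Unknown extends State
  case object Alive extends State

  // Hypothetical stand-in for a recovered worker or application.
  final class Node(val id: String) { var state: State = Alive }

  final class MasterModel {
    val nodes = mutable.LinkedHashMap.empty[String, Node]
    // Records which nodes were told about the new master (stands in for MasterChanged).
    val notified = mutable.ArrayBuffer.empty[String]

    // Re-register every stored node, mark it Unknown, and "notify" it.
    def beginRecovery(stored: Seq[Node]): Unit =
      stored.foreach { n =>
        nodes(n.id) = n
        n.state = Unknown
        notified += n.id
      }

    // Assumption: a reply from the node is what clears the Unknown state.
    def acknowledge(id: String): Unit =
      nodes.get(id).foreach(_.state = Alive)

    // Nodes that never replied; these are the candidates for removal later.
    def unresponsive: Seq[String] =
      nodes.values.filter(_.state == Unknown).map(_.id).toSeq
  }
}
```

In this model, anything still listed by `unresponsive` when the recovery timeout fires is what the later cleanup step would drop.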
    

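    This method is triggered when the master fails over between the active and standby nodes. While in the RECOVERING state, the master cannot accept any newly submitted jobs. Returning to the ElectedLeader handler: once beginRecovery() has run, the master schedules a CompleteRecovery message to itself, to be sent after WORKER_TIMEOUT_MS milliseconds.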

    beginRecovery(storedApps, storedDrivers, storedWorkers)
    recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(CompleteRecovery)
      }
    }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
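
The delayed self-message pattern can be tried in isolation with a plain `ScheduledExecutorService` from the JDK. This is a minimal sketch, assuming that a single-threaded scheduler behaves like `forwardMessageThread` for this purpose. `fireAfter` and its callback are hypothetical names, and the callback stands in for `self.send(CompleteRecovery)`.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object DelayedSelfSendSketch {
  // Runs `onTimeout` once after `delayMs` on a scheduler thread and blocks
  // the caller until it has run. Returns true if the callback completed.
  def fireAfter(delayMs: Long)(onTimeout: () => Unit): Boolean = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val fired = new CountDownLatch(1)
    try {
      scheduler.schedule(new Runnable {
        override def run(): Unit = {
          onTimeout()        // stand-in for self.send(CompleteRecovery)
          fired.countDown()
        }
      }, delayMs, TimeUnit.MILLISECONDS)
      // Wait a bounded time so a failing callback cannot hang the caller.
      fired.await(delayMs + 5000, TimeUnit.MILLISECONDS)
    } finally {
      scheduler.shutdownNow()
    }
  }
}
```

The delay gives workers and apps a window to respond to the new master before the cleanup step runs, which is why the message is scheduled rather than sent right away.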
    

    Next, look at how the CompleteRecovery message is handled: it calls the completeRecovery() method.

    case CompleteRecovery => completeRecovery()
    
    private def completeRecovery() {
        // Ensure "only-once" recovery semantics using a short synchronization period.
        // Make sure recovery runs only once
        if (state != RecoveryState.RECOVERING) {
          return
        }
        state = RecoveryState.COMPLETING_RECOVERY
    
        // Kill off any workers and apps that didn't respond to us.
        // Remove workers and apps that are still in the UNKNOWN state
        workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
        apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
    
        // Update the state of recovered apps to RUNNING
        apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)
    
        // Reschedule drivers which were not claimed by any workers
        drivers.filter(_.worker.isEmpty).foreach { d =>
          logWarning(s"Driver ${d.id} was not found after master recovery")
          if (d.desc.supervise) {
            logWarning(s"Re-launching ${d.id}")
            relaunchDriver(d)
          } else {
            removeDriver(d.id, DriverState.ERROR, None)
            logWarning(s"Did not re-launch ${d.id} because it was not supervised")
          }
        }
    
        state = RecoveryState.ALIVE
        // Start scheduling work that has not run yet
        schedule()
        logInfo("Recovery complete - resuming operations!")
      }
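
The core of completeRecovery() (the run-once guard, dropping UNKNOWN entries, and promoting WAITING apps to RUNNING) can be exercised with a stripped-down model. This is a sketch under simplifying assumptions: only apps are modeled, `scheduleCalls` is a counter standing in for `schedule()`, and `App` and `MasterModel` are hypothetical classes rather than Spark's.

```scala
import scala.collection.mutable

object CompleteRecoverySketch {
  object RecoveryState extends Enumeration { val RECOVERING, COMPLETING_RECOVERY, ALIVE = Value }
  object AppState extends Enumeration { val UNKNOWN, WAITING, RUNNING = Value }

  // Hypothetical stand-in for ApplicationInfo.
  final class App(val id: String, var state: AppState.Value)

  final class MasterModel(initial: Seq[App]) {
    var state: RecoveryState.Value = RecoveryState.RECOVERING
    val apps = mutable.ArrayBuffer.empty[App]
    apps ++= initial
    var scheduleCalls = 0 // counts how often the schedule() stand-in ran

    def completeRecovery(): Unit = {
      // Guard: only the first call while RECOVERING does any work.
      if (state != RecoveryState.RECOVERING) return
      state = RecoveryState.COMPLETING_RECOVERY

      // Drop apps that never responded to the new master.
      val lost = apps.filter(_.state == AppState.UNKNOWN)
      apps --= lost

      // Apps that did respond are promoted from WAITING to RUNNING.
      apps.filter(_.state == AppState.WAITING).foreach(_.state = AppState.RUNNING)

      state = RecoveryState.ALIVE
      scheduleCalls += 1
    }
  }
}
```

Calling `completeRecovery()` a second time is a no-op in this model, because the state is already ALIVE when the guard is checked.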
    

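    The analysis will continue in later posts. My understanding is limited, so corrections are welcome if anything here is wrong.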
