Spark Source Code Analysis, Part 1: The Master in Standalone Mode (original)

Author: 无色的叶 | Published 2018-11-01 16:43

    1. Overview

    •   The Master is the primary node in Spark's Standalone deployment mode: it manages the cluster and is responsible for resource scheduling. The class mixes in the ThreadSafeRpcEndpoint and LeaderElectable traits.
    •  ThreadSafeRpcEndpoint: a thread-safe RpcEndpoint that processes its messages serially. When the endpoint starts, onStart runs first; after that, all incoming messages are handled by receive or receiveAndReply — the former returns nothing, while the latter sends a response back to the caller.
    •  LeaderElectable: the callback interface through which leader election notifies the Master.
    •   Code snippet:
    private[deploy] class Master(  
        override val rpcEnv: RpcEnv,
        address: RpcAddress,
        webUiPort: Int,
        val securityMgr: SecurityManager,
        val conf: SparkConf)
      extends ThreadSafeRpcEndpoint with Logging with LeaderElectable {
    

    Its main fields:

    // Daemon thread used to periodically check worker liveness
      private val forwardMessageThread =
        ThreadUtils.newDaemonSingleThreadScheduledExecutor("master-forward-message-thread")
      // Registered worker nodes
      val workers = new HashSet[WorkerInfo]
      // Mapping from worker id to WorkerInfo
      private val idToWorker = new HashMap[String, WorkerInfo]
      // Current recovery state of this Master
      private var state = RecoveryState.STANDBY
      // Engine that persists state for recovery
      private var persistenceEngine: PersistenceEngine = _
      // Leader election agent (the ZooKeeper-based implementation in HA mode)
      private var leaderElectionAgent: LeaderElectionAgent = _
      // Drivers currently spooled for scheduling
      // Submitted drivers waiting to be launched
      private val waitingDrivers = new ArrayBuffer[DriverInfo]
    

    2. Master Startup Process

    • A Master is started by running the start-master.sh shell script; the launch chain is as follows:

        start-master.sh  -> spark-daemon.sh start org.apache.spark.deploy.master.Master
      
    • The main method starts the Master RPC endpoint (masterEndpoint). While the endpoint is starting, its onStart method runs first and performs the necessary initialization:

    override def onStart(): Unit = {
        // .....................
        // body omitted; the key steps are detailed below
        // .....................
     }
    

      The onStart method performs several important steps:
    1. Periodically check worker liveness

          checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(CheckForWorkerTimeOut)
          }
        }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
    

      As the snippet shows, every WORKER_TIMEOUT_MS milliseconds (default 60 * 1000, i.e. 60 seconds) the Master sends itself a CheckForWorkerTimeOut message to probe worker liveness. That message is then matched in the receive method:

    case CheckForWorkerTimeOut =>
          timeOutDeadWorkers()
    

    The actual detection and cleanup happens in timeOutDeadWorkers():

    private def timeOutDeadWorkers() {
        /** Check for, and remove, any timed-out workers */
        // Copy the workers into an array so we don't modify the hashset while iterating through it
        val currentTime = System.currentTimeMillis()
        // Select workers whose last heartbeat is older than WORKER_TIMEOUT_MS
        val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
        for (worker <- toRemove) {
          // Only act on workers not already marked DEAD
          if (worker.state != WorkerState.DEAD) {
            logWarning("Removing %s because we got no heartbeat in %d seconds".format(
              worker.id, WORKER_TIMEOUT_MS / 1000))
            // Remove the worker
            removeWorker(worker)
          } else {
            // Worker is already DEAD but still in the workers set; cull it after a grace period
            if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
              workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
            }
          }
        }
      }
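    The heart of this check is the heartbeat-age filter. The following self-contained sketch reproduces just that filter; WorkerRec and HeartbeatCheck are hypothetical stand-ins for WorkerInfo and the Master:

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's WorkerInfo.
case class WorkerRec(id: String, lastHeartbeat: Long)

object HeartbeatCheck {
  val WorkerTimeoutMs: Long = 60 * 1000L  // mirrors the 60 * 1000 ms default

  // Ids of workers whose last heartbeat is older than the timeout, relative to `now`.
  def timedOut(workers: mutable.Set[WorkerRec], now: Long): Set[String] =
    workers.filter(_.lastHeartbeat < now - WorkerTimeoutMs).map(_.id).toSet
}
```

A worker that last reported at t=0 is flagged once the clock passes 60 s, while one that reported recently is kept.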
    

    Next comes the removeWorker method:

     private def removeWorker(worker: WorkerInfo) {
        logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
        // Mark the worker as DEAD
        worker.setState(WorkerState.DEAD)
        // Drop the worker from the id -> worker and address -> worker maps
        idToWorker -= worker.id
        addressToWorker -= worker.endpoint.address
        if (reverseProxy) {
          webUi.removeProxyTargets(worker.id)
        }
        // Tell each application that its executors on this worker are lost
        for (exec <- worker.executors.values) {
          logInfo("Telling app of lost executor: " + exec.id)
          exec.application.driver.send(ExecutorUpdated(
            exec.id, ExecutorState.LOST, Some("worker lost"), None, workerLost = true))
          exec.state = ExecutorState.LOST
          exec.application.removeExecutor(exec)
        }
        // Handle drivers on this worker: relaunch supervised ones, fail the rest
        for (driver <- worker.drivers.values) {
          if (driver.desc.supervise) {
            logInfo(s"Re-launching ${driver.id}")
            // Relaunch the driver on another worker
            relaunchDriver(driver)
          } else {
            logInfo(s"Not re-launching ${driver.id} because it was not supervised")
            removeDriver(driver.id, DriverState.ERROR, None)
          }
        }
        // Remove the persisted worker info, e.g. the worker's node in ZooKeeper
        persistenceEngine.removeWorker(worker)
      }
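    The supervise decision in the loop above can be isolated into a small sketch; DriverRec and DriverCleanup are hypothetical stand-ins for DriverInfo and the Master's loop, with `supervise` mirroring the flag set at submit time:

```scala
// Hypothetical stand-in for Spark's DriverInfo.
case class DriverRec(id: String, supervise: Boolean)

object DriverCleanup {
  // Split the drivers of a lost worker into (ids to relaunch, ids to fail).
  def handleLostDrivers(drivers: Seq[DriverRec]): (Seq[String], Seq[String]) = {
    val (relaunch, fail) = drivers.partition(_.supervise)
    (relaunch.map(_.id), fail.map(_.id))
  }
}
```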
    

    2. Create the high-availability implementation selected by the spark.deploy.recoveryMode setting

        /**
          * 1. Match on the spark.deploy.recoveryMode setting and create the matching
          *    persistence engine and leader election agent.
          * 2. In ZooKeeper mode, a ZooKeeperLeaderElectionAgent is created to run master election.
          */
        val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
          case "ZOOKEEPER" =>
            logInfo("Persisting recovery state to ZooKeeper")
            val zkFactory =
              new ZooKeeperRecoveryModeFactory(conf, serializer)
            (zkFactory.createPersistenceEngine(), zkFactory.createLeaderElectionAgent(this))
          case "FILESYSTEM" =>
            val fsFactory =
              new FileSystemRecoveryModeFactory(conf, serializer)
            (fsFactory.createPersistenceEngine(), fsFactory.createLeaderElectionAgent(this))
          case "CUSTOM" =>
            val clazz = Utils.classForName(conf.get("spark.deploy.recoveryMode.factory"))
            val factory = clazz.getConstructor(classOf[SparkConf], classOf[Serializer])
              .newInstance(conf, serializer)
              .asInstanceOf[StandaloneRecoveryModeFactory]
            (factory.createPersistenceEngine(), factory.createLeaderElectionAgent(this))
          case _ =>
            (new BlackHolePersistenceEngine(), new MonarchyLeaderAgent(this))
        }
        persistenceEngine = persistenceEngine_
        leaderElectionAgent = leaderElectionAgent_
      }
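    The shape of this match can be sketched on its own. Here the factory products are replaced with plain class-name strings (a simplification; the real code returns a (PersistenceEngine, LeaderElectionAgent) pair, and the CUSTOM names below are hypothetical):

```scala
object RecoveryModeSelect {
  // Maps the spark.deploy.recoveryMode value to the pair of implementations used.
  def select(mode: String): (String, String) = mode match {
    case "ZOOKEEPER"  => ("ZooKeeperPersistenceEngine", "ZooKeeperLeaderElectionAgent")
    case "FILESYSTEM" => ("FileSystemPersistenceEngine", "MonarchyLeaderAgent")
    case "CUSTOM"     => ("UserProvidedPersistenceEngine", "UserProvidedLeaderElectionAgent")
    // Default (no HA): nothing is persisted and this node is always the leader.
    case _            => ("BlackHolePersistenceEngine", "MonarchyLeaderAgent")
  }
}
```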
    
    

    Taking ZooKeeper-based HA as an example:
       ZooKeeperRecoveryModeFactory creates a ZooKeeperPersistenceEngine, which implements the ZooKeeper-backed operations such as addWorker, removeWorker and readPersistedData.
    3. Create the LeaderElectionAgent that runs master election
    Again with ZooKeeper-based HA:
       ZooKeeperRecoveryModeFactory creates a ZooKeeperLeaderElectionAgent, whose start method is called to begin electing the master:

    // ... omitted ...
      start()
      private def start() {
        logInfo("Starting ZooKeeper LeaderElection agent")
        zk = SparkCuratorUtil.newClient(conf)
        leaderLatch = new LeaderLatch(zk, WORKING_DIR)
        leaderLatch.addListener(this)
        leaderLatch.start()
      }
    

    If this node is elected leader, isLeader() is invoked; if it loses leadership (for example because the ZooKeeper connection drops), notLeader() is invoked:

    override def isLeader() {
        synchronized {
          // could have lost leadership by now.
          if (!leaderLatch.hasLeadership) {
            return
          }
    
          logInfo("We have gained leadership")
          updateLeadershipStatus(true)
        }
      }
    
      override def notLeader() {
        synchronized {
          // could have gained leadership by now.
          if (leaderLatch.hasLeadership) {
            return
          }
    
          logInfo("We have lost leadership")
          updateLeadershipStatus(false)
        }
      }
    

    As both snippets show, each callback delegates to updateLeadershipStatus():

    private def updateLeadershipStatus(isLeader: Boolean) {
        if (isLeader && status == LeadershipStatus.NOT_LEADER) {
          status = LeadershipStatus.LEADER
          // Newly elected: notify the Master through electedLeader
          masterInstance.electedLeader()
        } else if (!isLeader && status == LeadershipStatus.LEADER) {
          status = LeadershipStatus.NOT_LEADER
          // Leadership lost: notify the Master through revokedLeadership
          masterInstance.revokedLeadership()
        }
      }
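    The transition guard above can be isolated as a tiny state machine: the callbacks fire only on a real NOT_LEADER ↔ LEADER transition, so repeated notifications from the latch are idempotent. LeadershipTracker is a hypothetical stand-in for ZooKeeperLeaderElectionAgent, with the two Master callbacks passed in as functions:

```scala
object Status extends Enumeration {
  val LEADER, NOT_LEADER = Value
}

// Fires the callbacks only when leadership actually changes hands.
class LeadershipTracker(onElected: () => Unit, onRevoked: () => Unit) {
  private var status = Status.NOT_LEADER

  def update(isLeader: Boolean): Unit = {
    if (isLeader && status == Status.NOT_LEADER) {
      status = Status.LEADER
      onElected()
    } else if (!isLeader && status == Status.LEADER) {
      status = Status.NOT_LEADER
      onRevoked()
    }
  }
}
```

Calling update(true) twice in a row triggers onElected only once, which is exactly why the real Master is not re-notified on duplicate latch events.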
    

    In the Master class, revokedLeadership() simply sends the Master itself a RevokedLeadership message:

    override def revokedLeadership() {
        self.send(RevokedLeadership)
      }
    

    Handling of RevokedLeadership is straightforward: the process exits directly:

    case RevokedLeadership =>
          logError("Leadership has been revoked -- master shutting down.")
          System.exit(0)
    

    Similarly, electedLeader() sends the Master an ElectedLeader message:

    override def electedLeader() {
        self.send(ElectedLeader)
      }
    

    The analysis continues in later posts. The author's knowledge is limited; corrections are welcome.

    Source: https://www.haomeiwen.com/subject/zufxxqtx.html