美文网首页
原创-Spark源码分析三:Standalone模式下Work启

原创-Spark源码分析三:Standalone模式下Work启

作者: 无色的叶 | 来源:发表于2018-11-02 10:27 被阅读0次

一:概述

  •   Work节点是Spark Standalone运行模式下真正执行任务的节点

二:Work启动过程

  • 启动一个Work是通过Shell命令启动了一个脚本start-slave.sh开始的,这个脚本的启动流程如下

     start-slave.sh  -> org.apache.spark.deploy.worker.Worker
    
  • 执行main方法,启动workEndpoint消息服务,在启动服务过程中首先会执行onStart方法,进行一些必要的初始化动作

override def onStart() {
    assert(!registered)
    logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
      host, port, cores, Utils.megabytesToString(memory)))
    logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
    logInfo("Spark home: " + sparkHome)
    createWorkDir()
    //启动shuffle服务
    shuffleService.startIfEnabled()
    webUi = new WorkerWebUI(this, workDir, webUiPort)
    webUi.bind()

    workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
    //向master节点进行注册
    registerWithMaster()

    metricsSystem.registerSource(workerSource)
    metricsSystem.start()
    // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
    metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
  }

其中执行 registerWithMaster()方法向master节点进行注册

private def registerWithMaster() {
    // onDisconnected may be triggered multiple times, so don't attempt registration
    // if there are outstanding registration attempts scheduled.
    registrationRetryTimer match {
      case None =>
        registered = false
        registerMasterFutures = tryRegisterAllMasters()
        connectionAttemptCount = 0
        registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
          new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              // 向自身发送注册ReregisterWithMaster消息
              Option(self).foreach(_.send(ReregisterWithMaster))
            }
          },
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          TimeUnit.SECONDS))
      case Some(_) =>
        logInfo("Not spawning another attempt to register with the master, since there is an" +
          " attempt scheduled already.")
    }
  }

向自身发送ReregisterWithMaster消息,匹配recive方法中的ReregisterWithMaster消息处理

case ReregisterWithMaster =>
      reregisterWithMaster()

接着进入reregisterWithMaster()方法

/**
    * Re-register with the master because a network failure or a master failure has occurred.
    * If the re-registration attempt threshold is exceeded, the worker exits with error.
    * Note that for thread-safety this should only be called from the rpcEndpoint.
    */
  private def reregisterWithMaster(): Unit = {
    Utils.tryOrExit {
      // 尝试注册次数,最大尝试注册次数16次
      connectionAttemptCount += 1
      if (registered) {
        // 如已注册,取消注册尝试
        cancelLastRegistrationRetry()
      } else if (connectionAttemptCount <= TOTAL_REGISTRATION_RETRIES) {
        logInfo(s"Retrying connection to master (attempt # $connectionAttemptCount)")

        /**
          * Re-register with the active master this worker has been communicating with. If there
          * is none, then it means this worker is still bootstrapping and hasn't established a
          * connection with a master yet, in which case we should re-register with all masters.
          *
          * It is important to re-register only with the active master during failures. Otherwise,
          * if the worker unconditionally attempts to re-register with all masters, the following
          * race condition may arise and cause a "duplicate worker" error detailed in SPARK-4592:
          *
          * (1) Master A fails and Worker attempts to reconnect to all masters
          * (2) Master B takes over and notifies Worker
          * (3) Worker responds by registering with Master B
          * (4) Meanwhile, Worker's previous reconnection attempt reaches Master B,
          * causing the same Worker to register with Master B twice
          *
          * Instead, if we only register with the known active master, we can assume that the
          * old master must have died because another master has taken over. Note that this is
          * still not safe if the old master recovers within this interval, but this is a much
          * less likely scenario.
          */
        master match {
          case Some(masterRef) =>
            // registered == false && master != None means we lost the connection to master, so
            // masterRef cannot be used and we need to recreate it again. Note: we must not set
            // master to None due to the above comments.
            if (registerMasterFutures != null) {
              registerMasterFutures.foreach(_.cancel(true))
            }
            val masterAddress =
              if (preferConfiguredMasterAddress) masterAddressToConnect.get else masterRef.address
            registerMasterFutures = Array(registerMasterThreadPool.submit(new Runnable {
              override def run(): Unit = {
                try {
                  logInfo("Connecting to master " + masterAddress + "...")
                  val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
                  sendRegisterMessageToMaster(masterEndpoint)
                } catch {
                  case ie: InterruptedException => // Cancelled
                  case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
                }
              }
            }))
          case None =>
            if (registerMasterFutures != null) {
              registerMasterFutures.foreach(_.cancel(true))
            }
            // We are retrying the initial registration
            registerMasterFutures = tryRegisterAllMasters()
        }
        // We have exceeded the initial registration retry threshold
        // All retries from now on should use a higher interval
        if (connectionAttemptCount == INITIAL_REGISTRATION_RETRIES) {
          registrationRetryTimer.foreach(_.cancel(true))
          registrationRetryTimer = Some(
            forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
              override def run(): Unit = Utils.tryLogNonFatalError {
                self.send(ReregisterWithMaster)
              }
            }, PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
              PROLONGED_REGISTRATION_RETRY_INTERVAL_SECONDS,
              TimeUnit.SECONDS))
        }
      } else {
        logError("All masters are unresponsive! Giving up.")
        System.exit(1)
      }
    }
  }
  • 向Master节点发送RegisterWorker注册消息,如果尝试注册次数达到INITIAL_REGISTRATION_RETRIES定义的6次,则重新向自身发送ReregisterWithMaster消息进行注册,主要是修改注册时间间隔
  • Master节点收到RegisterWorker消息进行处理
 case RegisterWorker(
    id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        // Master处于Standby状态,不作任何处理,向work节点发送MasterInStandby消息
        workerRef.send(MasterInStandby)
      } else if (idToWorker.contains(id)) {
        // work已注册过,返回注册失败消息
        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          workerRef, workerWebUiUrl)
        //
        if (registerWorker(worker)) {
          // 持久化work节点信息
          persistenceEngine.addWorker(worker)
          // 向work节点发送RegisteredWorker消息
          workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
          )
        } else {
          val workerAddress = worker.endpoint.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress))
        }
      }

Master注册work信息后,向Work节点发送RegisteredWorker消息

msg match {
      case RegisteredWorker(masterRef, masterWebUiUrl, masterAddress) =>
        if (preferConfiguredMasterAddress) {
          logInfo("Successfully registered with master " + masterAddress.toSparkURL)
        } else {
          logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
        }
        registered = true
        changeMaster(masterRef, masterWebUiUrl, masterAddress) // 修改当前master信息
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            // 向Master节点发送心跳
            self.send(SendHeartbeat)
          }
        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
        if (CLEANUP_ENABLED) {
          logInfo(
            s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
          forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              self.send(WorkDirCleanup)
            }
          }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
        }

        val execs = executors.values.map { e =>
          new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
        }
        masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))

至此整体work节点启动注册流程结束

相关文章

网友评论

      本文标题:原创-Spark源码分析三:Standalone模式下Work启

      本文链接:https://www.haomeiwen.com/subject/gorqxqtx.html