Analysis of the Master Source Code in Spark (Part 2)

Author: lehi | Published: 2016-03-22 00:34 | Read 338 times

    This post continues from the previous one:

    Analysis of the Master Source Code in Spark (Part 1): http://www.jianshu.com/p/817a7069d058

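    4. The receive method. The messages handled in receive fall into the following 12 cases:
    (1) ElectedLeader: this Master has been elected leader and recovers the persisted data
    (2) CompleteRecovery: recovery is finished; drivers are relaunched and resources are rescheduled
    (3) RevokedLeadership: leadership has been revoked and the Master shuts down
    (4) RegisterWorker: the Master registers a new Worker
    (5) RegisterApplication: the Master registers a new app, then reschedules resources
    (6) ExecutorStateChanged: an Executor's state changes, e.g. it starts running or finishes
    (7) DriverStateChanged: a Driver's state changes and the Master reacts accordingly
    (8) Heartbeat: the heartbeat mechanism through which the Master and the Workers stay in contact
    (9) MasterChangeAcknowledged: an app acknowledges the new Master during recovery
    (10) WorkerSchedulerStateResponse: a Worker reports its scheduler state during recovery
    (11) UnregisterApplication: an app that asks to be unregistered is treated as finished and removed
    (12) CheckForWorkerTimeOut: a Worker is judged dead if its heartbeats have timed out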

    The code for each of the 12 cases follows:
    (1) A new leader has been elected: recover the persisted data
    <code>
    case ElectedLeader => {
      val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData(rpcEnv)
      state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) {
        RecoveryState.ALIVE
      } else {
        RecoveryState.RECOVERING
      }
      logInfo("I have been elected leader! New state: " + state)
      if (state == RecoveryState.RECOVERING) {
        // Start recovering the persisted apps, drivers and workers
        beginRecovery(storedApps, storedDrivers, storedWorkers)
        // After WORKER_TIMEOUT_MS, the single daemon thread forwardMessageThread sends
        // CompleteRecovery to this Master itself (fire-and-forget: no reply is awaited)
        recoveryCompletionTask = forwardMessageThread.schedule(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(CompleteRecovery)
          }
        }, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
      }
    }
    </code>
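The ElectedLeader logic has two parts. First it decides between ALIVE and RECOVERING from whatever was persisted. Then it schedules a one-shot CompleteRecovery message to itself. The plain-Scala sketch below models both parts.

This is a simplified model, not Spark code. `ElectionSketch`, `stateAfterElection` and `scheduleCompletion` are hypothetical names. A JDK single-thread scheduled executor with a daemon thread stands in for Spark's `forwardMessageThread`. The delay plays the role of `WORKER_TIMEOUT_MS`, which is derived from `spark.worker.timeout` (60 s by default).

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

// Hypothetical, simplified model of the ElectedLeader branch (not Spark's classes).
object ElectionSketch {
  sealed trait RecoveryState
  case object Alive extends RecoveryState
  case object Recovering extends RecoveryState

  // Nothing persisted => ALIVE right away; otherwise the new leader must recover first.
  def stateAfterElection(apps: Seq[String], drivers: Seq[String], workers: Seq[String]): RecoveryState =
    if (apps.isEmpty && drivers.isEmpty && workers.isEmpty) Alive else Recovering

  // Runs `onComplete` once after `delayMs` on a single daemon thread,
  // standing in for forwardMessageThread.schedule(..., WORKER_TIMEOUT_MS, ...).
  def scheduleCompletion(delayMs: Long)(onComplete: () => Unit): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "recovery-completion-timer")
        t.setDaemon(true) // a daemon thread does not keep the JVM alive
        t
      }
    })
    scheduler.schedule(new Runnable {
      override def run(): Unit = onComplete() // e.g. self.send(CompleteRecovery)
    }, delayMs, TimeUnit.MILLISECONDS)
    // By default, delayed tasks that are already scheduled still run after shutdown().
    scheduler.shutdown()
  }
}
```

Because the completion is only a message sent to itself, the Master keeps handling Worker and app replies while it waits. Both (9) and (10) below can finish recovery early.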
    (2) Recovery is complete: relaunch drivers and reschedule resources
    <code>
    case CompleteRecovery => completeRecovery() // see ① below
    </code>
    ① The completeRecovery method:
    <code>
    private def completeRecovery() {
      if (state != RecoveryState.RECOVERING) { return }
      state = RecoveryState.COMPLETING_RECOVERY
      // Kill off all workers and apps that did not respond
      workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
      apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
      // Relaunch the drivers that no worker claimed
      drivers.filter(_.worker.isEmpty).foreach { d =>
        logWarning(s"Driver ${d.id} was not found after master recovery")
        if (d.desc.supervise) {
          logWarning(s"Re-launching ${d.id}")
          relaunchDriver(d) // see ② below
        } else {
          removeDriver(d.id, DriverState.ERROR, None)
          logWarning(s"Did not re-launch ${d.id} because it was not supervised")
        }
      }
      // ... (the rest of the method is not shown in this excerpt)
    }
    </code>
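The filtering in completeRecovery can be modeled as a pure function. The sketch below is hypothetical and not Spark code: `Worker`, `App`, `Driver` and `Outcome` are illustrative case classes, and a boolean `unknown` stands in for the UNKNOWN state. Like the excerpt, it drops every worker and app that never answered the new Master. Drivers with no worker are split by their `supervise` flag.

```scala
// Hypothetical, simplified model of completeRecovery(); names are illustrative only.
object CompleteRecoverySketch {
  final case class Worker(id: String, unknown: Boolean)
  final case class App(id: String, unknown: Boolean)
  final case class Driver(id: String, workerId: Option[String], supervise: Boolean)

  final case class Outcome(
      removedWorkers: Seq[String],
      finishedApps: Seq[String],
      relaunched: Seq[String],
      removedDrivers: Seq[String])

  def completeRecovery(workers: Seq[Worker], apps: Seq[App], drivers: Seq[Driver]): Outcome = {
    // Workers and apps still UNKNOWN never responded, so they are removed / finished.
    val removedWorkers = workers.filter(_.unknown).map(_.id)
    val finishedApps = apps.filter(_.unknown).map(_.id)
    // Drivers no worker claimed: relaunch the supervised ones, remove the rest.
    val (supervised, unsupervised) = drivers.filter(_.workerId.isEmpty).partition(_.supervise)
    Outcome(removedWorkers, finishedApps, supervised.map(_.id), unsupervised.map(_.id))
  }
}
```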
    ② The relaunchDriver method sets the driver's state to RELAUNCHING, adds it to the list of drivers waiting to be launched, and then reschedules resources:
    <code>
    private def relaunchDriver(driver: DriverInfo) {
      driver.worker = None
      driver.state = DriverState.RELAUNCHING
      waitingDrivers += driver
      // Reschedule resources, see ③ below
      schedule()
    }
    </code>
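    ③ The schedule method assigns the available resources to waiting drivers and apps. It is called every time a new app is submitted or the available resources (workers, etc.) change: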
    <code>
    private def schedule(): Unit = {
      if (state != RecoveryState.ALIVE) { return }
      // Drivers take precedence over executors
      // Random.shuffle returns a new, randomly ordered collection of the workers
      val shuffledWorkers = Random.shuffle(workers)
      for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
        for (driver <- waitingDrivers) {
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
            // Launch this driver on this worker, see ④ below
            launchDriver(worker, driver)
            waitingDrivers -= driver
          }
        }
      }
      // Schedule and launch executors on the workers
      startExecutorsOnWorkers()
    }
    </code>
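The driver-placement half of schedule(), together with launchDriver(), can be modeled in plain Scala. This is a simplified sketch, not Spark code, and `Worker`, `Driver` and `scheduleDrivers` are hypothetical names. Where the excerpt calls `worker.addDriver(driver)` to assign the worker's resources to the driver, the sketch subtracts the driver's memory and cores from the worker directly. As a design choice, it iterates over a snapshot (`waiting.toList`) so that removing launched drivers from the buffer cannot disturb the loop.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

// Hypothetical, simplified model of the driver half of schedule() plus launchDriver().
object ScheduleSketch {
  final class Worker(val id: String, val alive: Boolean, var memoryFree: Int, var coresFree: Int)
  final case class Driver(id: String, mem: Int, cores: Int)

  // Returns driverId -> workerId for every driver that was placed.
  def scheduleDrivers(workers: Seq[Worker], waiting: ArrayBuffer[Driver], rnd: Random): Map[String, String] = {
    var placed = Map.empty[String, String]
    // Shuffling spreads drivers over the alive workers.
    for (worker <- rnd.shuffle(workers) if worker.alive) {
      // Iterate over a snapshot so removing from `waiting` does not affect the loop.
      for (driver <- waiting.toList) {
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          // "launchDriver": charge the driver's resources to this worker and record it.
          worker.memoryFree -= driver.mem
          worker.coresFree -= driver.cores
          placed += (driver.id -> worker.id)
          waiting -= driver
        }
      }
    }
    placed
  }
}
```

Drivers that fit nowhere stay in `waiting` and are retried on the next call. The real schedule() is likewise re-run whenever apps or workers change.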
    ④ The launchDriver method launches the driver on the given worker:
    <code>
    private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
      logInfo("Launching driver " + driver.id + " on worker " + worker.id)
      // Assign the worker's resources to the driver
      worker.addDriver(driver)
      driver.worker = Some(worker)
      // Tell the worker to launch the driver
      worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
      // Set the driver's state to RUNNING
      driver.state = DriverState.RUNNING
    }
    </code>
    (3) Leadership has been revoked: the Master shuts down
    <code>
    case RevokedLeadership => {
      logError("Leadership has been revoked -- master shutting down.")
      System.exit(0)
    }
    </code>
    (4) The Master registers a new Worker, then reschedules resources
    <code>
    case RegisterWorker(
        id, workerHost, workerPort, workerRef, cores, memory, workerUiPort, publicAddress) => {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        // A STANDBY Master ignores the request and sends no reply
      } else if (idToWorker.contains(id)) {
        // Tell the worker that registration failed; the worker then exits
        workerRef.send(RegisterWorkerFailed("Duplicate worker ID"))
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          workerRef, workerUiPort, publicAddress)
        if (registerWorker(worker)) {
          // Persist the newly added worker
          persistenceEngine.addWorker(worker)
          // Reply with RegisteredWorker; after receiving it, the worker starts sending heartbeats
          workerRef.send(RegisteredWorker(self, masterWebUiUrl))
          // Reschedule resources
          schedule()
        } else {
          val workerAddress = worker.endpoint.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress))
        }
      }
    }
    </code>
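The RegisterWorker branch is essentially a decision over three conditions. The hypothetical sketch below turns it into a function that returns the reply the Master would send. The `Reply` types are illustrative and `idToWorker` is reduced to a map from worker ID to address. The sketch assumes registration fails whenever another registered worker already uses the same address. The real registerWorker() is not shown in this article and may apply more detailed rules.

```scala
// Hypothetical model of the RegisterWorker branch; reply types are illustrative.
object RegisterWorkerSketch {
  sealed trait Reply
  case object Ignored extends Reply                     // STANDBY Master: no reply at all
  final case class Failed(reason: String) extends Reply // stands in for RegisterWorkerFailed
  case object Registered extends Reply                  // RegisteredWorker, then persist + schedule()

  def handle(standby: Boolean, idToWorker: Map[String, String], id: String, address: String): Reply =
    if (standby) Ignored
    else if (idToWorker.contains(id)) Failed("Duplicate worker ID")
    // ASSUMPTION: any existing worker at the same address makes registerWorker() fail.
    else if (idToWorker.valuesIterator.contains(address))
      Failed("Attempted to re-register worker at same address: " + address)
    else Registered
}
```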
    (5) The Master registers a new app, then reschedules resources
    <code>
    case RegisterApplication(description, driver) => {
      if (state == RecoveryState.STANDBY) {
        // A STANDBY Master ignores the request and sends no reply
      } else {
        logInfo("Registering app " + description.name)
        // Build the app from its ApplicationDescription and the driver endpoint (see ① below)
        val app = createApplication(description, driver)
        // Register the app (see ② below)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        // Persist the app
        persistenceEngine.addApplication(app)
        // Reply RegisteredApplication to the AppClient behind the `driver` endpoint reference
        driver.send(RegisteredApplication(app.id, self))
        // Reschedule resources
        schedule()
      }
    }
    </code>
    ① The createApplication method builds the app from the ApplicationDescription and the driver:
    <code>
    private def createApplication(desc: ApplicationDescription, driver: RpcEndpointRef):
        ApplicationInfo = {
      val now = System.currentTimeMillis()
      val date = new Date(now)
      // Create the ApplicationInfo through its primary constructor
      new ApplicationInfo(now, newApplicationId(date), desc, date, driver, defaultCores)
    }
    </code>
    ② The registerApplication method:
    <code>
    private def registerApplication(app: ApplicationInfo): Unit = {
      val appAddress = app.driver.address
      if (addressToApp.contains(appAddress)) {
        logInfo("Attempted to re-register application at same address: " + appAddress)
        return
      }
      // Register the app's metrics source (e.g. its status, runtime and cores) with the metrics system
      applicationMetricsSystem.registerSource(app.appSource)
      apps += app
      idToApp(app.id) = app
      endpointToApp(app.driver) = app
      addressToApp(appAddress) = app
      waitingApps += app
    }
    </code>
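The bookkeeping in registerApplication has two effects. An app can register only once per driver address, and a newly registered app joins `waitingApps` until the next schedule() call. The sketch below is a hypothetical, reduced model of that bookkeeping. `Registry` and `App` are illustrative names, and only three of the excerpt's collections are kept.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of registerApplication(): one registration per driver address.
object RegisterAppSketch {
  final case class App(id: String, driverAddress: String)

  final class Registry {
    val idToApp = mutable.HashMap.empty[String, App]
    val addressToApp = mutable.HashMap.empty[String, App]
    val waitingApps = mutable.ArrayBuffer.empty[App]

    // Returns false and changes nothing if an app is already registered from this address.
    def register(app: App): Boolean =
      if (addressToApp.contains(app.driverAddress)) false
      else {
        idToApp(app.id) = app
        addressToApp(app.driverAddress) = app
        waitingApps += app // picked up by the next schedule() call
        true
      }
  }
}
```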

    (6) An Executor's state changes, e.g. it starts running or finishes
    <code>
    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
        case Some(exec) => {
          val appInfo = idToApp(appId)
          exec.state = state
          // Once the executor is RUNNING, reset the app's retry count to 0
          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
          // Send ExecutorUpdated to the AppClient
          exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus))
          // If the executor has finished, remove it from its worker and its app
          if (ExecutorState.isFinished(state)) {
            logInfo(s"Removing executor ${exec.fullId} because it is $state")
            // If the app has already finished, keep its executors so the Web UI can still show them
            if (!appInfo.isFinished) {
              appInfo.removeExecutor(exec)
            }
            exec.worker.removeExecutor(exec)
            val normalExit = exitStatus == Some(0)
            // After an abnormal exit, reschedule as long as the retry count stays below
            // MAX_NUM_RETRY (10); beyond that, remove the app if none of its executors is running
            if (!normalExit) {
              if (appInfo.incrementRetryCount() < ApplicationState.MAX_NUM_RETRY) {
                // Reschedule resources
                schedule()
              } else {
                val execs = appInfo.executors.values
                if (!execs.exists(_.state == ExecutorState.RUNNING)) {
                  logError(s"Application ${appInfo.desc.name} with ID ${appInfo.id} failed " +
                    s"${appInfo.retryCount} times; removing it")
                  removeApplication(appInfo, ApplicationState.FAILED)
                }
              }
            }
          }
        }
        case None =>
          logWarning(s"Got status update for unknown executor $appId/$execId")
      }
    }
    </code>
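The retry rule for finished executors reduces to a small decision function. The sketch below is a hypothetical model: `ExecutorRetrySketch`, `Action` and `onExecutorFinished` are illustrative names, and `anyExecutorRunning` stands in for the `execs.exists(_.state == ExecutorState.RUNNING)` check. `MaxNumRetry = 10` follows the limit stated in the comment above. The function returns the action together with the updated retry count.

```scala
// Hypothetical model of the retry decision in ExecutorStateChanged.
object ExecutorRetrySketch {
  val MaxNumRetry = 10 // ApplicationState.MAX_NUM_RETRY in the excerpt

  sealed trait Action
  case object NoAction extends Action
  case object Reschedule extends Action
  case object RemoveAppAsFailed extends Action

  // retryCount is the app's count *before* this executor's exit is counted.
  def onExecutorFinished(exitStatus: Option[Int], retryCount: Int, anyExecutorRunning: Boolean): (Action, Int) =
    if (exitStatus == Some(0)) (NoAction, retryCount) // normal exit: no retry bookkeeping
    else {
      val newCount = retryCount + 1 // incrementRetryCount()
      if (newCount < MaxNumRetry) (Reschedule, newCount)
      else if (!anyExecutorRunning) (RemoveAppAsFailed, newCount)
      else (NoAction, newCount) // limit reached, but some executor still runs
    }
}
```

Note that a single RUNNING executor resets the count, as in the excerpt. The limit therefore only removes apps whose executors keep failing before any of them reaches RUNNING.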

    (7) A Driver's state changes: terminal states remove the driver, any other state is unexpected
    <code>
    case DriverStateChanged(driverId, state, exception) => {
      state match {
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }
    }
    </code>
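The DriverStateChanged branch splits states into terminal ones, which remove the driver, and everything else, which is rejected. The hypothetical sketch below expresses that split with a sealed trait. It returns an `Either` instead of throwing so that the outcome is easy to inspect. Only states that appear in this article are modeled, and `Errored` stands for `DriverState.ERROR`.

```scala
// Hypothetical model of DriverStateChanged handling; only states named in the article are modeled.
object DriverStateSketch {
  sealed trait DriverState
  case object Running extends DriverState
  case object Relaunching extends DriverState
  case object Finished extends DriverState
  case object Killed extends DriverState
  case object Failed extends DriverState
  case object Errored extends DriverState // DriverState.ERROR

  private val terminal: Set[DriverState] = Set(Errored, Finished, Killed, Failed)

  // Right: the driver is removed. Left: the update is unexpected (the excerpt throws here).
  def handle(driverId: String, state: DriverState): Either[String, String] =
    if (terminal.contains(state)) Right(s"remove $driverId ($state)")
    else Left(s"Received unexpected state update for driver $driverId: $state")
}
```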

    (8) Heartbeat: the mechanism through which the Master and the Workers stay in contact
    <code>
    case Heartbeat(workerId, worker) => {
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          // Update the worker's last heartbeat time
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          if (workers.map(_.id).contains(workerId)) {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " Asking it to re-register.")
            worker.send(ReconnectWorker(masterUrl))
          } else {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " This worker was never registered, so ignoring the heartbeat.")
          }
      }
    }
    </code>
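A heartbeat has three possible outcomes, which can be modeled as follows. In this hypothetical sketch, `idToWorker` is reduced to a mutable map from worker ID to last-heartbeat time, and `allWorkerIds` stands in for `workers.map(_.id)`. The sketch makes one case explicit: a worker that is still listed in `workers` but missing from `idToWorker` is asked to reconnect rather than ignored.

```scala
import scala.collection.mutable

// Hypothetical model of the Heartbeat branch; returns what the Master would do.
object HeartbeatSketch {
  sealed trait Outcome
  case object Updated extends Outcome        // lastHeartbeat refreshed
  case object AskToReconnect extends Outcome // ReconnectWorker(masterUrl) sent
  case object Ignored extends Outcome        // never-registered worker

  def onHeartbeat(
      workerId: String,
      now: Long,
      idToWorker: mutable.Map[String, Long], // worker id -> lastHeartbeat
      allWorkerIds: Set[String]): Outcome =
    idToWorker.get(workerId) match {
      case Some(_) =>
        idToWorker(workerId) = now
        Updated
      case None if allWorkerIds.contains(workerId) => AskToReconnect
      case None => Ignored
    }
}
```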
    (9) An app acknowledges the Master change during recovery: the Master marks it WAITING
    <code>
    case MasterChangeAcknowledged(appId) => {
      idToApp.get(appId) match {
        case Some(app) =>
          logInfo("Application has been re-registered: " + appId)
          app.state = ApplicationState.WAITING
        case None =>
          logWarning("Master change ack from unknown app: " + appId)
      }
      if (canCompleteRecovery) { completeRecovery() }
    }
    </code>
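This handler lets recovery finish early instead of waiting for the scheduled CompleteRecovery. The article does not show `canCompleteRecovery`. The sketch below therefore **assumes** that recovery may complete once no worker and no app is still UNKNOWN, which matches what completeRecovery() filters on in (2). `AckSketch`, `Recovery` and the `completed` flag are hypothetical names, and `completed = true` stands in for calling completeRecovery().

```scala
import scala.collection.mutable

// Hypothetical model of MasterChangeAcknowledged; canCompleteRecovery is an ASSUMED definition.
object AckSketch {
  sealed trait AppState
  case object Unknown extends AppState
  case object Waiting extends AppState

  final class Recovery(appIds: Seq[String], var unknownWorkers: Int) {
    val appStates: mutable.Map[String, AppState] = mutable.Map.empty[String, AppState]
    appIds.foreach(id => appStates(id) = Unknown) // all recovered apps start as UNKNOWN
    var completed = false

    // ASSUMPTION: recovery can complete once no worker or app is still UNKNOWN.
    def canCompleteRecovery: Boolean =
      unknownWorkers == 0 && !appStates.valuesIterator.contains(Unknown)

    def onMasterChangeAcknowledged(appId: String): Unit = {
      if (appStates.contains(appId)) appStates(appId) = Waiting // re-registered app waits for executors
      if (canCompleteRecovery) completed = true                // stands in for completeRecovery()
    }
  }
}
```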

    (10) A Worker reports its scheduler state during recovery: the Master re-attaches its executors and drivers
    <code>
    case WorkerSchedulerStateResponse(workerId, executors, driverIds) => {
      idToWorker.get(workerId) match {
        case Some(worker) =>
          logInfo("Worker has been re-registered: " + workerId)
          worker.state = WorkerState.ALIVE
          val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
          for (exec <- validExecutors) {
            val app = idToApp.get(exec.appId).get
            val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
            worker.addExecutor(execInfo)
            execInfo.copyState(exec)
          }
          for (driverId <- driverIds) {
            drivers.find(_.id == driverId).foreach { driver =>
              driver.worker = Some(worker)
              driver.state = DriverState.RUNNING
              worker.drivers(driverId) = driver
            }
          }
        case None =>
          logWarning("Scheduler state from unknown worker: " + workerId)
      }
      if (canCompleteRecovery) { completeRecovery() }
    }
    </code>
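The filtering in this handler can be modeled as a pure function. It keeps only executors whose app the Master still knows, and it re-attaches only drivers the Master already tracks. The sketch below is hypothetical: `ExecReport`, `Reattached` and `reattach` are illustrative names, and `coresUsed` roughly mirrors the cores that `worker.addExecutor` would account for.

```scala
// Hypothetical model of WorkerSchedulerStateResponse: re-attach only what the Master still knows.
object WorkerStateSketch {
  final case class ExecReport(appId: String, execId: Int, cores: Int)
  final case class Reattached(executors: Seq[ExecReport], driverIds: Seq[String], coresUsed: Int)

  def reattach(
      reported: Seq[ExecReport],
      driverIds: Seq[String],
      knownApps: Set[String],
      knownDrivers: Set[String]): Reattached = {
    // Executors of apps that did not survive recovery are dropped (like `validExecutors`).
    val valid = reported.filter(e => knownApps.contains(e.appId))
    // Only drivers found in `drivers` are marked RUNNING on this worker.
    val drivers = driverIds.filter(knownDrivers.contains)
    Reattached(valid, drivers, valid.map(_.cores).sum)
  }
}
```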
    (11) An app that asks to be unregistered is treated as finished and removed
    <code>
    case UnregisterApplication(applicationId) =>
      logInfo(s"Received unregister request from application $applicationId")
      idToApp.get(applicationId).foreach(finishApplication)
    </code>
    (12) Decide whether a worker is dead by checking whether it has timed out
    <code>
    case CheckForWorkerTimeOut => {
      // Remove dead workers: a worker whose last heartbeat is older than
      // (current time - worker timeout, 1 min by default) is judged dead and removed
      timeOutDeadWorkers()
    }
    </code>
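The timeout test itself is a single comparison: a worker is dead when `lastHeartbeat < now - timeout`. The hypothetical sketch below isolates that comparison. `WorkerTimeoutSketch` and `deadWorkers` are illustrative names, and the 60 000 ms default corresponds to the 1-minute timeout mentioned above. The comparison is strict, so a worker whose last heartbeat is exactly one timeout old still counts as alive.

```scala
// Hypothetical model of the check behind CheckForWorkerTimeOut / timeOutDeadWorkers().
object WorkerTimeoutSketch {
  // Assumed default: 1 minute, as stated in the comment above.
  val WorkerTimeoutMs: Long = 60 * 1000L

  final case class Worker(id: String, lastHeartbeat: Long)

  // IDs of workers whose last heartbeat is older than (now - timeout).
  def deadWorkers(workers: Seq[Worker], now: Long, timeoutMs: Long = WorkerTimeoutMs): Seq[String] =
    workers.filter(_.lastHeartbeat < now - timeoutMs).map(_.id)
}
```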

        Article link: https://www.haomeiwen.com/subject/rqwklttx.html