Spark Worker Source Code Analysis (Part 2)


Author: lehi | Published 2016-03-20 22:52

    This post continues from the previous one:

    Spark Worker Source Code Analysis (Part 1): http://www.jianshu.com/p/015b3ff0a5bf

    4. The receive method
    The messages handled in receive fall into the following 14 cases:

    (1) The worker has successfully registered with the master; see the code below
    (2) The worker sends a heartbeat message to the master; if it has not yet registered with the master, the message is ignored; see the code below
    (3) Cleanup of the worker's work directory; see the code below
    (4) The master has changed; see the code below
    (5) Worker registration failed; see the code below
    (6) Reconnect the worker; see the code below
    (7) Launch an executor; see the code below
    (8) An executor's state has changed; see the code below
    (9) Kill an executor; see the code below
    (10) Launch a driver; see the code below
    (11) Kill a driver; see the code below
    (12) A driver's state has changed; see the code below
    (13) Re-register the worker with the master; see the code below
    (14) An application has finished; see the code below
    Interactions between the worker and the master: (1)(2)(4)(6)(13)
    Interactions between the worker and drivers: (10)(11)(12)
    Interactions between the worker and executors: (3)(7)(8)(9)(14); note that the completion of (3) and (14) is also closely tied to the executors. A minimal sketch of the dispatch pattern behind receive is shown below, followed by the full source of the method.
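
    Before walking through the source, here is a minimal, self-contained sketch of the PartialFunction-based dispatch pattern that receive relies on. The message types in this sketch are simplified stand-ins invented for illustration, not Spark's actual case classes.

    <code>
    // Minimal sketch of a PartialFunction[Any, Unit] dispatcher, mirroring the shape
    // of Worker.receive. The message types below are hypothetical stand-ins.
    object ReceiveSketch {
      sealed trait Message
      case object SendHeartbeat extends Message
      case class ApplicationFinished(id: String) extends Message

      val receive: PartialFunction[Any, Unit] = {
        case SendHeartbeat =>
          println("worker -> master: heartbeat")
        case ApplicationFinished(id) =>
          println(s"cleaning up data of finished application $id")
      }

      def main(args: Array[String]): Unit = {
        receive(SendHeartbeat)                            // prints the heartbeat line
        receive(ApplicationFinished("app-20160320-0001")) // prints the cleanup line
      }
    }
    </code>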

    <code>
    override def receive: PartialFunction[Any, Unit] = {
    //(1) The worker has successfully registered with the master
    case RegisteredWorker(masterRef, masterWebUiUrl) =>
    logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
    registered = true
    changeMaster(masterRef, masterWebUiUrl)
    // A daemon thread sends a heartbeat message every 15 seconds
    forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
    self.send(SendHeartbeat)
    }
    }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
    // If cleanup is enabled
    if (CLEANUP_ENABLED) {
    logInfo(s"Worker cleanup enabled; old application directories will be deleted in: $workDir")
    forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
    // A daemon thread cleans up old application directories every 30 minutes
    self.send(WorkDirCleanup)
    }
    }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
    }
    //(2) The worker sends a heartbeat to the master; if it is not yet connected to a master, the message is ignored
    case SendHeartbeat =>
    if (connected) { sendToMaster(Heartbeat(workerId, self)) }
    //(3) Cleanup of the worker's work directory
    case WorkDirCleanup =>
    // Spin up a separate future so the cleanup does not tie up the worker's rpcEndpoint thread;
    // copy the ids so they can be used by the cleanup thread
    val appIds = executors.values.map(_.appId).toSet
    val cleanupFuture = concurrent.future {
    val appDirs = workDir.listFiles()
    if (appDirs == null) {
    throw new IOException("ERROR: Failed to list files in " + appDirs)
    }
    appDirs.filter { dir =>
    // The directory is in use by an application - check whether the app is still running when cleaning up
    val appIdFromDir = dir.getName
    val isAppStillRunning = appIds.contains(appIdFromDir)
    dir.isDirectory && !isAppStillRunning &&
    !Utils.doesDirectoryContainAnyNewFiles(dir, APP_DATA_RETENTION_SECONDS)
    }.foreach { dir =>
    logInfo(s"Removing directory: ${dir.getPath}")
    Utils.deleteRecursively(dir)
    }
    }(cleanupThreadExecutor)
    cleanupFuture.onFailure {
    case e: Throwable =>
    logError("App dir cleanup failed: " + e.getMessage, e)
    }(cleanupThreadExecutor)
    //(4) The master has changed
    case MasterChanged(masterRef, masterWebUiUrl) =>
    logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL)
    changeMaster(masterRef, masterWebUiUrl)
    val execs = executors.values.
    map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
    masterRef.send(WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq))
    //(5) Worker registration failed
    case RegisterWorkerFailed(message) =>
    if (!registered) {
    logError("Worker registration failed: " + message)
    System.exit(1)
    }
    //(6) Reconnect the worker
    case ReconnectWorker(masterUrl) =>
    logInfo(s"Master with url $masterUrl requested this worker to reconnect.")
    // Re-register this worker with the masters
    registerWithMaster()
    //(7) Launch an executor
    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
    if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
    } else {
    try {
    logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
    // Create the executor's working directory (<workDir>/<appId>/<execId>)
    val executorDir = new File(workDir, appId + "/" + execId)
    if (!executorDir.mkdirs()) {
    throw new IOException("Failed to create directory " + executorDir)
    }
    // Create local directories for the executor; they are passed to it via the SPARK_EXECUTOR_DIRS environment variable and deleted by the worker when the application finishes
    val appLocalDirs = appDirectories.get(appId).getOrElse {
    Utils.getOrCreateLocalRootDirs(conf).map { dir =>
    val appDir = Utils.createDirectory(dir, namePrefix = "executor")
    Utils.chmod700(appDir)
    appDir.getAbsolutePath()
    }.toSeq
    }
    appDirectories(appId) = appLocalDirs
    val manager = new ExecutorRunner(
    appId,
    execId,
    appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
    cores_,
    memory_,
    self,
    workerId,
    host,
    webUi.boundPort,
    publicAddress,
    sparkHome,
    executorDir,
    workerUri,
    conf,
    appLocalDirs, ExecutorState.LOADING)
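    // Track the new ExecutorRunner under the key "appId/execId"; the same fullId key is used by the KillExecutor case below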
    executors(appId + "/" + execId) = manager
    manager.start()
    coresUsed += cores_
    memoryUsed += memory_
    sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
    case e: Exception => {
    logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
    if (executors.contains(appId + "/" + execId)) {
    executors(appId + "/" + execId).kill()
    executors -= appId + "/" + execId
    }
    sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
    Some(e.toString), None))
    }
    }
    }
    //(8) An executor's state has changed
    case executorStateChanged @ ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
    handleExecutorStateChanged(executorStateChanged)
    //(9) Kill an executor
    case KillExecutor(masterUrl, appId, execId) =>
    if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to kill executor " + execId)
    } else {
    val fullId = appId + "/" + execId
    executors.get(fullId) match {
    case Some(executor) =>
    logInfo("Asked to kill executor " + fullId)
    executor.kill()
    case None =>
    logInfo("Asked to kill unknown executor " + fullId)
    }
    }
    //(10) Launch a driver
    case LaunchDriver(driverId, driverDesc) => {
    logInfo(s"Asked to launch driver $driverId")
    val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
    drivers(driverId) = driver
    driver.start()
    coresUsed += driverDesc.cores
    memoryUsed += driverDesc.mem
    }
    //(11) Kill a driver
    case KillDriver(driverId) => {
    logInfo(s"Asked to kill driver $driverId")
    drivers.get(driverId) match {
    case Some(runner) =>
    runner.kill()
    case None =>
    logError(s"Asked to kill unknown driver $driverId")
    }
    }
    //(12) A driver's state has changed
    case driverStateChanged @ DriverStateChanged(driverId, state, exception) => {
    handleDriverStateChanged(driverStateChanged)
    }
    //(13) Re-register the worker with the master
    case ReregisterWithMaster =>
    reregisterWithMaster()
    //(14) An application has finished
    case ApplicationFinished(id) =>
    finishedApps += id
    // Delete the local files created by the application while it was running
    maybeCleanupApplication(id)
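    // (maybeCleanupApplication removes the app's local directories only once the app is in finishedApps and no executor for it is still running on this worker)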
    }
    </code>
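
    A note on the 15-second heartbeat interval in case (1): the worker derives it from the master timeout, so that roughly four heartbeats fit into one timeout window. The sketch below illustrates the calculation, assuming the usual spark.worker.timeout setting (in seconds, default 60); it is an illustration of the arithmetic, not a copy of the Worker's own code.

    <code>
    // Sketch: how a 15-second heartbeat interval falls out of the worker timeout.
    // Assumes the spark.worker.timeout key (seconds, default 60); illustration only.
    import org.apache.spark.SparkConf

    object HeartbeatIntervalSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        // Send a heartbeat four times per timeout window: 60 s / 4 = 15 s.
        val heartbeatMillis = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
        println(s"heartbeat interval = $heartbeatMillis ms") // 15000 ms by default
      }
    }
    </code>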
