This article is based on the Spark 1.6.3 source code and digs into it step by step. I wrote it to record my train of thought while reading the code: after a single pass it is very easy to forget what you read, and writing it up helps the material stick.
In the first article of this executor-module series, the AppClient was instantiated and registered a communication endpoint named ClientEndpoint; its onStart method was invoked, which in turn called registerWithMaster to register the application with the Master. This article looks in detail at how the app is registered, and at how executors and their resources are allocated and launched once registration completes.
Registering the app with the Master
registerWithMaster(1) is called with the integer 1, indicating that this is the first registration attempt. The client keeps retrying the registration periodically until the Master replies that registration succeeded, or until the maximum number of attempts is exhausted and the client gives up. Here is the implementation:
/**
* Register with all masters asynchronously. It will call `registerWithMaster` every
* REGISTRATION_TIMEOUT_SECONDS seconds until exceeding REGISTRATION_RETRIES times.
* Once we connect to a master successfully, all scheduling work and Futures will be cancelled.
*
* nthRetry means this is the nth attempt to register with master.
*
*/
private def registerWithMaster(nthRetry: Int) {
registerMasterFutures.set(tryRegisterAllMasters())
registrationRetryTimer.set(registrationRetryThread.scheduleAtFixedRate(new Runnable {
override def run(): Unit = {
if (registered.get) {
registerMasterFutures.get.foreach(_.cancel(true))
registerMasterThreadPool.shutdownNow()
} else if (nthRetry >= REGISTRATION_RETRIES) {
markDead("All masters are unresponsive! Giving up.")
} else {
registerMasterFutures.get.foreach(_.cancel(true))
registerWithMaster(nthRetry + 1)
}
}
}, REGISTRATION_TIMEOUT_SECONDS, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
Next, let's look at tryRegisterAllMasters, the method that fires off the asynchronous requests. It returns an array of Futures, so it does not block: whichever attempt gets a registration response first wins, and the remaining Future handles are cancelled. For each address in masterRpcAddresses, the method sets up an RpcEndpointRef to that Master and sends it a RegisterApplication message wrapping the ApplicationDescription. Here is the source:
/**
* Register with all masters asynchronously and returns an array `Future`s for cancellation.
*/
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
for (masterAddress <- masterRpcAddresses) yield {
registerMasterThreadPool.submit(new Runnable {
override def run(): Unit = try {
if (registered.get) {
return
}
logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
val masterRef =
rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
masterRef.send(RegisterApplication(appDescription, self)) // send a RegisterApplication request to the Master
} catch {
case ie: InterruptedException => // Cancelled
case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
}
})
}
}
The Master's response
When the Master receives the RegisterApplication request from the AppClient, it first checks its state: a standby master simply ignores the request. An active master wraps the incoming app information in an ApplicationInfo instance, registers the app, persists it, replies to the AppClient so that it stops retrying, and finally schedules resources for the newly registered app. Here is the source:
case RegisterApplication(description, driver) => { // the Master receives the app-registration request from the AppClient
// TODO Prevent repeated registrations from some driver
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
val app = createApplication(description, driver) // create the ApplicationInfo
registerApplication(app) // register the app
logInfo("Registered app " + description.name + " with ID " + app.id)
persistenceEngine.addApplication(app) // persist the app metadata
driver.send(RegisteredApplication(app.id, self)) // reply to the AppClient that registration succeeded
schedule() // reschedule resources: called whenever a new app joins or new resources become available
}
}
Next, let's look at what the Master does while registering the app, via the registerApplication method:
private def registerApplication(app: ApplicationInfo): Unit = {
val appAddress = app.driver.address
if (addressToApp.contains(appAddress)) { // an app from this ClientEndpoint address has already been registered
logInfo("Attempted to re-register application at same address: " + appAddress)
return
}
// the Master adds the app to the data structures it maintains
applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
endpointToApp(app.driver) = app
addressToApp(appAddress) = app
waitingApps += app
}
So on the Master side, registering an app boils down to recording it in the Master's own member variables.
Next, the Master persists the registered app's metadata. Two persistence engines ship with Spark: one backed by ZooKeeper and one that writes directly to a file system; users can also plug in a custom implementation. In production the state is usually handed over to ZooKeeper.
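For reference, the real engines in the source are ZooKeeperPersistenceEngine, FileSystemPersistenceEngine and BlackHolePersistenceEngine (the no-op used when no recovery mode is configured), chosen by the Master according to the spark.deploy.recoveryMode setting. Below is a minimal, self-contained sketch of that pluggable-persistence idea; the trait and class names are simplified stand-ins of my own, not the actual Spark classes:
trait PersistenceEngine {
  def addApplication(appId: String): Unit
}
class ZooKeeperPersistence(zkUrl: String) extends PersistenceEngine {
  // a real implementation would serialize the ApplicationInfo into a znode
  def addApplication(appId: String): Unit = println(s"persisting $appId to ZooKeeper at $zkUrl")
}
class FileSystemPersistence(dir: String) extends PersistenceEngine {
  // a real implementation would serialize the ApplicationInfo into a file under dir
  def addApplication(appId: String): Unit = println(s"persisting $appId under $dir")
}
object BlackHolePersistence extends PersistenceEngine {
  // used when no recovery mode is configured: nothing is persisted
  def addApplication(appId: String): Unit = ()
}
object PersistenceDemo extends App {
  // mimics how the Master picks an engine from its configuration
  def select(recoveryMode: String): PersistenceEngine = recoveryMode match {
    case "ZOOKEEPER"  => new ZooKeeperPersistence("zk-host:2181")
    case "FILESYSTEM" => new FileSystemPersistence("/tmp/spark-recovery")
    case _            => BlackHolePersistence
  }
  select("ZOOKEEPER").addApplication("app-20190101000000-0001")
}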
After persisting, the Master sends a RegisteredApplication response to the AppClient to confirm that the app is registered. On receiving it, the AppClient updates its own bookkeeping and cancels all outstanding registration attempts. Here is the source:
case RegisteredApplication(appId_, masterRef) =>
// FIXME How to handle the following cases?
// 1. A master receives multiple registrations and sends back multiple
// RegisteredApplications due to an unstable network.
// 2. Receive multiple RegisteredApplication from different masters because the master is
// changing.
appId.set(appId_)
registered.set(true)
master = Some(masterRef)
listener.connected(appId.get)
The last thing the Master does for a newly registered app is to reschedule resources. Besides a new app joining, any change in the available resources, such as a new Worker registering, also triggers a call to schedule. Here is the annotated source:
/**
* Schedule the currently available resources among waiting apps. This method will be called
* every time a new app joins or resource availability changes.
*/
private def schedule(): Unit = {
if (state != RecoveryState.ALIVE) {
return // only an ALIVE Master may schedule, i.e. the active Master that has completed recovery
}
// Drivers take strict precedence over executors
// pick out the ALIVE workers and shuffle them
val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
val numWorkersAlive = shuffledAliveWorkers.size
var curPos = 0
for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
// We assign workers to each waiting driver in a round-robin fashion. For each driver, we
// start from the last worker that was assigned a driver, and continue onwards until we have
// explored all alive workers.
var launched = false
var numWorkersVisited = 0
while (numWorkersVisited < numWorkersAlive && !launched) {
val worker = shuffledAliveWorkers(curPos)
numWorkersVisited += 1 // count the workers visited so far
// if this worker's free memory and free cores satisfy the driver's requirements
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
launchDriver(worker, driver) // launch the driver on this worker
waitingDrivers -= driver // remove it from the waiting queue
launched = true // flip the flag to exit the loop
}
curPos = (curPos + 1) % numWorkersAlive // advance the pointer so drivers are spread over the shuffled workers
}
}
startExecutorsOnWorkers() // now start launching executors on the workers
}
As you can see, this method first filters out suitable workers, launches the driver on one of them via launchDriver, and finally calls startExecutorsOnWorkers to allocate executors. At last we reach the main topic, allocating resources for and launching the driver and the executors; reading source code really does take patience.
Resource allocation for the driver and executors
Let's start with the launchDriver method:
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
logInfo("Launching driver " + driver.id + " on worker " + worker.id)
worker.addDriver(driver)
driver.worker = Some(worker) // record the worker-driver association
worker.endpoint.send(LaunchDriver(driver.id, driver.desc)) // send a LaunchDriver request to the worker
driver.state = DriverState.RUNNING // mark the driver as RUNNING
}
launchDriver records the association between the worker and the driver and sends a LaunchDriver request to the worker. Let's switch to the Worker side and see what it does when it receives that request:
case LaunchDriver(driverId, driverDesc) => {
logInfo(s"Asked to launch driver $driverId")
val driver = new DriverRunner( // create a DriverRunner
conf,
driverId,
workDir,
sparkHome,
driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
self,
workerUri,
securityMgr)
drivers(driverId) = driver
driver.start() // call start to launch the driver
coresUsed += driverDesc.cores // account for the cpu and memory the driver consumes
memoryUsed += driverDesc.mem
}
As the code shows, the Worker creates a DriverRunner and calls its start method to launch a thread. Inside, the user jar is downloaded, the driver command is built and run as a separate process, and afterwards a DriverStateChanged message is sent back to the Worker.
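DriverRunner (and, as we will see shortly, ExecutorRunner) follows a simple "runner thread" pattern: spawn a thread, launch an external process, wait for it to exit, then report the outcome. Here is a stripped-down, self-contained sketch of that pattern; the names RunnerSketch, StateChanged and runInThread are mine for illustration, not Spark's:
import java.util.concurrent.CountDownLatch
import scala.collection.JavaConverters._

object RunnerSketch {
  // stand-in for the DriverStateChanged / ExecutorStateChanged messages sent back to the Worker
  final case class StateChanged(id: String, exitCode: Int)

  def runInThread(id: String, command: Seq[String])(report: StateChanged => Unit): Thread = {
    val t = new Thread(s"RunnerSketch for $id") {
      override def run(): Unit = {
        val process = new ProcessBuilder(command.asJava).inheritIO().start()
        val exitCode = process.waitFor()   // block until the child process exits
        report(StateChanged(id, exitCode)) // report the final state, as DriverRunner reports to the Worker
      }
    }
    t.start()
    t
  }

  def main(args: Array[String]): Unit = {
    val done = new CountDownLatch(1)
    runInThread("driver-0", Seq("echo", "hello from the driver process")) { msg =>
      println(s"state changed: $msg")
      done.countDown()
    }
    done.await()
  }
}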
Next, let's open up startExecutorsOnWorkers, which schedules and launches executors on the workers:
/**
* Schedule and launch executors on workers
*/
private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
// apps are scheduled from a simple FIFO queue
for (app <- waitingApps if app.coresLeft > 0) { // for each app in the queue that still needs cores
val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
// Filter out workers that don't have enough resources to launch an executor
// usable workers: state is ALIVE, and free memory and cores satisfy the app's per-executor requirements
val usableWorkers: Array[WorkerInfo] = workers.toArray.filter(_.state == WorkerState.ALIVE)
.filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
worker.coresFree >= coresPerExecutor.getOrElse(1))
.sortBy(_.coresFree).reverse
// assignedCores(i) records how many cores can be allocated on usableWorkers(i)
val assignedCores: Array[Int] = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)
// Now that we've decided how many cores to allocate on each worker, let's allocate them
// now hand the decided cores on each worker over to executors
for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
allocateWorkerResourceToExecutors(
app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
}
}
}
This method takes apps off a simple FIFO queue; each app carries the memory and cores it requires per executor. It then filters out the usable workers: a worker qualifies if it is ALIVE and its free memory and free cores are enough for one executor of this app. The qualifying workers are sorted by free cores in descending order.
Next, scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) decides how to distribute the app's executors over the filtered workers.
The spreadOutApps parameter deserves attention: it switches between two allocation strategies and is controlled by the spark.deploy.spreadOut setting, which defaults to true:
- when spreadOutApps is true, executors are spread across as many workers as possible; this is the default behaviour;
- when spreadOutApps is false, executors are packed onto as few workers as possible, which suits CPU-intensive applications with modest memory needs.
Let's step into scheduleExecutorsOnWorkers and look at its implementation:
private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
val coresPerExecutor = app.desc.coresPerExecutor // the app's requested cores per executor, if any
val minCoresPerExecutor = coresPerExecutor.getOrElse(1) // minimum cores per executor; defaults to 1 when unspecified
val oneExecutorPerWorker: Boolean = coresPerExecutor.isEmpty // if coresPerExecutor is empty, oneExecutorPerWorker is true
val memoryPerExecutor = app.desc.memoryPerExecutorMB // the app's requested memory per executor
val numUsable = usableWorkers.length // number of usable workers
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) // cores to hand out: the app's remaining demand, capped by the total free cores of the usable workers
/** Return whether the specified worker can launch an executor for this app. */
def canLaunchExecutor(pos: Int): Boolean = {
// condition 1: the cores still to be assigned are at least the per-executor requirement (at least 1 core)
val keepScheduling = coresToAssign >= minCoresPerExecutor
// condition 2: this worker's remaining (not yet assigned) cores are at least minCoresPerExecutor
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
// If we allow multiple executors per worker, then we can always launch new executors.
// Otherwise, if there is already an executor on this worker, just give it more cores.
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
// launching a new executor: either the user set coresPerExecutor explicitly, or no executor has been assigned on this worker yet
if (launchingNewExecutor) {
val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
// condition 3: this worker's remaining memory is at least the per-executor memory requirement
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
// condition 4: the executors assigned in this round plus the app's existing executors stay below the app's executor limit
val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
// to launch a new executor, conditions 1-4 must all hold
keepScheduling && enoughCores && enoughMemory && underLimit
} else { // otherwise we are only adding cores to an existing executor
// We're adding cores to an existing executor, so no need
// to check memory and executor limits
keepScheduling && enoughCores // only conditions 1 and 2 need to hold
}
}
// Keep launching executors until no more workers can accommodate any
// more executors, or if we have reached this application's limits
var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
while (freeWorkers.nonEmpty) {
freeWorkers.foreach { pos =>
var keepScheduling = true
while (keepScheduling && canLaunchExecutor(pos)) {
coresToAssign -= minCoresPerExecutor
assignedCores(pos) += minCoresPerExecutor
// If we are launching one executor per worker, then every iteration assigns 1 core
// to the executor. Otherwise, every iteration assigns cores to a new executor.
if (oneExecutorPerWorker) {
assignedExecutors(pos) = 1
} else {
assignedExecutors(pos) += 1
}
// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
// scheduling executors on this worker until we use all of its resources.
// Otherwise, just move on to the next worker.
if (spreadOutApps) {
keepScheduling = false // leave this inner loop and move on to the next worker
}
}
}
freeWorkers = freeWorkers.filter(canLaunchExecutor)
}
assignedCores
}
That is a long stretch of code, and the inline comments cover most of it, so here I will just restate the conditions under which a worker can keep receiving executors:
Condition 1 (keepScheduling): the number of cores still to assign (the app's remaining demand, capped by the total free cores of the usable workers) is at least the per-executor core requirement;
Condition 2 (enoughCores): the worker under consideration still has at least the per-executor core requirement free;
Condition 3 (enoughMemory): the worker under consideration still has at least the per-executor memory requirement free;
Condition 4 (underLimit): the executors assigned in this round plus the app's already-running executors stay below the app's executor limit; at initial scheduling the app effectively has no limit, but the limit can change dynamically later (for example when executors are requested or killed at runtime).
All four conditions must hold to launch a new executor; when only adding cores to an existing executor, conditions 1 and 2 are enough.
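To get a feel for how spreadOutApps changes the outcome, here is a small, self-contained re-implementation of the core loop above, simplified to cores only (no memory or executorLimit checks), run against a made-up cluster of three workers with 8 free cores each and an app that still needs 12 cores with coresPerExecutor = 2. It is a sketch for intuition, not the actual Master code:
object ScheduleSketch {
  // simplified scheduleExecutorsOnWorkers: only the core-counting logic
  def assign(coresFree: Array[Int], coresNeeded: Int, coresPerExecutor: Int,
             spreadOut: Boolean): Array[Int] = {
    val assigned = Array.fill(coresFree.length)(0)
    var toAssign = math.min(coresNeeded, coresFree.sum)

    def canLaunch(pos: Int): Boolean =
      toAssign >= coresPerExecutor && coresFree(pos) - assigned(pos) >= coresPerExecutor

    var free = coresFree.indices.filter(canLaunch)
    while (free.nonEmpty) {
      free.foreach { pos =>
        var keepScheduling = true
        while (keepScheduling && canLaunch(pos)) {
          toAssign -= coresPerExecutor
          assigned(pos) += coresPerExecutor
          if (spreadOut) keepScheduling = false // give this worker one executor's worth, then move on
        }
      }
      free = free.filter(canLaunch)
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    val workers = Array(8, 8, 8) // free cores on three hypothetical workers
    println(assign(workers, 12, 2, spreadOut = true).mkString(","))  // spread out: 4,4,4
    println(assign(workers, 12, 2, spreadOut = false).mkString(",")) // packed:     8,4,0
  }
}
With spreadOut the 12 cores end up as 4,4,4 across the three workers; without it they are packed as 8,4,0 onto as few workers as possible.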
Launching the executors
Once the resources have been worked out, the executors are launched. allocateWorkerResourceToExecutors bundles each executor's share of the worker's resources into an ExecutorDesc and calls launchExecutor to launch it. Here is the source:
private def allocateWorkerResourceToExecutors(
app: ApplicationInfo,
assignedCores: Int,
coresPerExecutor: Option[Int],
worker: WorkerInfo): Unit = {
// If the number of cores per executor is specified, we divide the cores assigned
// to this worker evenly among the executors with no remainder.
// Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
for (i <- 1 to numExecutors) {
val exec: ExecutorDesc = app.addExecutor(worker, coresToAssign)
// the Master calls launchExecutor to send the request to the worker and updates its own record of that worker's resources;
// this accounting is maintained proactively by the Master rather than reported back by the Worker, and the Master does not
// wait for the executor to start successfully before updating it; if the launch fails, the Worker sends a FAILED message to the Master
launchExecutor(worker, exec)
app.state = ApplicationState.RUNNING
}
}
In launchExecutor, the Master sends a LaunchExecutor message to the Worker, and the Worker then launches executors according to the Master's allocation:
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec) // update the Master's record of this worker
worker.endpoint.send(LaunchExecutor(masterUrl, // send the LaunchExecutor request; the Worker starts launching the executor on receipt
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
exec.application.driver.send( // notify the AppClient that an executor has been added
ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
Note that in this method the Master also sends an ExecutorAdded message to the AppClient.
Let's first look at what the Worker does when it receives the LaunchExecutor message:
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => // the Worker receives the Master's request to launch an executor
if (masterUrl != activeMasterUrl) {
logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
} else {
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
// Create local dirs for the executor. These are passed to the executor via the
// SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
// application finishes.
val appLocalDirs = appDirectories.get(appId).getOrElse {
Utils.getOrCreateLocalRootDirs(conf).map { dir =>
val appDir = Utils.createDirectory(dir, namePrefix = "executor")
Utils.chmod700(appDir)
appDir.getAbsolutePath()
}.toSeq
}
appDirectories(appId) = appLocalDirs
val manager = new ExecutorRunner( // create an ExecutorRunner
appId,
execId,
appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
cores_,
memory_,
self,
workerId,
host,
webUi.boundPort,
publicAddress,
sparkHome,
executorDir,
workerUri,
conf,
appLocalDirs, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start() // start the runner thread
coresUsed += cores_
memoryUsed += memory_
sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None)) // tell the Master that the executor's state has changed
} catch {
case e: Exception => {
logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
executors -= appId + "/" + execId
}
sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
Some(e.toString), None))
}
}
}
The main work here is creating an ExecutorRunner, which packages everything the executor needs to start, and calling its start method. start spins up a thread that in turn calls fetchAndRunExecutor:
private[worker] def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() } // call fetchAndRunExecutor, which launches the executor backend process
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
// It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
// be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
if (state == ExecutorState.RUNNING) {
state = ExecutorState.FAILED
}
killProcess(Some("Worker shutting down")) }
}
fetchAndRunExecutor creates a ProcessBuilder, fills in the arguments, and finally starts CoarseGrainedExecutorBackend by executing a command. The source first:
private def fetchAndRunExecutor() {
try {
// Launch the process
// assemble the OS command that will start CoarseGrainedExecutorBackend
val builder: ProcessBuilder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
memory, sparkHome.getAbsolutePath, substituteVariables)
val command = builder.command()
val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
logInfo(s"Launch command: $formattedCommand")
builder.directory(executorDir)
builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
// In case we are running this from within the Spark Shell, avoid creating a "scala"
// parent process for the executor command
builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
// Add webUI log urls
val baseUrl =
s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
process = builder.start() // run the command via ProcessBuilder, starting the CoarseGrainedExecutorBackend process
val header = "Spark Executor Command: %s\n%s\n\n".format(
formattedCommand, "=" * 40)
// Redirect its stdout and stderr to files
// redirect stdout and stderr to files
val stdout = new File(executorDir, "stdout")
stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
val stderr = new File(executorDir, "stderr")
Files.write(header, stderr, UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
// wait for the process to exit
val exitCode = process.waitFor()
state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
// send an ExecutorStateChanged message to the worker
worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
} catch {
case interrupted: InterruptedException => {
logInfo("Runner thread for executor " + fullId + " interrupted")
state = ExecutorState.KILLED
killProcess(None)
}
case e: Exception => {
logError("Error running executor", e)
state = ExecutorState.FAILED
killProcess(Some(e.toString))
}
}
}
One thing that took me a while to figure out: why does the command that gets executed start a CoarseGrainedExecutorBackend process? You cannot tell from this code alone. The answer goes back to when SparkDeploySchedulerBackend created the AppClient: the ApplicationDescription it built contains a command parameter, and that command names org.apache.spark.executor.CoarseGrainedExecutorBackend as the class to run. The AppClient creation was covered in part ① of this series; here is the key code:
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
The command starts the process through CoarseGrainedExecutorBackend's main method, which parses the startup arguments passed on the command line and then calls run:
def main(args: Array[String]) {
var driverUrl: String = null
var executorId: String = null
var hostname: String = null
var cores: Int = 0
var appId: String = null
var workerUrl: Option[String] = None
val userClassPath = new mutable.ListBuffer[URL]()
var argv = args.toList
while (!argv.isEmpty) {
argv match {
case ("--driver-url") :: value :: tail =>
driverUrl = value
argv = tail
case ("--executor-id") :: value :: tail =>
executorId = value
argv = tail
case ("--hostname") :: value :: tail =>
hostname = value
argv = tail
case ("--cores") :: value :: tail =>
cores = value.toInt
argv = tail
case ("--app-id") :: value :: tail =>
appId = value
argv = tail
case ("--worker-url") :: value :: tail =>
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
workerUrl = Some(value)
argv = tail
case ("--user-class-path") :: value :: tail =>
userClassPath += new URL(value)
argv = tail
case Nil =>
case tail =>
// scalastyle:off println
System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
// scalastyle:on println
printUsageAndExit()
}
}
if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
appId == null) {
printUsageAndExit()
}
run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
}
The key part of the run method is registering a communication endpoint named "Executor", which triggers the endpoint's onStart lifecycle method. The method is fairly long, so only the key lines are excerpted here:
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
In onStart, a RegisterExecutor request is sent to the DriverEndpoint to register the executor (the DriverEndpoint's creation was described in part ① of this series). Here is onStart:
override def onStart() {
logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref) // keep a reference to the DriverEndpoint
ref.ask[RegisterExecutorResponse](
RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls)) // register this executor with the driver
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) => Utils.tryLogNonFatalError {
Option(self).foreach(_.send(msg)) // msg must be RegisterExecutorResponse
}
case Failure(e) => {
logError(s"Cannot register with driver: $driverUrl", e)
System.exit(1)
}
}(ThreadUtils.sameThread)
}
When the DriverEndpoint receives the executor's registration request, it wraps the executor's information in an ExecutorData object, records the executor in its data structures, and finally calls makeOffers() to assign tasks to the registered executors. This is where we reconnect with the last section of part ① of this series; task assignment itself is left for the next article. Here is the RegisterExecutor handler:
case RegisterExecutor(executorId, executorRef, hostPort, cores, logUrls) => // the driver receives the RegisterExecutor request
if (executorDataMap.contains(executorId)) { // this executorId is already registered
context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
val executorAddress = if (executorRef.address != null) {
executorRef.address
} else {
context.senderAddress
}
logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
// update the data structures the scheduler backend maintains
addressToExecutorId(executorAddress) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val data = new ExecutorData(executorRef, executorRef.address, executorAddress.host,
cores, cores, logUrls) // create an ExecutorData wrapping the executor's information
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
// Note: some tests expect the reply to come after we put the executor in the map
// only after the executor has been put into executorDataMap do we reply that registration is complete
context.reply(RegisteredExecutor(executorAddress.host))
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers() // start offering tasks to the registered executors
}
Finally, let's see what the executor side does once it receives the confirmation that it is registered:
case RegisteredExecutor(hostname) =>
logInfo("Successfully registered with driver")
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
On receiving the RegisteredExecutor message from the DriverEndpoint, CoarseGrainedExecutorBackend creates the Executor instance. With that, the executor is up; how tasks are assigned to it and executed will be the subject of the next article.