org.apache.spark.deploy.SparkSubmit
-main
-submit.doSubmit(args)
- SparkSubmitArguments.parseArguments(args)
- - SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
- submit(args: SparkSubmitArguments, uninitLog: Boolean)
-runMain(args: SparkSubmitArguments, uninitLog: Boolean)
- new JavaMainApplication(mainClass)
-app.start(childArgs.toArray, sparkConf)
// Following constants are visible for testing.
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
"org.apache.spark.deploy.yarn.YarnClusterApplication"
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
childMainClass = YARN_CLUSTER_SUBMIT_CLASS
// If mainClass implements SparkApplication, instantiate it directly; otherwise wrap it in a JavaMainApplication
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
} else {
new JavaMainApplication(mainClass)
}
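The dispatch above is plain reflection. Below is a minimal, self-contained sketch of the same pattern; AppLike and MyApp are made-up stand-ins for SparkApplication and the user's main class, not Spark code.

trait AppLike { def start(args: Array[String]): Unit }

class MyApp extends AppLike {
  override def start(args: Array[String]): Unit = println(s"started with ${args.mkString(",")}")
}

object ReflectiveDispatchSketch {
  def main(argv: Array[String]): Unit = {
    val mainClass = Class.forName("MyApp")
    // Same check as SparkSubmit: does the loaded class implement the application interface?
    val app: AppLike =
      if (classOf[AppLike].isAssignableFrom(mainClass)) {
        mainClass.getConstructor().newInstance().asInstanceOf[AppLike]
      } else {
        // SparkSubmit would wrap a plain main-method class in JavaMainApplication here.
        new AppLike { def start(args: Array[String]): Unit = println("wrapped plain main class") }
      }
    app.start(Array("a", "b"))
  }
}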
-YarnClusterApplication.start
// ClientArguments carries the arguments passed on the spark-submit command line
- new Client(new ClientArguments(args), conf, null).run()
-Client.submitApplication()
private val yarnClient = YarnClient.createYarnClient
// Used to communicate with the ResourceManager (RM)
protected ApplicationClientProtocol rmClient;
// Obtain the application id from the RM
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()
// Create the container launch context
val containerContext = createContainerLaunchContext(newAppResponse)
The containerContext carries the launch parameters, including the class name of the process to run (the ApplicationMaster in cluster mode):
val amClass =
if (isClusterMode) {
Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
} else {
Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
}
// Create the application submission context
val appContext = createApplicationSubmissionContext(newApp, containerContext)
-yarnClient.submitApplication(appContext)
-rmClient.submitApplication(request);
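Everything Client.submitApplication does above sits on the plain Hadoop YARN client API. The following is a hedged, standalone sketch of that underlying call sequence; the application name and command are placeholders, and a real submission also sets resources, environment, and queue.

import java.util.Collections
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

// Sketch of the raw YARN submission path wrapped by Client.submitApplication.
object RawYarnSubmitSketch {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()

    // Ask the RM for a new application (this is where the appId comes from).
    val newApp = yarnClient.createApplication()
    val appContext = newApp.getApplicationSubmissionContext
    appContext.setApplicationName("raw-yarn-sketch")

    // The ContainerLaunchContext carries the command that starts the AM process;
    // Spark puts the java command for org.apache.spark.deploy.yarn.ApplicationMaster here.
    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setCommands(Collections.singletonList("echo placeholder-am-command"))
    appContext.setAMContainerSpec(amContainer)

    val appId = yarnClient.submitApplication(appContext)
    println(s"submitted $appId")
  }
}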
Search the source for "org.apache.spark.deploy.yarn.ApplicationMaster":
-org.apache.spark.deploy.yarn.ApplicationMaster.main
-master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
ApplicationMaster members:
private val client = new YarnRMClient()
// Used for communication between the AM and the RM
-amClient = AMRMClient.createAMRMClient()
-master = new ApplicationMaster(amArgs, sparkConf, yarnConf)
-master.run()
In cluster mode, run the Driver:
if (isClusterMode) {
runDriver()
} else {
runExecutorLauncher()
}
private def runDriver(): Unit = {
addAmIpFilter(None, System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV))
userClassThread = startUserApplication()
... ...
try {
// Block here until the driver thread started by startUserApplication has created the SparkContext in the user code
val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
// sc is not null
if (sc != null) {
val rpcEnv = sc.env.rpcEnv
val userConf = sc.getConf
val host = userConf.get(DRIVER_HOST_ADDRESS)
val port = userConf.get(DRIVER_PORT)
// Register the AM and request resources
registerAM(host, port, userConf, sc.ui.map(_.webUrl), appAttemptId)
val driverRef = rpcEnv.setupEndpointRef(
RpcAddress(host, port),
YarnSchedulerBackend.ENDPOINT_NAME)
// Create the allocator and obtain the list of available resources
createAllocator(driverRef, userConf, rpcEnv, appAttemptId, distCacheConf)
} else {
// Sanity check; should never happen in normal operation, since sc should only be null
// if the user app did not create a SparkContext.
throw new IllegalStateException("User did not initialize spark context!")
}
// Once resources are ready, call resumeDriver to change state so the driver thread (the user code) continues executing
resumeDriver()
// runDriver proceeds past this point only after userClassThread has finished
userClassThread.join()
} catch {
... ...
} finally {
resumeDriver()
}
}
userClassThread = startUserApplication()
private def startUserApplication(): Thread = {
logInfo("Starting the user application in a separate Thread")
... ...
val mainMethod = userClassLoader.loadClass(args.userClass)
.getMethod("main", classOf[Array[String]])
val userThread = new Thread {
override def run(): Unit = {
try {
if (!Modifier.isStatic(mainMethod.getModifiers)) {
logError(s"Could not find static main method in object ${args.userClass}")
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
} else {
// Invoke the static main method of the user's job object, i.e. run the user code
mainMethod.invoke(null, userArgs.toArray)
finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
logDebug("Done running user class")
}
} catch {
......
} ... ...
}
}
userThread.setContextClassLoader(userClassLoader)
// Name the thread "Driver"
userThread.setName("Driver")
// Start the driver thread
userThread.start()
userThread
}
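At its core, startUserApplication is "load the user class, find its static main, run it on a thread named Driver". The sketch below is a self-contained model of that pattern; DemoJob is a made-up stand-in for the user's job object.

import java.lang.reflect.Modifier

// A stand-in "user class" with a static main, so the sketch is self-contained.
object DemoJob {
  def main(args: Array[String]): Unit = println(s"user job ran with ${args.mkString(", ")}")
}

// Sketch of the startUserApplication pattern: load a class by name, verify its main
// method is static, and invoke it on a dedicated thread named "Driver".
object ReflectiveMainLauncher {
  def main(argv: Array[String]): Unit = {
    val mainMethod = Class.forName("DemoJob").getMethod("main", classOf[Array[String]])
    val userThread = new Thread(() => {
      if (!Modifier.isStatic(mainMethod.getModifiers)) {
        System.err.println("no static main method found")
      } else {
        mainMethod.invoke(null, Array("input", "output"))   // null receiver: main is static
      }
    })
    userThread.setName("Driver")
    userThread.start()
    userThread.join()
  }
}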
Creating the SparkContext in the user code:
val sc = new SparkContext(sparkConf)
During SparkContext initialization, the following code runs:
// Post init
_taskScheduler.postStartHook()
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
...
override def postStartHook(): Unit = {
// sparkContextInitialized does two things:
// 1. Once the SparkContext is initialized, wake up the runDriver method so it continues
// 2. Pause the driver thread (user thread) so that initialization can proceed inside runDriver
ApplicationMaster.sparkContextInitialized(sc)
super.postStartHook()
logInfo("YarnClusterScheduler.postStartHook done")
}
}
private def sparkContextInitialized(sc: SparkContext) = {
sparkContextPromise.synchronized {
// Notify the runDriver method that the SparkContext is available
sparkContextPromise.success(sc)
// Pause the driver (user class) thread so that runDriver can perform its initialization
sparkContextPromise.wait()
}
}
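The coordination between runDriver (the AM thread) and the user code (the driver thread) is a promise-plus-monitor handshake: success() wakes the AM side, wait() parks the driver, notify() (resumeDriver) releases it. The following is a self-contained sketch of just that handshake, with the SparkContext replaced by a String and all names invented for illustration.

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object AmDriverHandshakeSketch {
  private val contextPromise = Promise[String]()

  def main(args: Array[String]): Unit = {
    val driverThread = new Thread(() => {
      // ... user code creates its "SparkContext" (here just a String) ...
      contextPromise.synchronized {
        contextPromise.success("sc")   // wake up the AM side (like sparkContextInitialized)
        contextPromise.wait()          // park until the AM side notifies (like resumeDriver)
      }
      println("driver thread resumed, user code continues")
    }, "Driver")
    driverThread.start()

    // AM side: block until the driver thread has produced the context.
    val sc = Await.result(contextPromise.future, 100.seconds)
    println(s"AM side got: $sc, registering AM / creating allocator ...")

    // resumeDriver equivalent: let the parked driver thread continue.
    contextPromise.synchronized {
      contextPromise.notify()
    }
    driverThread.join()
  }
}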
-TaskSchedulerImpl.postStartHook
-TaskSchedulerImpl.waitBackendReady
private def waitBackendReady(): Unit = {
if (backend.isReady) {
return
}
// Loop until the backend is ready; until then the user code does not proceed
// So when does the driver thread continue?
// When runDriver calls resumeDriver, which changes the backend state to indicate resources are ready
while (!backend.isReady) {
// Might take a while for backend to be ready if it is waiting on resources.
if (sc.stopped.get) {
// For example: the master removes the application for some reason
throw new IllegalStateException("Spark context stopped while waiting for backend")
}
synchronized {
this.wait(100)
}
}
}
registerAM inside the runDriver method:
private val client = new YarnRMClient()
client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
amClient = AMRMClient.createAMRMClient()
Register with the RM through the AMRMClient:
amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl)
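client.register and the allocator are wrappers over the Hadoop AMRMClient API. Below is a hedged sketch of that underlying register-then-allocate sequence; the host, port, tracking URL, and resource sizes are placeholders, and the calls only succeed inside a real AM container launched by YARN.

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Sketch of the raw AM <-> RM protocol wrapped by YarnRMClient / YarnAllocator.
object RawAmRmSketch {
  def main(args: Array[String]): Unit = {
    val amRMClient = AMRMClient.createAMRMClient[ContainerRequest]()
    amRMClient.init(new YarnConfiguration())
    amRMClient.start()

    // Equivalent of client.register(...): tell the RM where the AM (driver) lives.
    amRMClient.registerApplicationMaster("driver-host", 7078, "http://driver-host:4040")

    // Equivalent of allocator.allocateResources(): request containers, then poll allocate().
    val capability = Resource.newInstance(2048, 2)   // 2 GiB, 2 vcores (placeholder sizes)
    amRMClient.addContainerRequest(
      new ContainerRequest(capability, null, null, Priority.newInstance(1)))

    val response = amRMClient.allocate(0.1f)
    response.getAllocatedContainers.forEach { c =>
      println(s"RM granted container ${c.getId} on ${c.getNodeId.getHost}")
    }
  }
}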
private def createAllocator(
driverRef: RpcEndpointRef,
_sparkConf: SparkConf,
rpcEnv: RpcEnv,
appAttemptId: ApplicationAttemptId,
distCacheConf: SparkConf): Unit = {
... ...
val appId = appAttemptId.getApplicationId().toString()
val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
val localResources = prepareLocalResources(distCacheConf)
... ...
// Create the allocator
allocator = client.createAllocator(
yarnConf,
_sparkConf,
appAttemptId,
driverUrl,
driverRef,
securityMgr,
localResources)
... ...
// Obtain the list of allocatable resources
allocator.allocateResources()
val ms = MetricsSystem.createMetricsSystem(MetricsSystemInstances.APPLICATION_MASTER,
sparkConf, securityMgr)
val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
ms.registerSource(new ApplicationMasterSource(prefix, allocator))
// do not register static sources in this case as per SPARK-25277
ms.start(false)
metricsSystem = Some(ms)
reporterThread = launchReporterThread()
}
// Handle the containers that have been allocated
handleAllocatedContainers(allocatedContainers.asScala.toSeq)
def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
// Match incoming requests by host
val remainingAfterHostMatches = new ArrayBuffer[Container]
// Classify the allocated containers by host and then by rack (matching preferred locations)
for (allocatedContainer <- allocatedContainers) {
matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
containersToUse, remainingAfterHostMatches)
}
... ...
val remainingAfterRackMatches = new ArrayBuffer[Container]
if (remainingAfterHostMatches.nonEmpty) {
var exception: Option[Throwable] = None
val thread = new Thread("spark-rack-resolver") {
override def run(): Unit = {
try {
for (allocatedContainer <- remainingAfterHostMatches) {
val rack = resolver.resolve(allocatedContainer.getNodeId.getHost)
matchContainerToRequest(allocatedContainer, rack, containersToUse,
remainingAfterRackMatches)
}
} catch {
case e: Throwable =>
... ...
}
// Launch executors on the containers selected for use
runAllocatedContainers(containersToUse)
logInfo("Received %d containers from YARN, launching executors on %d of them."
.format(allocatedContainers.size, containersToUse.size))
}
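The host-then-rack matching above can be pictured with a tiny standalone example. The host names, racks, and request sets below are invented; Spark's real version matches against pending ContainerRequests and uses a RackResolver.

object LocalityMatchSketch {
  def main(args: Array[String]): Unit = {
    val allocated = Seq("c1" -> "host-a", "c2" -> "host-b", "c3" -> "host-c")
    val hostRequests = Set("host-a")                                 // preferred hosts from pending requests
    val rackOf = Map("host-b" -> "/rack-1", "host-c" -> "/rack-2")   // stand-in for the RackResolver
    val rackRequests = Set("/rack-1")

    // Pass 1: match containers whose host was explicitly requested.
    val (hostMatched, afterHost) = allocated.partition { case (_, host) => hostRequests(host) }
    // Pass 2: resolve the rack of the remainder and match rack-level requests.
    val (rackMatched, unmatched) =
      afterHost.partition { case (_, host) => rackOf.get(host).exists(rackRequests) }

    println(s"host-matched: $hostMatched, rack-matched: $rackMatched, remaining: $unmatched")
  }
}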
runAllocatedContainers launches the containers:
private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = synchronized {
for (container <- containersToUse) {
val rpId = getResourceProfileIdFromPriority(container.getPriority)
executorIdCounter += 1
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
val executorId = executorIdCounter.toString
val yarnResourceForRpId = rpIdToYarnResource.get(rpId)
assert(container.getResource.getMemory >= yarnResourceForRpId.getMemory)
logInfo(s"Launching container $containerId on host $executorHostname " +
s"for executor with ID $executorId for ResourceProfile Id $rpId")
... ...
// If the target number of executors exceeds the number currently running, more containers still need to be launched
if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) {
getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet()
if (launchContainers) {
// Submit the launch to the launcher thread pool, which starts the container
launcherPool.execute(() => {
try {
new ExecutorRunnable(
Some(container),
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
containerMem,
containerCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources,
rp.id
).run()
updateInternalState()
} catch {
... ...
}
})
} else {
// For test only
updateInternalState()
}
... ...
}
}
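The launch itself is fire-and-forget on a thread pool. The minimal sketch below models that launcherPool pattern; the container ids and pool size are made up, and the body stands in for ExecutorRunnable.run and updateInternalState.

import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object LauncherPoolSketch {
  def main(args: Array[String]): Unit = {
    val launcherPool = Executors.newFixedThreadPool(4)
    val runningExecutors = new AtomicInteger(0)
    val containers = Seq("container_01", "container_02", "container_03")

    containers.foreach { containerId =>
      launcherPool.execute(() => {
        // In Spark this is ExecutorRunnable(...).run(), i.e. an NMClient.startContainer call.
        println(s"launching executor in $containerId on ${Thread.currentThread().getName}")
        runningExecutors.incrementAndGet()   // stands in for updateInternalState()
      })
    }

    launcherPool.shutdown()
    launcherPool.awaitTermination(10, TimeUnit.SECONDS)
    println(s"running executors: ${runningExecutors.get()}")
  }
}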
ExecutorRunnable.run
// Create a client for communicating with the NodeManager (NM)
nmClient = NMClient.createNMClient()
nmClient.init(conf)
nmClient.start()
// Use nmClient to tell the target NM to start the container
startContainer()
startContainer
def startContainer(): java.util.Map[String, ByteBuffer] = {
... ...
// prepareCommand builds the script that launches the container process,
// i.e. starts the org.apache.spark.executor.YarnCoarseGrainedExecutorBackend process (the executor's communication backend)
val commands = prepareCommand()
ctx.setCommands(commands.asJava)
ctx.setApplicationACLs(
YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)
... ...
// Send the start request to the ContainerManager
try {
// Start the container, passing the container launch context ctx
nmClient.startContainer(container.get, ctx)
} catch {
... ...
}
}
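For orientation, the command assembled by prepareCommand has roughly the following shape. This is a hedged approximation: the -Xmx value, the host/port/app-id values, and the exact flag set vary by Spark version and configuration (flag names below are from Spark 3.x).

// Approximate shape of the executor launch command (illustrative values only).
object ExecutorLaunchCommandSketch {
  def main(args: Array[String]): Unit = {
    val command = Seq(
      "java", "-server", "-Xmx2048m",
      "org.apache.spark.executor.YarnCoarseGrainedExecutorBackend",
      "--driver-url", "spark://CoarseGrainedScheduler@driver-host:7078",
      "--executor-id", "1",
      "--hostname", "nm-host",
      "--cores", "4",
      "--app-id", "application_1700000000000_0001")
    println(command.mkString(" "))
  }
}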
org.apache.spark.executor.YarnCoarseGrainedExecutorBackend
-CoarseGrainedExecutorBackend.run
def run(
arguments: Arguments,
backendCreateFn: (RpcEnv, Arguments, SparkEnv, ResourceProfile) =>
CoarseGrainedExecutorBackend): Unit = {
... ...
// Create an RpcEnv that can locate the driver and communicate with it
val fetcher = RpcEnv.create(
"driverPropsFetcher",
arguments.bindAddress,
arguments.hostname,
-1,
executorConf,
new SecurityManager(executorConf),
numUsableCores = 0,
clientMode = true)
var driver: RpcEndpointRef = null
val nTries = 3
for (i <- 0 until nTries if driver == null) {
try {
driver = fetcher.setupEndpointRefByURI(arguments.driverUrl)
} catch {
case e: Throwable => if (i == nTries - 1) {
throw e
}
}
}
// Create the executor's SparkEnv
val env = SparkEnv.createExecutorEnv(driverConf, arguments.executorId, arguments.bindAddress,
arguments.hostname, arguments.cores, cfg.ioEncryptionKey, isLocal = false)
// Register the communication endpoint with the rpcEnv;
// backendCreateFn here constructs a YarnCoarseGrainedExecutorBackend
env.rpcEnv.setupEndpoint("Executor",
backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
arguments.workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
}
}
// Message loop
var messageLoop: MessageLoop = null
try {
messageLoop = endpoint match {
case e: IsolatedRpcEndpoint =>
// Choose the message loop type based on the endpoint type
new DedicatedMessageLoop(name, e, this)
case _ =>
sharedLoop.register(name, endpoint)
sharedLoop
}
endpoints.put(name, messageLoop)
} catch {
case NonFatal(e) =>
endpointRefs.remove(endpoint)
throw e
}
-DedicatedMessageLoop
// Inbox
private val inbox = new Inbox(name, endpoint)
-Inbox
protected val messages = new java.util.LinkedList[InboxMessage]()
// OnStart should be the first message to process
// Put an OnStart message into the inbox first
inbox.synchronized {
messages.add(OnStart)
}
// The RpcEndpoint lifecycle (from the RpcEndpoint scaladoc):
// constructor -> onStart -> receive* -> onStop
private[spark] trait RpcEndpoint {
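The Inbox plus DedicatedMessageLoop machinery can be reduced to a small standalone model: one queue per endpoint, OnStart enqueued first, and a loop that drives the constructor -> onStart -> receive* -> onStop lifecycle. All names below are hypothetical, not Spark's private classes.

import java.util.concurrent.LinkedBlockingQueue

object InboxSketch {
  sealed trait InboxMessage
  case object OnStart extends InboxMessage
  case object OnStop extends InboxMessage
  final case class RpcMessage(body: Any) extends InboxMessage

  trait Endpoint {
    def onStart(): Unit = {}
    def receive(msg: Any): Unit
    def onStop(): Unit = {}
  }

  final class Inbox(endpoint: Endpoint) {
    private val messages = new LinkedBlockingQueue[InboxMessage]()
    messages.put(OnStart)                       // OnStart is always processed first

    def post(body: Any): Unit = messages.put(RpcMessage(body))
    def stop(): Unit = messages.put(OnStop)

    // Dedicated message loop: blocks on the queue and dispatches to the endpoint.
    def run(): Unit = {
      var running = true
      while (running) {
        messages.take() match {
          case OnStart          => endpoint.onStart()
          case RpcMessage(body) => endpoint.receive(body)
          case OnStop           => endpoint.onStop(); running = false
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val inbox = new Inbox(new Endpoint {
      override def onStart(): Unit = println("onStart")
      override def receive(msg: Any): Unit = println(s"received: $msg")
      override def onStop(): Unit = println("onStop")
    })
    new Thread(() => inbox.run(), "dedicated-message-loop").start()
    inbox.post("RegisteredExecutor")
    inbox.stop()
  }
}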
CoarseGrainedExecutorBackend.onStart
override def onStart(): Unit = {
... ...
logInfo("Connecting to driver: " + driverUrl)
try {
_resources = parseOrFindResources(resourcesFileOpt)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
// Save the reference to the driver
driver = Some(ref)
// Send a RegisterExecutor message to the driver
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls,
extractAttributes, _resources, resourceProfile.id))
}(ThreadUtils.sameThread).onComplete {
case Success(_) =>
self.send(RegisteredExecutor)
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}
The driver is a thread, so it is the SparkContext side that receives the message.
_schedulerBackend is the driver's communication backend:
private var _schedulerBackend: SchedulerBackend = _
// The SchedulerBackend used in cluster mode
CoarseGrainedSchedulerBackend
// Receive-and-reply message handling
CoarseGrainedSchedulerBackend.receiveAndReply
Matching the RegisterExecutor message:
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls,
attributes, resources, resourceProfileId) =>
// Increase the total core count and the count of registered executors
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
// Finally reply true to indicate successful registration
context.reply(true)
CoarseGrainedExecutorBackend receives the reply:
case Success(_) =>
// Send itself a message indicating registration is complete
self.send(RegisteredExecutor)
CoarseGrainedExecutorBackend receives the RegisteredExecutor message it sent to itself:
override def receive: PartialFunction[Any, Unit] = {
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
// Create the Executor compute object (as opposed to CoarseGrainedExecutorBackend, the communication object)
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,
resources = _resources)
// Send a LaunchedExecutor message to the driver
driver.get.send(LaunchedExecutor(executorId))
The driver-side CoarseGrainedSchedulerBackend receives the LaunchedExecutor message:
case LaunchedExecutor(executorId) =>
// Add the executor's cores back as free cores
executorDataMap.get(executorId).foreach { data =>
data.freeCores = data.totalCores
}
// Make resource offers on this executor so pending tasks can be scheduled
makeOffers(executorId)
case e =>
logError(s"Received unexpected message. ${e}")
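Putting the tail of the trace together, the registration exchange can be modelled as a tiny standalone sketch. This is a synchronous stand-in for the real RPC; all classes below are hypothetical, not Spark's.

object RegistrationHandshakeSketch {
  sealed trait Msg
  final case class RegisterExecutor(executorId: String, cores: Int) extends Msg
  case object RegisteredExecutor extends Msg
  final case class LaunchedExecutor(executorId: String) extends Msg

  // Driver side: what CoarseGrainedSchedulerBackend.receiveAndReply / receive do.
  class DriverBackend {
    var totalCores = 0
    def receiveAndReply(msg: Msg): Boolean = msg match {
      case RegisterExecutor(id, cores) => totalCores += cores; println(s"registered $id"); true
      case _                           => false
    }
    def receive(msg: Msg): Unit = msg match {
      case LaunchedExecutor(id) => println(s"$id launched, making offers")
      case other                => println(s"unexpected: $other")
    }
  }

  // Executor side: what CoarseGrainedExecutorBackend.onStart / receive do.
  class ExecutorBackend(id: String, driver: DriverBackend) {
    def onStart(): Unit =
      if (driver.receiveAndReply(RegisterExecutor(id, cores = 4))) receive(RegisteredExecutor)
    def receive(msg: Msg): Unit = msg match {
      case RegisteredExecutor => driver.receive(LaunchedExecutor(id)) // the Executor object is created here
      case other              => println(s"unexpected: $other")
    }
  }

  def main(args: Array[String]): Unit =
    new ExecutorBackend("1", new DriverBackend).onStart()
}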