The article "Spark 应用程序提交启动之主线流程" (the main-line flow of Spark application submission and startup) already covered the overall process. Below we take a closer look at yarn-cluster mode.
-
Finding the matching childMainClass
SparkSubmit#runMain()-->#prepareSubmitEnvironment-->#doPrepareSubmitEnvironment
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
  "org.apache.spark.deploy.yarn.YarnClusterApplication"

// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
  childMainClass = YARN_CLUSTER_SUBMIT_CLASS // the class to launch is "org.apache.spark.deploy.yarn.YarnClusterApplication"
}
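For context, isYarnCluster is only true when the job was submitted with --master yarn and --deploy-mode cluster; inside doPrepareSubmitEnvironment the check is roughly the following (paraphrased, the constants are internal to SparkSubmit):

val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER

So a submission such as spark-submit --master yarn --deploy-mode cluster --class com.example.MyApp my-app.jar (class and jar names here are placeholders) ends up launching YarnClusterApplication instead of the user class directly.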
-
Submitting the application to YARN
org.apache.spark.deploy.yarn.YarnClusterApplication#start-->new Client(new ClientArguments(args), conf).run()-->org.apache.spark.deploy.yarn.Client#submitApplication
def submitApplication(): ApplicationId = {
  ......
  yarnClient.submitApplication(appContext)
}
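The elided part is where the real work happens: a YarnClient is started, a new application id is requested from the ResourceManager, and a ContainerLaunchContext is built whose launch command starts org.apache.spark.deploy.yarn.ApplicationMaster. A heavily abridged sketch of that body (paraphrased from Client.scala; the exact calls vary between Spark versions):

def submitApplication(): ApplicationId = {
  yarnClient.init(hadoopConf)
  yarnClient.start()

  // Ask the ResourceManager for a new application id
  val newApp = yarnClient.createApplication()
  val newAppResponse = newApp.getNewApplicationResponse()
  appId = newAppResponse.getApplicationId()

  // The launch context's command line starts org.apache.spark.deploy.yarn.ApplicationMaster
  val containerContext = createContainerLaunchContext(newAppResponse)
  val appContext = createApplicationSubmissionContext(newApp, containerContext)

  yarnClient.submitApplication(appContext)
  appId
}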
-
Starting the user application
As is well known, an application submitted to YARN gets an ApplicationMaster started as its leading role; for Spark this is org.apache.spark.deploy.yarn.ApplicationMaster. Our user program is then started from inside it.
The rough call chain is as follows:
org.apache.spark.deploy.yarn.ApplicationMaster#main-->org.apache.spark.deploy.yarn.ApplicationMaster#run-->org.apache.spark.deploy.yarn.ApplicationMaster#runImpl
private def runImpl(): Unit = {
  if (isClusterMode) {
    runDriver() // start the Driver
  } else {
    runExecutorLauncher()
  }
}
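As an aside, whether the AM takes the runDriver branch is decided by whether a user class was passed to it on the AM command line; cluster mode is the only case in which the Client adds the user class to the AM arguments (paraphrased from the ApplicationMaster source):

private val isClusterMode = args.userClass != null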
Continuing down the call chain:
org.apache.spark.deploy.yarn.ApplicationMaster#runDriver
private def runDriver(): Unit = {
  userClassThread = startUserApplication() // this is where the user application is actually started
}
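The snippet above is abridged. In the full source, runDriver does more than start the user thread: it waits for the user code to create a SparkContext, registers the AM with the ResourceManager, starts requesting executor containers, and then blocks on the user thread until main() returns. Roughly (paraphrased; field and method names differ slightly between Spark versions):

private def runDriver(): Unit = {
  userClassThread = startUserApplication() // run the user's main() on the "Driver" thread

  // Block until the user code has created a SparkContext (the promise is completed
  // from the SparkContext initialization path), or time out.
  val sc = ThreadUtils.awaitResult(sparkContextPromise.future, ......)
  if (sc != null) {
    registerAM(......)      // register this AM with the ResourceManager
    createAllocator(......) // start requesting executor containers
  }
  resumeDriver()            // unblock the user code so it can run past SparkContext creation
  userClassThread.join()    // the AM stays alive until the user's main() finishes
}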
org.apache.spark.deploy.yarn.ApplicationMaster#startUserApplication
private def startUserApplication(): Thread = {
  val mainMethod = userClassLoader.loadClass(args.userClass)
    .getMethod("main", classOf[Array[String]]) // the user program's main method is obtained via reflection
  ......
  val userThread = new Thread { // a thread is created here, and mainMethod is invoked reflectively inside it
    override def run(): Unit = {
      try {
        mainMethod.invoke(null, userArgs.toArray)
      } catch {
        ......
      } finally {
        ......
      }
    }
  }
  ......
  userThread.setContextClassLoader(userClassLoader)
  userThread.setName("Driver")
  userThread.start() // the thread starts and run() begins executing
  userThread
}
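To make the reflection pattern concrete, here is a minimal, self-contained sketch (all names are made up for illustration) of what startUserApplication boils down to: load a class by name, look up its main(Array[String]) method, and invoke it on a dedicated thread.

// A stand-in "user class"; in the real flow this is the class passed via --class.
object MyUserApp {
  def main(args: Array[String]): Unit =
    println(s"user main on thread ${Thread.currentThread().getName} with args ${args.mkString(",")}")
}

object ReflectiveMainDemo {
  def main(args: Array[String]): Unit = {
    // Equivalent to userClassLoader.loadClass(args.userClass).getMethod("main", ...);
    // a top-level Scala object exposes a static main forwarder on its companion class.
    val mainMethod = Class.forName("MyUserApp").getMethod("main", classOf[Array[String]])

    val userThread = new Thread {
      // The Array[String] is passed as the single argument expected by main
      override def run(): Unit = mainMethod.invoke(null, Array("hello", "yarn"))
    }
    userThread.setName("Driver") // same naming as in startUserApplication
    userThread.start()
    userThread.join()
  }
}

Running ReflectiveMainDemo prints the message from MyUserApp.main on a thread named "Driver", which is exactly why, in yarn-cluster mode, the user code runs inside the ApplicationMaster process.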
Summary of the whole flow
1. Spark Submit phase: obtain the YarnClusterApplication that submits the application to YARN and call its start(); at this point only a YARN client has been started.
2. Hadoop YARN phase: as the ApplicationMaster starts up, it launches the user thread ("Driver") and executes the user program's main method.
All done!!!