SparkApplication is the universal entry point through which an application is launched. Let's trace how execution reaches it.
1. org.apache.spark.deploy.SparkSubmit#main calls submit()
override def main(args: Array[String]): Unit = {
  ......
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog) // action is SUBMIT, so control flows into submit()
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
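For context, SparkSubmitArguments resolves the action from the command line: --kill selects KILL, --status selects REQUEST_STATUS, and SUBMIT is the default. Below is a minimal, self-contained sketch of this dispatch pattern; the SparkSubmitAction names mirror the real enumeration, but the parseAction helper and the surrounding object are illustrative only, not Spark code.

// Illustrative sketch only: mimics how SparkSubmit dispatches on the parsed action.
// SparkSubmitAction mirrors the real enumeration in org.apache.spark.deploy;
// the parseAction helper is hypothetical.
object SubmitDispatchSketch {
  object SparkSubmitAction extends Enumeration {
    val SUBMIT, KILL, REQUEST_STATUS = Value
  }

  // Hypothetical helper: --kill and --status select the non-default actions.
  def parseAction(args: Array[String]): SparkSubmitAction.Value =
    if (args.contains("--kill")) SparkSubmitAction.KILL
    else if (args.contains("--status")) SparkSubmitAction.REQUEST_STATUS
    else SparkSubmitAction.SUBMIT

  def main(args: Array[String]): Unit = parseAction(args) match {
    case SparkSubmitAction.SUBMIT => println("would call submit(...)")
    case SparkSubmitAction.KILL => println("would call kill(...)")
    case SparkSubmitAction.REQUEST_STATUS => println("would call requestStatus(...)")
  }
}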
2. org.apache.spark.deploy.SparkSubmit#submit calls runMain()
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      ......
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(args, uninitLog)
          }
        })
      } catch {
        ......
      }
    } else {
      runMain(args, uninitLog) // no proxy user: call runMain() directly
    }
  }
  ......
}
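The "......" before the try block elides the creation of proxyUser. For readers unfamiliar with the Hadoop impersonation API, here is a self-contained sketch of the doAs pattern used above; it assumes hadoop-common on the classpath, and the ProxyUserSketch wrapper itself is illustrative, not Spark code.

// Sketch of the Hadoop proxy-user pattern used in submit(). The
// createProxyUser/doAs calls are the real UserGroupInformation API;
// the runAs wrapper is a hypothetical convenience.
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object ProxyUserSketch {
  def runAs(proxyUserName: String)(body: => Unit): Unit = {
    val proxyUser = UserGroupInformation.createProxyUser(
      proxyUserName, UserGroupInformation.getCurrentUser())
    proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
      override def run(): Unit = body // body executes under the proxy user's identity
    })
  }
}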
3. org.apache.spark.deploy.SparkSubmit#runMain calls prepareSubmitEnvironment()
to obtain childMainClass, then creates a SparkApplication and starts it. prepareSubmitEnvironment() is where the cluster managers and deploy modes diverge: in YARN cluster mode, for instance, childMainClass resolves to org.apache.spark.deploy.yarn.YarnClusterApplication, while in client mode it is simply the user-supplied main class.
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // (1) Obtain childMainClass; this is also where the main logic that
  //     distinguishes the various cluster managers and deploy modes lives.
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  ......
  // (2) Load the Class object for childMainClass.
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch {
    ......
  }
  // (3) Instantiate a SparkApplication.
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    // mainClass already implements SparkApplication: create an instance reflectively.
    mainClass.newInstance().asInstanceOf[SparkApplication]
  } else {
    // SPARK-4170
    if (classOf[scala.App].isAssignableFrom(mainClass)) {
      printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
    }
    // Wrap mainClass in a JavaMainApplication, itself an implementation of SparkApplication.
    new JavaMainApplication(mainClass)
  }
  // (4) Call start(): the SparkApplication launches.
  try {
    app.start(childArgs.toArray, sparkConf)
  }
  ......
}
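It is worth spelling out what JavaMainApplication does with the wrapped class. The sketch below paraphrases its start() method from the Spark source: look up the static main(String[]) method reflectively, copy the resolved SparkConf entries into system properties, and invoke main. The real SparkApplication trait is private[spark], so this standalone object reproduces the logic rather than extending the trait; treat it as a simplified rendering, not a verbatim copy.

// Simplified paraphrase of JavaMainApplication#start from the Spark source.
import java.lang.reflect.Modifier
import org.apache.spark.SparkConf

object JavaMainApplicationSketch {
  def start(klass: Class[_], args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", classOf[Array[String]])
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }
    // Copy the resolved SparkConf entries into system properties so the wrapped
    // main class picks them up when it builds its own SparkContext.
    conf.getAll.foreach { case (k, v) => sys.props(k) = v }
    mainMethod.invoke(null, args) // static method: receiver is null
  }
}

This is why a plain main()-style application still flows through the same SparkApplication entry point: the wrapper adapts it to the start(args, conf) contract.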
The flow is clear and simple. Next, we will dig into the concrete source code for the individual deployment modes. That's a wrap!