Spark 应用程序提交启动之主线流程 中提到org.apache.spark.deploy.SparkSubmit#prepareSubmitEnvironment 可获取不同情况下的childMainClass(启动类类名),并最终得到SparkApplication实例。
下面继续分析Client模式下的具体情况:
- prepareSubmitEnvironment()内调用
doPrepareSubmitEnvironment()
private[deploy] def prepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], SparkConf, String) = {
try {
doPrepareSubmitEnvironment(args, conf)
} catch {
case e: SparkException =>
printErrorAndExit(e.getMessage)
throw e
}
}
- doPrepareSubmitEnvironment() 中获得client模式下的全限定类名
private def doPrepareSubmitEnvironment(
args: SparkSubmitArguments,
conf: Option[HadoopConfiguration] = None)
: (Seq[String], Seq[String], SparkConf, String) = {
......
// In client mode, launch the application main class directly
// In addition, add the main application jar and any added jars (if any) to the classpath
if (deployMode == CLIENT) {
childMainClass = args.mainClass // childMainClass就是我们脚本提交时传入的类名
}
......
}
-
具体如何启动
Spark 应用程序提交启动之主线流程 已经提到,启动逻辑最终在org.apache.spark.deploy.SparkSubmit#runMain中, 如下:
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
1.val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args) // 获取childMainClass,这里也是区分各种集群模式各种部署模式的主要逻辑所在
2. try {
mainClass = Utils.classForName(childMainClass) //获取mainClass的Class对象
} catch {
......
}
}
3. val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
mainClass.newInstance().asInstanceOf[SparkApplication] //通过反射mainClass创建SparkApplication实例
} else {
// SPARK-4170
if (classOf[scala.App].isAssignableFrom(mainClass)) {
printWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
}
new JavaMainApplication(mainClass) //通过创建JavaMainApplication包装mainClass, JavaMainApplication也是SparkApplication的实现
}
4. try {
app.start(childArgs.toArray, sparkConf) // 调用start方法,SparkApplication启动
}
}
这里要注意的是,Client模式下得到的childMainClass(也就是我们自己写的程序类)并不是SparkApplication的实例。根据步骤3可知,它必须先被包装为JavaMainApplication,而JavaMainApplication是SparkApplication的实例。
最后看一下JavaMainApplication到底做了什么
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
if (!Modifier.isStatic(mainMethod.getModifiers)) {
throw new IllegalStateException("The main method in the given main class must be static")
}
val sysProps = conf.getAll.toMap
sysProps.foreach { case (k, v) =>
sys.props(k) = v
}
mainMethod.invoke(null, args)
}
}
很明显,当调用JavaMainApplication#start()
时, 实际上是直接反射启动的我们自己写的程序类的mian()
方法。
收工!!!
网友评论