Preface
Using the standalone cluster deploy mode as an example, this article analyzes the driver startup flow from the source code.
-
When we run the spark-submit script, a look inside the script shows that it launches a submission process, org.apache.spark.deploy.SparkSubmit.
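For example, a standalone cluster submission typically looks something like this (the master URL, class, and jar path here are placeholders):
./bin/spark-submit --master spark://host:7077 --deploy-mode cluster --class com.example.Main /path/to/app.jar
-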
From the main() method we enter the submit() method:
def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  if (appArgs.verbose) {
    // scalastyle:off println
    printStream.println(appArgs)
    // scalastyle:on println
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}
-
The submit() method mainly makes two calls: prepareSubmitEnvironment() and runMain().
private def submit(args: SparkSubmitArguments): Unit = {
  val (childArgs, childClasspath, sysProps, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            // scalastyle:off println
            printStream.println(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            // scalastyle:on println
            exitFn(1)
          } else {
            throw e
          }
      }
    } else {
      runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
    }
  }
  // ... (remainder elided: submit() eventually invokes doRunMain())
}
- prepareSubmitEnvironment(): its role boils down to the four points in its Scaladoc:
Prepare the environment for submitting an application.
This returns a 4-tuple:
(1) the arguments for the child process,
(2) a list of classpath entries for the child,
(3) a map of system properties, and
(4) the main class for the child
- In short, it prepares everything the child process needs to run: its arguments, its runtime classpath entries, a map of system properties, and the main class that serves as its entry point. (We will call this child process the "client" process here, though it is not the AppClient we usually speak of.)
In the snippet below we can see that, for standalone cluster mode with the legacy (non-REST) gateway, the client process's main class is set to org.apache.spark.deploy.Client:
...
if (args.isStandaloneCluster) {
  if (args.useRest) {
    childMainClass = "org.apache.spark.deploy.rest.RestSubmissionClient"
    childArgs += (args.primaryResource, args.mainClass)
  } else {
    // In legacy standalone cluster mode, use Client as a wrapper around the user class
    childMainClass = "org.apache.spark.deploy.Client"
    if (args.supervise) { childArgs += "--supervise" }
    Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
    Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
    childArgs += "launch"
    childArgs += (args.master, args.primaryResource, args.mainClass)
  }
  if (args.childArgs != null) {
    childArgs ++= args.childArgs
  }
}
...
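To make the argument assembly concrete: for a hypothetical submission with --driver-memory 1g against master spark://host:7077, running com.example.Main from app.jar, the legacy branch above would build childArgs as roughly Seq("--memory", "1g", "launch", "spark://host:7077", "app.jar", "com.example.Main"), followed by any user arguments.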
- runMain(): as the name suggests, this method runs the main method of the client class determined above. Notably, it invokes main via reflection inside the current JVM rather than forking a new process, as sketched below.
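A minimal sketch of that reflection-based invocation (simplified and illustrative; the real runMain() also sets up classloaders, adds jars, and handles errors):
// Illustrative sketch only -- not the verbatim Spark code.
object ReflectiveLaunch {
  def invokeMain(childMainClass: String, childArgs: Array[String]): Unit = {
    // Load the client class inside the current JVM: no new process is forked
    val mainClass = Class.forName(childMainClass)
    // Look up the static main(Array[String]) entry point
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    // Receiver is null because main is static; childArgs is the single argument
    mainMethod.invoke(null, childArgs)
  }
}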
- Next, let's take a look at the main method of org.apache.spark.deploy.Client:
object Client {
  def main(args: Array[String]) {
    // scalastyle:off println
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    }
    // scalastyle:on println

    val conf = new SparkConf()
    val driverArgs = new ClientArguments(args)

    if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
      conf.set("spark.akka.logLifecycleEvents", "true")
    }
    conf.set("spark.rpc.askTimeout", "10")
    conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, _, Master.ENDPOINT_NAME))
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

    rpcEnv.awaitTermination()
  }
}
- This code is fairly straightforward: it mostly sets up configuration. The key step is creating a ClientEndpoint and handing it the masterEndpoints; from this point on, all communication between the client and the master goes through the ClientEndpoint. So let's see what ClientEndpoint looks like.
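For context, ClientEndpoint is a Spark RpcEndpoint. A rough sketch of that contract's shape (it lives in the Spark-internal org.apache.spark.rpc package; shown here for orientation only):
// Illustrative sketch of the RpcEndpoint contract (Spark-internal API).
// onStart() fires once the endpoint is registered with the RpcEnv;
// receive() handles one-way messages sent to the endpoint, which is how
// forwarded master replies arrive.
class SketchEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
  override def onStart(): Unit = {
    // first hook after registration -- ClientEndpoint submits the driver here
  }
  override def receive: PartialFunction[Any, Unit] = {
    case reply => // pattern-match on forwarded replies, e.g. SubmitDriverResponse
  }
}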
- Now let's look at ClientEndpoint's onStart() method:
override def onStart(): Unit = {
  driverArgs.cmd match {
    case "launch" =>
      // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
      // truncate filesystem paths similar to what YARN does. For now, we just require
      // people call `addJar` assuming the jar is in the same directory.
      val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

      val classPathConf = "spark.driver.extraClassPath"
      val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val libraryPathConf = "spark.driver.extraLibraryPath"
      val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
        cp.split(java.io.File.pathSeparator)
      }

      val extraJavaOptsConf = "spark.driver.extraJavaOptions"
      val extraJavaOpts = sys.props.get(extraJavaOptsConf)
        .map(Utils.splitCommandString).getOrElse(Seq.empty)
      val sparkJavaOpts = Utils.sparkJavaOpts(conf)
      val javaOpts = sparkJavaOpts ++ extraJavaOpts

      val command = new Command(mainClass,
        Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
        sys.env, classPathEntries, libraryPathEntries, javaOpts)

      val driverDescription = new DriverDescription(
        driverArgs.jarUrl,
        driverArgs.memory,
        driverArgs.cores,
        driverArgs.supervise,
        command)
      ayncSendToMasterAndForwardReply[SubmitDriverResponse](
        RequestSubmitDriver(driverDescription))

    case "kill" =>
      val driverId = driverArgs.driverId
      ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
  }
}
- Two key points in this code:
(1) val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" specifies the main class our driver process will be started with.
(2) ayncSendToMasterAndForwardReply[SubmitDriverResponse](RequestSubmitDriver(driverDescription)) wraps all the driver-launch parameters into a RequestSubmitDriver message, submits it to the master, and asynchronously forwards the master's reply back to the endpoint, as sketched below.
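The method name describes the pattern: ask each known master asynchronously and forward whatever reply arrives to the endpoint itself, where receive() handles it. A simplified sketch of that pattern (paraphrased, not the verbatim Spark body):
// Simplified sketch: ask every master and forward the reply to self.
// `masterEndpoints`, `self`, `logWarning`, and the execution context all
// come from the surrounding ClientEndpoint; error handling is trimmed.
import scala.reflect.ClassTag
import scala.util.{Failure, Success}

private def ayncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
  for (masterEndpoint <- masterEndpoints) {
    masterEndpoint.ask[T](message).onComplete {
      case Success(v) => self.send(v)  // forwarded to receive()
      case Failure(e) => logWarning(s"Error sending messages to master $masterEndpoint", e)
    }(forwardMessageExecutionContext)
  }
}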
That's it for this article. What happens after this point is covered by plenty of blog posts online, so I won't ramble on. I wrote this piece because I was once asked how the driver starts up, and I couldn't find an explanation online that really gets to the root of it; then again, maybe I just didn't look in the right place.