
所谓的Driver就是/path/to/examples.jar。

我们选择其中的Client这条执行路径。
在 Client 的 Main 中,把 args 也就是 spark-submit 后跟的一些参数封装一下: val driverArgs = new ClientArguments(args)
,然后创建 RpcEnv,之后创建 Endpoint:
val rpcEnv =
RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
rpcEnv.awaitTermination()
ClientEndpoint的onStart发送消息给Master,注册Driver,如下
接下来看下driverDescription的格式,如下
DriverDescription 的信息包含:jar存放的Url、多大内存、多少核,看下 Command的内容:

如上图注释,Spark使用DriverWrapper启动用户APP的main函数,而不是直接启动,这是为了Driver程序和启动Driver的Worker程序共命运(源码注释中称为share fate),即如果此Worker挂了,对应的Driver也会停止。至此,Client提交Driver流程结束了。
Master处理RequestSubmitDriver消息
Master的receiveAndReply方法接收Client发送的消息RequestSubmitDriver,将收到的Driver注册到waitingDrivers,如下
总结
介绍了deploy-mode=cluster模式下,从命令行提交任务,到Master端接收并注册Driver的过程,完整流程如下
网友评论