Xlearning.AM
本文主要介绍xlearning的am实现。am主要和rm进行交互,包括注册,申请资源,心跳等。也和nodemanager交互。
其中主要逻辑:
- 对象构建 new ApplicationMaster()
- 初始化 appMaster.init();
- 运行 appMaster.run()
下面主要讲解run方法所涉及的流程。
run
- 向resourcemanager注册自己。
- 获取输入分片
- 构建ContainerRequest,主要包括优先级,memory大小,cpu大小,nodes位置,rack位置。
- 根据设置的worker个数和ps个数。通过amrmAsync向RM请求资源
- 根据rmCallbackHandler来获取到RM返回的资源个数. 直到达到请求资源个数,不再请求。
- 启动各种app 任务。
- MPI:
extra ld libary path 构建
mpi exec command 构建
mpi exec path 构建
mpi exec : mpiExecProcess = rt.exec(commandArray, envs, mpiExec);
启动线程stdinThread,重定向mpiExecProcess的输出到stdout
启动线程stderrThread,重定向mpiExecProcess的输出到stderr
- MPI:
- allocateInputSplits。给每一个worker分配输入分片。
- buildOutputLocations 构建输出路径
- buildContainerLocalResource 构建container的本地资源。
- buildContainerEnv(worker) 构建worker container env
- buildContainerEnv(ps)构建ps container env
- workerContainerLaunchCommands 构建 worker container launch command
- psContainerLaunchCommands 构建 ps worker launch command
- 启动所有的ps container, containerListener监听器注册事件(这一点 去了解一下监听机制)
- 启动所有的worker container,containerListener监听器注册事件
- MPI:循环等待所有worker container启动,checkMpiWorkerState():如果有任何一个container失败,这时候mpiExecProcess 还活着,那么就destory mpiExecProcess
- 至此,MPI启动完毕
- 启动saveInnerModelMonitor线程。(具体职责???)
- 等待作业完成。
- MPI:
设置setAMFinished()
循环等待所有的container 完成任务。
如果所有的container全部成功完成并且mpiExitCode == 0,设置最终结果finalSuccess=true
- MPI:
- 如果最终执行结果成功,作业完成后续工作:
- 如果输出策略时stream
xxxxxxxx - 如果输出策略为文件
删除临时输出,生成最终输出文件
- 如果输出策略时stream
- 如果最终执行结果失败,并且不是最后一次尝试。《需要修改的地方》
取消先前注册的虚拟机关闭钩子cleanApplication。 - 注销 unregisterApp
针对上述21,cleanApplication这个关闭钩子在am.init的时候设置并添加 Runtime.getRuntime().addShutdownHook(cleanApplication);
下面分析cleanApplication这个钩子线程主要的工作内容。
init#cleanApplication
- 清除HADOOP_USER_NAME设置
- 如果 (isLastRetry||appCompleted)&& XLEARNING_CLEANUP_ENABLE==True
删除stagingDir - 获取jobLogPath:fs.defaultFS//tmp/XLearning/history/applicationid.
- 构建logMessage对象,最终将其写入到jobLogPath中。
logmessage:Map<String, Object>
- app.type:xlearning
- board.info: -
- containerid: containerMessage (each worker container 都进行收集)
- containerid: containerMessage (each ps container 都进行收集)
- timestamp.list: savedTimeStamp
- output.path:outputList
- worker.num: workerNum
备注:这里将log文件写入到文件中。针对jobhistory,会去改目录下读取该文件,来显示cpu memeory的使用情况
网友评论