Master 和 Worker关系图

总结
- master:通过读取配置,创建actorSystem,反射调用master,master启动后,执行生命周期方法,
preStart
和receiveWithLogging
,定时val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
清理失去心跳的Worker - worker:通过读取配置,加载worker所在服务器的cpu cores,memory大小等信息,创建actorSystem,反射调用worker,worker启动后执行生命周期方法
preStart
和receiveWithLogging
,向master注册信息,最重要的信息worker的cpu cores和memory资源大小,定时向master报心跳val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
,防止被master清理 - 所以master会保存worker各个节点的资源信息,与保持心跳,作为后续执行job资源分配,调度的基础

Spark中start-all.sh脚本


Master
1.查看master启动脚本start-master.sh
start-master.sh
脚本中可以看到master启动的时候,启动的是org.apache.spark.deploy.master.Master
类,所以要看源码,从这个类查看,在从Master
伴生对象main
方法入手

2. 源码分析
main
方法主要做了以下三件事
- 读取配置
- 创建
ActorSystem
- 通过
ActorSystem
启动Master
服务
image.png
流程1.加载配置文件 2.启动master
val args = new MasterArguments(argStrings, conf)
这句代码的功能就是加载配置文件,但是里面有可以借鉴Utils工具类的代码
image.png
image.png
关键点在val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)
,主要作用,调用创建了ActorSystem
image.png
startService函数
作为Utils.startServiceOnPort(port, startService, conf, name)
的参数,
image.png
Utils.startServiceOnPort(port, startService, conf, name)
中只是计算出master启动的端口
image.png
所以关键还是要看startService
方法,该方法又调用doCreateActorSystem
image.png
所以第一个红框的作用就是读取配置,包括端口信息,创建ActorSystem
,第二个红框,通过反射启动Master
image.png
启动Master
,Master
会走Actor的生命周期方法preStart
启动,receiveWithLogging
,接收信息
preStart
方法中,启动webUi等操作,最重要的是这句代码,代码,启动一个定时器,定时发送给自己一个case objec CheckForWorkerTimeOut
,间隔是val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
image.png
Master中最最重要的方法,receiveWithLogging
,master启动后,通过该方法接收message做相应的处理,首先查看preStart中,查看定时发CheckForWorkerTimeOut
给自己的receive调用的方法,查看源码,
总结:Master启动后,定时发送CheckForWorkerTimeOut
,给自己,在receiveWithLogging
,调用timeOutDeadWorkers
,定时清理超过心跳时间的Worker,从val workers = new HashSet[WorkerInfo]
移除
image.png
image.png
Worker
1.查看worker启动脚本start-slave.sh
start-slaves.sh
启动start-slave.sh
,启动org.apache.spark.deploy.worker.Worker
类


2.源码分析
Worker启动跟Master启动几乎一模一样,
- 读取配置,获取
cpu cores
和`memeory - 创建
ActorSystem
- 反射创建
Worker
,Worker启动,调用生命周期方法
image.png
image.png

所以直接看Worker的preStart
跟receiveWithLogging
preStart
方法中,会创建工作目录WorkDir
,启动WorkWebUi
,最最重要的是,向master
注册,registerWithMaster
查看方法,调用tryRegisterAllMasters
,获取master uri 比如master:7070
,获取master的actor,然后向master发送异步无返回值message,将自己的信息封装到case class RegisterWorker
,包括自己的id,ip,port, cpu cores,内存大小信息等,所以此时需要到master的receiveWithLogging
查看接收到的RegisterWorker
做出什么样的操作



master接收到worker的信息后,将RegisterWorker 的信息封装成一个WorkerInfo(拥有worker的信息,id,ip,port, cpu cores,内存大小信息等),再将workerinfo的信息添加到persistenceEngine
持久化起来,然后向worker发送RegisteredWorker,告诉worker注册成功,接着调用调度方法schedule()
,这个方法大概是这样的,master可能拥有许多client提交的任务,当资源不足的时候,任务会排队,所以当有新的资源,就是worker加入的时候,如果此时有任务排队,又有资源加入master会调度任务分配资源,就是这个schedule()
方法。woker收到注册成功的信息RegisteredWorker
,所以此时需要去worker的receiveWithLogging
中查看

worker接收到master的信息后,启动定时器,定时
val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
向自己发送心跳SendHeartbeat
,此时需要在worker的receiveWithLogging方法中查看SendHeartbeat
,查看代码,又发送heartBeat
给master

master收到心跳后,判断是否存在workerId,如果存在则更新workerInfo的心跳时间,如果不存在,发送信息
ReconnectWorker
,让worker重新向注册。
网友评论