由上诉代码可以知道无论任务是否存在 都会有一个Assignment 对象
clipboard29.png
对 Assignment 对象进行 一些设置 框出代码是对 集群信息进行更改 将任务信息更新到zookeep上
clipboard30.png
继续点击
clipboard31.png
path 是zookeeper的路径 然后根据对象类型 更新zookeep信息
接下里 通过 watch 机制会 回调 supervisor 中的main方法
clipboard32.png
main方法 在调用run方法
clipboard33.png
通过 mkSupervisor 获得一个 SuperviosrManger对象 点击进入mkSupervisor 方法
clipboard34.png
点击进入 类 SyncProcessEvent
public void run(Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds) {
LOG.debug("Syncing processes, interval seconds:" + TimeUtils.time_delta(lastTime));
lastTime = TimeUtils.current_time_secs();
try { /**
* Step 1: get assigned tasks from localstat Map<port(type Integer), LocalAssignment>
*/
//从本地获取任务信息,请问本地的任务信息从哪里来的,由SyncSupervisorEvent类获取zookeeper的信息写入到本地。 if (localAssignments == null) {
localAssignments = new HashMap<>();
}
LOG.debug("Assigned tasks: " + localAssignments); /**
* Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat Map<workerid [WorkerHeartbeat, state]>
* 获取当前机器上所属的worker的状态信息
*/
Map<String, StateHeartbeat> localWorkerStats;
try {
localWorkerStats = getLocalWorkerStats(conf, localState, localAssignments);
} catch (Exception e) {
LOG.error("Failed to get Local worker stats");
throw e;
}
LOG.debug("Allocated: " + localWorkerStats); /**
* Step 3: kill Invalid Workers and remove killed worker from localWorkerStats
* 杀死一些无效的worker并移除本地状态
*/
Map<String, Integer> taskCleaupTimeoutMap;
Set<Integer> keepPorts = null;
try {
taskCleaupTimeoutMap = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT);
keepPorts = killUselessWorkers(localWorkerStats, localAssignments, taskCleaupTimeoutMap);
localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, taskCleaupTimeoutMap);
} catch (IOException e) {
LOG.error("Failed to kill workers", e);
} // check new workers
checkNewWorkers(conf); // check which topology need update
checkNeedUpdateTopologys(localWorkerStats, localAssignments); // start new workers
//启动新的任务
startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds); } catch (Exception e) {
LOG.error("Failed Sync Process", e);
// throw e
}}
其中startNewWorkers 需要会创建一个新的worker 点击进入
clipboard35.png
点进去 会有这个代码 会把worker 放入队列
clipboard36.png
拼装参数 讲道理 debug的时候 会有参数详情 我这里不知知道为什么没有 先看着吧
clipboard37.png
然后启动worker 这里大家可能想知道 worker这个进程是在哪里启动的下面给大家看看
clipboard38.png
点击进入方法
clipboard39.png
继续点击
clipboard40.png
继续点击呗
clipboard41.png
非后台 启动一个进程 点击进入
clipboard42.png
皆大欢喜 进程启动方法
网友评论