美文网首页
storm源码工作流程(三)Supervisor监听zookee

storm源码工作流程(三)Supervisor监听zookee

作者: 小熊先生很不开心 | 来源:发表于2018-07-08 19:10 被阅读141次
    clipboard27.png

    由上诉代码可以知道无论任务是否存在 都会有一个Assignment 对象


    clipboard29.png

    对 Assignment 对象进行 一些设置 框出代码是对 集群信息进行更改 将任务信息更新到zookeep上


    clipboard30.png
    继续点击
    clipboard31.png
    path 是zookeeper的路径 然后根据对象类型 更新zookeep信息

    接下里 通过 watch 机制会 回调 supervisor 中的main方法


    clipboard32.png
    main方法 在调用run方法
    clipboard33.png
    通过 mkSupervisor 获得一个 SuperviosrManger对象 点击进入mkSupervisor 方法
    clipboard34.png
    点击进入 类 SyncProcessEvent
    public void run(Map<Integer, LocalAssignment> localAssignments, Set<String> downloadFailedTopologyIds) {
        LOG.debug("Syncing processes, interval seconds:" + TimeUtils.time_delta(lastTime));
        lastTime = TimeUtils.current_time_secs();
        try { /**
             * Step 1: get assigned tasks from localstat Map<port(type Integer), LocalAssignment>
             */
            //从本地获取任务信息,请问本地的任务信息从哪里来的,由SyncSupervisorEvent类获取zookeeper的信息写入到本地。 if (localAssignments == null) {
                localAssignments = new HashMap<>();
            }
            LOG.debug("Assigned tasks: " + localAssignments); /**
             * Step 2: get local WorkerStats from local_dir/worker/ids/heartbeat Map<workerid [WorkerHeartbeat, state]>
             * 获取当前机器上所属的worker的状态信息
             */
            Map<String, StateHeartbeat> localWorkerStats;
            try {
                localWorkerStats = getLocalWorkerStats(conf, localState, localAssignments);
            } catch (Exception e) {
                LOG.error("Failed to get Local worker stats");
                throw e;
            }
            LOG.debug("Allocated: " + localWorkerStats); /**
             * Step 3: kill Invalid Workers and remove killed worker from localWorkerStats
             * 杀死一些无效的worker并移除本地状态
             */
            Map<String, Integer> taskCleaupTimeoutMap;
            Set<Integer> keepPorts = null;
            try {
                taskCleaupTimeoutMap = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT);
                keepPorts = killUselessWorkers(localWorkerStats, localAssignments, taskCleaupTimeoutMap);
                localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, taskCleaupTimeoutMap);
            } catch (IOException e) {
                LOG.error("Failed to kill workers", e);
            } // check new workers
            checkNewWorkers(conf); // check which topology need update
            checkNeedUpdateTopologys(localWorkerStats, localAssignments); // start new workers
            //启动新的任务
            startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds); } catch (Exception e) {
            LOG.error("Failed Sync Process", e);
            // throw e
        }}
    

    其中startNewWorkers 需要会创建一个新的worker 点击进入


    clipboard35.png

    点进去 会有这个代码 会把worker 放入队列


    clipboard36.png
    拼装参数 讲道理 debug的时候 会有参数详情 我这里不知知道为什么没有 先看着吧
    clipboard37.png

    然后启动worker 这里大家可能想知道 worker这个进程是在哪里启动的下面给大家看看


    clipboard38.png
    点击进入方法
    clipboard39.png
    继续点击
    clipboard40.png
    继续点击呗
    clipboard41.png
    非后台 启动一个进程 点击进入
    clipboard42.png

    皆大欢喜 进程启动方法

    相关文章

      网友评论

          本文标题:storm源码工作流程(三)Supervisor监听zookee

          本文链接:https://www.haomeiwen.com/subject/iyoouftx.html