美文网首页代码库
Azkaban 源码分析之作业执行篇

Azkaban 源码分析之作业执行篇

作者: 陌城小川 | 来源:发表于2018-08-24 21:23 被阅读0次

    Executor 执行信息分析

    当 Web 服务器通过 ExecutorManager 中的 dispatch 方法将 flow 的提交信息发送给 Executor 时,

    Executor 这边通过 azkaban-exec-server 中 ExecutorServlet 的 doGet 方法接收请求信息, 再通过 handleAjaxExecute 交给 FlowRunnerManager 的 submitFlow 来处理.

    /**
     * Loads the executable flow for {@code execId}, prepares it and submits a
     * {@code FlowRunner} for it to the executor service.
     *
     * <p>Steps, in order: reject the request if the execution is already in
     * {@code runningFlows}; fetch the flow through {@code executorLoader}; call
     * {@code setupFlow}; attach a flow watcher when a pipeline execution id is
     * set; resolve the number of job threads; build and configure the runner;
     * register it in {@code runningFlows}; submit it to {@code executorService}.
     *
     * @param execId id of the execution to load and run
     * @throws ExecutorManagerException if the execution is already running, the
     *         flow cannot be loaded, the {@code FLOW_NUM_JOB_THREADS} flow
     *         parameter cannot be applied, or the executor service rejects the
     *         runner
     */
    public void submitFlow(int execId) throws ExecutorManagerException {
        // Load file and submit.
        // First check whether this execution is already running.
        if (runningFlows.containsKey(execId)) {
          throw new ExecutorManagerException("Execution " + execId
              + " is already running.");
        }

        ExecutableFlow flow = executorLoader.fetchExecutableFlow(execId);

        // The null check must come before any use of the flow; logging
        // flow.getFlowId() first would turn a missing execution into a
        // NullPointerException instead of the intended error below.
        if (flow == null) {
          throw new ExecutorManagerException("Error loading flow with exec "
              + execId);
        }

        logger.info("get flow : " + flow.getFlowId());

        // Sets up the project files and execution directory
        // (creates the execution directory and links the project version).
        setupFlow(flow);

        // Setup flow runner
        FlowWatcher watcher = null;

        // Fetch the execution options of the flow.
        ExecutionOptions options = flow.getExecutionOptions();

        // If this flow is pipelined behind an earlier execution, watch that
        // execution's status: through a local watcher when its runner is in
        // runningFlows, otherwise through a remote watcher.
        if (options.getPipelineExecutionId() != null) {
          Integer pipelineExecId = options.getPipelineExecutionId();
          FlowRunner pipelineRunner = runningFlows.get(pipelineExecId);

          if (pipelineRunner != null) {
            watcher = new LocalFlowWatcher(pipelineRunner);
          } else {
            watcher = new RemoteFlowWatcher(pipelineExecId, executorLoader);
          }
        }

        // Resolve the number of threads used to run this flow's jobs.
        int numJobThreads = numJobThreadPerFlow;

        if (options.getFlowParameters().containsKey(FLOW_NUM_JOB_THREADS)) {
          try {
            int numJobs =
                Integer.parseInt(options.getFlowParameters().get(
                    FLOW_NUM_JOB_THREADS));
            // The override is honoured only when it is positive and either
            // does not exceed the default or the project is whitelisted for
            // WhitelistType.NumJobPerFlow; otherwise the default is kept.
            if (numJobs > 0 && (numJobs <= numJobThreads || ProjectWhitelist
                    .isProjectWhitelisted(flow.getProjectId(),
                        WhitelistType.NumJobPerFlow))) {
              numJobThreads = numJobs;
            }
          } catch (Exception e) {
            throw new ExecutorManagerException(
                "Failed to set the number of job threads "
                    + options.getFlowParameters().get(FLOW_NUM_JOB_THREADS)
                    + " for flow " + execId, e);
          }
        }

        FlowRunner runner =
            new FlowRunner(flow, executorLoader, projectLoader, jobtypeManager);

        // Initialise the runner's settings.
        runner.setFlowWatcher(watcher)
            .setJobLogSettings(jobLogChunkSize, jobLogNumFiles)
            .setValidateProxyUser(validateProxyUser)
            .setNumJobThreads(numJobThreads).addListener(this);

        configureFlowLevelMetrics(runner);

        // Check again.
        if (runningFlows.containsKey(execId)) {
          throw new ExecutorManagerException("Execution " + execId
              + " is already running.");
        }

        // Finally, queue the sucker.
        runningFlows.put(execId, runner);

        try {
          // The executorService already has a queue.
          // The submit method below actually returns an instance of FutureTask,
          // which implements interface RunnableFuture, which extends both
          // Runnable and Future interfaces.
          // Submit the flow runner to the thread pool.
          Future<?> future = executorService.submit(runner);
          // keep track of this future
          submittedFlows.put(future, runner.getExecutionId());
          // update the last submitted time.
          this.lastFlowSubmittedDate = System.currentTimeMillis();
        } catch (RejectedExecutionException re) {
          // The runner never started, so undo the registration above;
          // otherwise this execution would stay listed as running and a
          // retry would fail with "is already running".
          runningFlows.remove(execId);
          throw new ExecutorManagerException(
              "Azkaban server can't execute any more flows. "
                  + "The number of running flows has reached the system configured limit."
                  + "Please notify Azkaban administrators", re);
        }
    }
    

    相关文章

      网友评论

        本文标题:Azkaban 源码分析之作业执行篇

        本文链接:https://www.haomeiwen.com/subject/bbgiiftx.html