美文网首页
JobManager源码分析

JobManager源码分析

作者: zjlearn | 来源:发表于2016-07-19 15:43 被阅读280次

    JobManager类

    JobManager类用于对提交的任务进行管理。

    初始化

    JobManager初始化的过程主要是从配置文件(server/conf/sqoop.properties)中读取配置信息,
    相关的配置信息包括了:任务的提交引擎,任务的执行引擎,以及对任务的提交信息进行更新的时间间隔(默认的时间 为一天 即源代码中的 DEFAULT_PURGE_THRESHOLD)以及更新时间。
    之后根据配置信息,实例化提交引擎和执行引擎。以及根据时间间隔 启动submission清理的线程。和更新的线程。
    初始化的源代码如下:

    public synchronized void initialize() {
        LOG.trace("Begin submission engine manager initialization");
        MapContext context = SqoopConfiguration.getInstance().getContext();
    
        // Let's load configured submission engine
        String submissionEngineClassName =
          context.getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE);
    
        submissionEngine = (SubmissionEngine) ClassUtils
          .instantiate(submissionEngineClassName);
        if (submissionEngine == null) {
          throw new SqoopException(DriverError.DRIVER_0001,
            submissionEngineClassName);
        }
    
        submissionEngine.initialize(context,
            DriverConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
    
        // Execution engine
        String executionEngineClassName =
          context.getString(DriverConstants.SYSCFG_EXECUTION_ENGINE);
    
        executionEngine = (ExecutionEngine) ClassUtils
          .instantiate(executionEngineClassName);
        if (executionEngine == null) {
          throw new SqoopException(DriverError.DRIVER_0007,
            executionEngineClassName);
        }
    
        // We need to make sure that user has configured compatible combination of
        // submission engine and execution engine
        if (!submissionEngine
          .isExecutionEngineSupported(executionEngine.getClass())) {
          throw new SqoopException(DriverError.DRIVER_0008);
        }
    
        executionEngine.initialize(context,
            DriverConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
    
        // Set up worker threads
        purgeThreshold = context.getLong(
          DriverConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
          DEFAULT_PURGE_THRESHOLD
          );
        purgeSleep = context.getLong(
          DriverConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
          DEFAULT_PURGE_SLEEP
          );
    
        purgeThread = new PurgeThread();
        purgeThread.start();
    
        updateSleep = context.getLong(
          DriverConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
          DEFAULT_UPDATE_SLEEP
          );
    
        updateThread = new UpdateThread();
        updateThread.start();
    
        SqoopConfiguration.getInstance().getProvider()
          .registerListener(new CoreConfigurationListener(this));
    
        LOG.info("Submission manager initialized: OK");
      }
    

    相关文章

      网友评论

          本文标题:JobManager源码分析

          本文链接:https://www.haomeiwen.com/subject/udhhjttx.html