JobManager类
JobManager类用于对提交的任务进行管理。
初始化
JobManager初始化的过程主要是从配置文件(server/conf/sqoop.properties)中读取配置信息,
相关的配置信息包括了:任务的提交引擎,任务的执行引擎,以及对任务的提交信息进行更新的时间间隔(默认的时间 为一天 即源代码中的 DEFAULT_PURGE_THRESHOLD)以及更新时间。
之后根据配置信息,实例化提交引擎和执行引擎。以及根据时间间隔 启动submission清理的线程。和更新的线程。
初始化的源代码如下:
public synchronized void initialize() {
LOG.trace("Begin submission engine manager initialization");
MapContext context = SqoopConfiguration.getInstance().getContext();
// Let's load configured submission engine
String submissionEngineClassName =
context.getString(DriverConstants.SYSCFG_SUBMISSION_ENGINE);
submissionEngine = (SubmissionEngine) ClassUtils
.instantiate(submissionEngineClassName);
if (submissionEngine == null) {
throw new SqoopException(DriverError.DRIVER_0001,
submissionEngineClassName);
}
submissionEngine.initialize(context,
DriverConstants.PREFIX_SUBMISSION_ENGINE_CONFIG);
// Execution engine
String executionEngineClassName =
context.getString(DriverConstants.SYSCFG_EXECUTION_ENGINE);
executionEngine = (ExecutionEngine) ClassUtils
.instantiate(executionEngineClassName);
if (executionEngine == null) {
throw new SqoopException(DriverError.DRIVER_0007,
executionEngineClassName);
}
// We need to make sure that user has configured compatible combination of
// submission engine and execution engine
if (!submissionEngine
.isExecutionEngineSupported(executionEngine.getClass())) {
throw new SqoopException(DriverError.DRIVER_0008);
}
executionEngine.initialize(context,
DriverConstants.PREFIX_EXECUTION_ENGINE_CONFIG);
// Set up worker threads
purgeThreshold = context.getLong(
DriverConstants.SYSCFG_SUBMISSION_PURGE_THRESHOLD,
DEFAULT_PURGE_THRESHOLD
);
purgeSleep = context.getLong(
DriverConstants.SYSCFG_SUBMISSION_PURGE_SLEEP,
DEFAULT_PURGE_SLEEP
);
purgeThread = new PurgeThread();
purgeThread.start();
updateSleep = context.getLong(
DriverConstants.SYSCFG_SUBMISSION_UPDATE_SLEEP,
DEFAULT_UPDATE_SLEEP
);
updateThread = new UpdateThread();
updateThread.start();
SqoopConfiguration.getInstance().getProvider()
.registerListener(new CoreConfigurationListener(this));
LOG.info("Submission manager initialized: OK");
}
网友评论