上一篇Elastic-Job源码分析-作业初始化过程.md分析了作业初始化的过程,今天来分析下调度作业的执行过程,首先我们结合作业初始化过程中的createJobDetail
来找到调度执行的入口,然后依次分析调度作业执行前、中和后都做了哪些操作。
一 调度执行入口
之前文章我们说过elastic-job是建立在quartz调度框架的基础上进行二次开发的,quartz在创建调度时是用JobDetailImpl中的jobClass成员变量来指定调度触发时真正的作业执行类, 这个作业类实现了Job
接口, 例如下面代码段中的LiteJob.class
,
/**
* 作业调度器.
*/
public class JobScheduler {
/**
* 创建jobDetail.
* 将jobFacade和我们最开始新建的MyElasticJob类的实例放入JobDataMap中.
*
*/
private JobDetail createJobDetail(final String jobClass) {
JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
/**
* 将LiteJob的成员变量jobFacade, 对象放入JobDetail的JobDataMap中,
* quartz会在作业触发时使用jobDataMap中的元素来初始化LiteJob的成员变量,
* 后面会给出初始化的源码片段。
*/
result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
if (elasticJobInstance.isPresent()) {
// 和上面的一样,将LiteJob的成员变量elasticJob, 对象放入JobDetail的JobDataMap中
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
} else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) {
try {
result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
} catch (final ReflectiveOperationException ex) {
throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
}
}
return result;
}
}
/**
* Lite调度作业.
*/
public final class LiteJob implements Job {
@Setter
private ElasticJob elasticJob;
@Setter
private JobFacade jobFacade;
/**
* 根据elasticJob的实际类型,
* 通过JobExecutorFactory.getJobExecutor获取对应的JobExecutor
*
*/
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
}
根据Job的execute方法定位到调用处
public class JobRunShell extends SchedulerListenerSupport implements Runnable {
...
/**
* run()之前的初始化方法
*/
public void initialize(QuartzScheduler sched)
throws SchedulerException {
this.qs = sched;
Job job = null;
JobDetail jobDetail = firedTriggerBundle.getJobDetail();
try {
/**
* sched.getJobFactory()获取到PropertySettingJobFactory类对象,
* 调用其中的newJob(TriggerFiredBundle bundle, Scheduler scheduler)方法
*/
job = sched.getJobFactory().newJob(firedTriggerBundle, scheduler);
}
...
}
public void run() {
qs.addInternalSchedulerListener(this);
try {
OperableTrigger trigger = (OperableTrigger) jec.getTrigger();
JobDetail jobDetail = jec.getJobDetail();
do {
JobExecutionException jobExEx = null;
Job job = jec.getJobInstance();
...
try {
log.debug("Calling execute on job " + jobDetail.getKey());
// 执行LiteJob的execute(final JobExecutionContext context)方法
job.execute(jec);
endTime = System.currentTimeMillis();
} ...
} while (true);
} finally {
qs.removeInternalSchedulerListener(this);
}
}
...
}
public class PropertySettingJobFactory extends SimpleJobFactory {
...
@Override
public Job newJob(TriggerFiredBundle bundle, Scheduler scheduler) throws SchedulerException {
Job job = super.newJob(bundle, scheduler);
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.putAll(scheduler.getContext());
jobDataMap.putAll(bundle.getJobDetail().getJobDataMap());
jobDataMap.putAll(bundle.getTrigger().getJobDataMap());
/**
* 这个方法的方法体较长就不展示了,主要就是用jobDataMap中的元素来初始化
* Job中的成员变量,例如上面的LiteJob.class,其中有两个成员变量,
* elasticJob和jobFacade就是通过该方式来初始化的。
*/
setBeanProps(job, jobDataMap);
return job;
}
}
public class SimpleJobFactory implements JobFactory {
...
/**
* 根据JobDetail中的JobClass类来生成一个实例。
*/
public Job newJob(TriggerFiredBundle bundle, Scheduler Scheduler) throws SchedulerException {
JobDetail jobDetail = bundle.getJobDetail();
Class<? extends Job> jobClass = jobDetail.getJobClass();
try {
if(log.isDebugEnabled()) {
log.debug(
"Producing instance of Job '" + jobDetail.getKey() +
"', class=" + jobClass.getName());
}
return jobClass.newInstance();
} catch (Exception e) {
SchedulerException se = new SchedulerException(
"Problem instantiating class '"
+ jobDetail.getJobClass().getName() + "'", e);
throw se;
}
}
}
至此,整个JobLite的初始化过程结束,以上过程主要是交代下LiteJob中的两个成员变量是如何被初始化的,同时可将LiteJob的execute(final JobExecutionContext context)
作为elastic-job作业执行的入口开始今天的主题。
二 获取作业执行器SimpleJobExecutor
/**
* Lite调度作业.
*/
public final class LiteJob implements Job {
@Setter
private ElasticJob elasticJob;
@Setter
private JobFacade jobFacade;
/**
* 根据elasticJob的实际类型,
* 通过JobExecutorFactory.getJobExecutor获取对应的JobExecutor
*
* @param context
* @throws JobExecutionException
*/
@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
/**
* 根据elasticJob的类型来获取不同的AbstractElasticJobExecutor实例,
* 然后执行AbstractElasticJobExecutor的execute()方法。
*/
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
}
}
public final class JobExecutorFactory {
/**
* 获取作业执行器.
*
* @param elasticJob 分布式弹性作业
* @param jobFacade 作业内部服务门面服务
* @return 作业执行器
*/
@SuppressWarnings("unchecked")
public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) {
if (null == elasticJob) {
return new ScriptJobExecutor(jobFacade);
}
/**
* 我们的Demo中创建的MyElasticJob实现的是SimpleJob接口,
* 所以此处我们return的是个SimpleJobExecutor类
*/
if (elasticJob instanceof SimpleJob) {
return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
}
if (elasticJob instanceof DataflowJob) {
return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
}
throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
}
}
/**
* 简单作业执行器.
*/
public final class SimpleJobExecutor extends AbstractElasticJobExecutor {
private final SimpleJob simpleJob;
public SimpleJobExecutor(final SimpleJob simpleJob, final JobFacade jobFacade) {
super(jobFacade);
this.simpleJob = simpleJob;
}
@Override
protected void process(final ShardingContext shardingContext) {
simpleJob.execute(shardingContext);
}
}
LiteJob中根据elasticJob的实际类型获取相应的AbstractElasticJobExecutor子类,然后执行AbstractElasticJobExecutor的execute()
方法, 接下来将重点分析AbstractElasticJobExecutor类。
三 ElasticJob执行器抽象类AbstractElasticJobExecutor
/**
* 弹性化分布式作业执行器.
*/
@Slf4j
public abstract class AbstractElasticJobExecutor {
...
/**
* 执行作业.
*/
public final void execute() {
try {
// 检查本机与注册中心的时间误差秒数是否在允许范围.
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
jobExceptionHandler.handleException(jobName, cause);
}
// 获取分片上下文ShardingContexts
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
if (shardingContexts.isAllowSendJobEvent()) {
// 发布作业启动事件
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobName));
}
// 如果当前调度还在运行中而下一个调度周期已经到来了,则将该分片任务标记为错过执行.
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
if (shardingContexts.isAllowSendJobEvent()) {
// 发布作业启动事件
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format(
"Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", jobName,
shardingContexts.getShardingItemParameters().keySet()));
}
return;
}
try {
// 执行弹性化分布式作业监听器接口的beforeJobExecuted方法.
jobFacade.beforeJobExecuted(shardingContexts);
} catch (final Throwable cause) {
jobExceptionHandler.handleException(jobName, cause);
}
//执行作业
execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);
/**
* 如果执行该作业错过的任务时,该分片再次被标记为错过执行任务,
* 则继续执行错过的任务,直到不再有分片被标记为错过为止
*/
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
// 清除分片任务被错过执行的标记.
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
execute(shardingContexts, JobExecutionEvent.ExecutionSource.MISFIRE);
}
// 如果需要失效转移, 则执行作业失效转移.
jobFacade.failoverIfNecessary();
try {
// 执行弹性化分布式作业监听器接口的afterJobExecuted方法.
jobFacade.afterJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobExceptionHandler.handleException(jobName, cause);
}
}
/**
* @param shardingContexts
* @param executionSource
*/
private void execute(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
if (shardingContexts.getShardingItemParameters().isEmpty()) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobName));
}
return;
}
// 注册作业启动信息, 标记当前分片项为running状态
jobFacade.registerJobBegin(shardingContexts);
String taskId = shardingContexts.getTaskId();
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
}
try {
process(shardingContexts, executionSource);
} finally {
// 注册作业完成信息, 移除running状态标志节点
jobFacade.registerJobCompleted(shardingContexts);
if (itemErrorMessages.isEmpty()) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
}
} else {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
}
}
}
}
private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) {
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
if (1 == items.size()) {
int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
process(shardingContexts, item, jobExecutionEvent);
return;
}
// 使用CountDownLatch实现等待多个分片任务全都执行完后再执行后面动作的效果
final CountDownLatch latch = new CountDownLatch(items.size());
for (final int each : items) {
final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
if (executorService.isShutdown()) {
return;
}
executorService.submit(new Runnable() {
@Override
public void run() {
try {
process(shardingContexts, each, jobExecutionEvent);
} finally {
latch.countDown();
}
}
});
}
try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
private void process(final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobExecutionEvent(startEvent);
}
log.trace("Job '{}' executing, item is: '{}'.", jobName, item);
JobExecutionEvent completeEvent;
try {
// 实际要执行的业务逻辑
process(new ShardingContext(shardingContexts, item));
completeEvent = startEvent.executionSuccess();
log.trace("Job '{}' executed, item is: '{}'.", jobName, item);
if (shardingContexts.isAllowSendJobEvent()) {
jobFacade.postJobExecutionEvent(completeEvent);
}
} catch (final Throwable cause) {
completeEvent = startEvent.executionFailure(cause);
jobFacade.postJobExecutionEvent(completeEvent);
itemErrorMessages.put(item, ExceptionUtil.transform(cause));
jobExceptionHandler.handleException(jobName, cause);
}
}
/**
* 实际要执行的业务逻辑
* @param shardingContext
*/
protected abstract void process(ShardingContext shardingContext);
}
AbstractElasticJobExecutor
抽象类控制作业执行前、后的一些动作,例如事件发布、分片运行状态管理、错过重执行和和失效转移等动作。
总结
本文从createJobDetail
入手,分析了JobLiteJob
的两个成员变量 elasticJob
和 jobFacade
是如果被传入和初始化的;然后以SimpleJob为例,分析如何获取执行器SimpleJobExecutor
;最后重点分析了作业执行过程中都做了哪些事情,通过源码分析了解到,Elastic-job何时控制错误重执行、何时做失效转移动作以及如何控制多个分片的并行执行等。
网友评论