1. 作业过程源码分析
作业初始化
5.1 核心入口:JobScheduler作业调度器
/**
 * Job scheduler: the entry point that hands an elastic job over to a Quartz scheduler.
 *
 * @author zhangliang
 * @author caohao
 */
public class JobScheduler {

    private static final String SCHEDULER_INSTANCE_NAME_SUFFIX = "Scheduler";

    private static final String CRON_TRIGGER_IDENTITY_SUFFIX = "Trigger";

    /** Job bootstrap helper; supplies the job name, the job instance and the scheduler facade. */
    private final JobExecutor jobExecutor;

    /**
     * Creates a scheduler for one job.
     *
     * @param regCenter registry center handed to the job executor
     * @param jobConfig job configuration handed to the job executor
     * @param elasticJobListeners optional job listeners handed to the job executor
     */
    public JobScheduler(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
        jobExecutor = new JobExecutor(regCenter, jobConfig, elasticJobListeners);
    }

    /**
     * Initializes the job: initializes the executor, builds the Quartz job detail,
     * creates the schedule controller, schedules it with the configured cron expression
     * and finally records the controller in the {@code JobRegistry}.
     *
     * @throws JobException if a {@code SchedulerException} is raised while creating or scheduling
     */
    public void init() {
        // The executor must be initialized before anything is handed to Quartz.
        jobExecutor.init();
        JobDetail detail = buildJobDetail();
        JobScheduleController controller;
        try {
            // The scheduler instance is created first, exactly as in the inline form.
            Scheduler quartzScheduler = initializeScheduler(detail.getKey().toString());
            controller = new JobScheduleController(quartzScheduler, detail, jobExecutor.getSchedulerFacade(), triggerIdentity());
            controller.scheduleJob(jobExecutor.getSchedulerFacade().getCron());
        } catch (final SchedulerException ex) {
            throw new JobException(ex);
        }
        // Publish the controller in the registry under the job name.
        JobRegistry.getInstance().addJobScheduleController(jobExecutor.getJobName(), controller);
    }

    /** Builds the Quartz job detail (builder pattern) and stores the job instance in its data map. */
    private JobDetail buildJobDetail() {
        JobDetail detail = JobBuilder.newJob(LiteJob.class).withIdentity(jobExecutor.getJobName()).build();
        // Keep the job instance in the data map under the "elasticJob" key.
        detail.getJobDataMap().put("elasticJob", jobExecutor.getElasticJob());
        return detail;
    }

    /** Returns the trigger identity: job name and trigger suffix joined with an underscore. */
    private String triggerIdentity() {
        return Joiner.on("_").join(jobExecutor.getJobName(), CRON_TRIGGER_IDENTITY_SUFFIX);
    }

    /**
     * Creates the Quartz scheduler through {@code StdSchedulerFactory} and attaches the trigger listener.
     *
     * @param jobName name used to derive the scheduler instance name
     * @return the configured scheduler
     * @throws SchedulerException if Quartz fails to initialize or return the scheduler
     */
    private Scheduler initializeScheduler(final String jobName) throws SchedulerException {
        StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
        schedulerFactory.initialize(getBaseQuartzProperties(jobName));
        Scheduler scheduler = schedulerFactory.getScheduler();
        // Register the trigger listener supplied by the scheduler facade.
        scheduler.getListenerManager().addTriggerListener(jobExecutor.getSchedulerFacade().newJobTriggerListener());
        return scheduler;
    }

    /**
     * Assembles the base Quartz properties for this job's scheduler.
     *
     * @param jobName name used to derive the scheduler instance name
     * @return the Quartz properties, after {@link #prepareEnvironments(Properties)} has been applied
     */
    private Properties getBaseQuartzProperties(final String jobName) {
        Properties quartzProps = new Properties();
        quartzProps.setProperty("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
        // One worker thread: per the original note, the job then runs synchronously, so a
        // long-running execution is not started again while it is still in progress.
        quartzProps.setProperty("org.quartz.threadPool.threadCount", "1");
        quartzProps.setProperty("org.quartz.scheduler.instanceName", Joiner.on("_").join(jobName, SCHEDULER_INSTANCE_NAME_SUFFIX));
        boolean misfireEnabled = jobExecutor.getSchedulerFacade().isMisfire();
        if (!misfireEnabled) {
            // NOTE(review): the threshold is only overridden when misfire is disabled;
            // confirm the intended interaction with the trigger's misfire instruction.
            quartzProps.setProperty("org.quartz.jobStore.misfireThreshold", "1");
        }
        prepareEnvironments(quartzProps);
        return quartzProps;
    }

    /**
     * Hook for subclasses to adjust the Quartz properties; does nothing by default.
     *
     * @param props the Quartz properties built so far
     */
    protected void prepareEnvironments(final Properties props) {
    }
}
5.2 JobExecutor作业启动器
/**
 * Job executor: bootstrap helper that creates the job instance and the scheduler facade
 * for one job and performs the start-up registration.
 *
 * @author zhangliang
 */
@Slf4j
@Getter
public class JobExecutor {

    private final String jobName;

    /** Distributed registry center. */
    private final CoordinatorRegistryCenter regCenter;

    /** The concrete job instance that will be executed. */
    private final ElasticJob elasticJob;

    /** Facade exposing internal services to the scheduler. */
    private final SchedulerFacade schedulerFacade;

    /**
     * Creates the executor, the job instance and the scheduler facade.
     *
     * @param regCenter registry center
     * @param jobConfig job configuration; supplies the job name and the job class
     * @param elasticJobListeners optional job listeners
     * @throws JobException if the job class cannot be instantiated
     */
    public JobExecutor(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
        jobName = jobConfig.getJobName();
        this.regCenter = regCenter;
        List<ElasticJobListener> elasticJobListenerList = Arrays.asList(elasticJobListeners);
        setGuaranteeServiceForElasticJobListeners(regCenter, jobConfig, elasticJobListenerList);
        elasticJob = createElasticJob(jobConfig, elasticJobListenerList);
        schedulerFacade = new SchedulerFacade(regCenter, jobConfig, elasticJobListenerList);
    }

    /**
     * Initializes the job: clears the previous server status, adds cache data for the job's
     * root path in the registry center and registers the start-up information.
     */
    public void init() {
        log.debug("Elastic job: job controller init, job name is: {}.", jobName);
        // Clear the status left over from the previous run.
        schedulerFacade.clearPreviousServerStatus();
        // Original note: registers the current job with the registry center.
        regCenter.addCacheData("/" + jobName);
        // Register the Elastic-Job start-up information.
        schedulerFacade.registerStartUpInfo();
    }

    /** Injects a shared {@code GuaranteeService} into every distribute-once listener. */
    private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final List<ElasticJobListener> elasticJobListeners) {
        GuaranteeService guaranteeService = new GuaranteeService(regCenter, jobConfig);
        for (ElasticJobListener each : elasticJobListeners) {
            if (each instanceof AbstractDistributeOnceElasticJobListener) {
                ((AbstractDistributeOnceElasticJobListener) each).setGuaranteeService(guaranteeService);
            }
        }
    }

    /**
     * Instantiates the configured job class through its no-argument constructor and
     * attaches a {@code JobFacade} to it.
     *
     * <p>{@code Class.newInstance()} is avoided because it rethrows checked exceptions from
     * the constructor without declaring them. Here a checked constructor failure is wrapped
     * in {@code JobException}; unchecked failures are rethrown unchanged.
     *
     * @throws JobException if the class has no usable no-argument constructor, cannot be
     *     instantiated, or its constructor throws a checked exception
     */
    private ElasticJob createElasticJob(final JobConfiguration jobConfig, final List<ElasticJobListener> elasticJobListenerList) {
        ElasticJob result;
        try {
            result = (ElasticJob) jobConfig.getJobClass().getDeclaredConstructor().newInstance();
        } catch (final java.lang.reflect.InvocationTargetException ex) {
            // Keep unchecked constructor failures visible exactly as they were thrown.
            Throwable cause = ex.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new JobException(ex);
        } catch (final ReflectiveOperationException ex) {
            throw new JobException(ex);
        }
        result.setJobFacade(new JobFacade(regCenter, jobConfig, elasticJobListenerList));
        return result;
    }
}
5.3 SchedulerFacade门面类
/**
* Facade that exposes internal job services to the scheduler.
*
* <p>Every public method forwards to one or more of the service objects created in the constructor.
* The services themselves are defined elsewhere; the step descriptions below are translated from the
* original author's notes rather than derived from their implementations.
*
* @author zhangliang
*/
public class SchedulerFacade {
private final ConfigurationService configService;
private final LeaderElectionService leaderElectionService;
private final ServerService serverService;
private final ShardingService shardingService;
private final ExecutionService executionService;
private final StatisticsService statisticsService;
private final MonitorService monitorService;
private final ListenerManager listenerManager;
/**
* Creates one instance of each internal service from the same registry center and job configuration.
*
* @param coordinatorRegistryCenter registry center passed to every service
* @param jobConfiguration job configuration passed to every service
* @param elasticJobListeners job listeners passed to the listener manager
*/
public SchedulerFacade(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration, final List<ElasticJobListener> elasticJobListeners) {
configService = new ConfigurationService(coordinatorRegistryCenter, jobConfiguration);
leaderElectionService = new LeaderElectionService(coordinatorRegistryCenter, jobConfiguration);
serverService = new ServerService(coordinatorRegistryCenter, jobConfiguration);
shardingService = new ShardingService(coordinatorRegistryCenter, jobConfiguration);
executionService = new ExecutionService(coordinatorRegistryCenter, jobConfiguration);
statisticsService = new StatisticsService(coordinatorRegistryCenter, jobConfiguration);
monitorService = new MonitorService(coordinatorRegistryCenter, jobConfiguration);
listenerManager = new ListenerManager(coordinatorRegistryCenter, jobConfiguration, elasticJobListeners);
}
/**
* Clears the state of the previous run before the job starts; delegates to the server service.
*/
public void clearPreviousServerStatus() {
serverService.clearPreviousServerStatus();
}
/**
* Registers the Elastic-Job start-up information by calling the internal services in a fixed order.
*/
public void registerStartUpInfo() {
// Start all listeners.
listenerManager.startAllListeners();
// Force a leader election.
leaderElectionService.leaderForceElection();
// Persist the distributed job configuration (to ZooKeeper, per the original note).
configService.persistJobConfiguration();
// Persist the server-online information (to ZooKeeper, per the original note).
serverService.persistServerOnline();
// Clear the job-paused flag.
serverService.clearJobPausedStatus();
// NOTE(review): this check uses == while releaseJobResource() uses equals();
// the two are equivalent if JobType is an enum - confirm and unify.
if (JobType.DATA_FLOW == configService.getJobType()) {
// Data-flow jobs only: start the job that counts processed items (asynchronous, per the original note).
statisticsService.startProcessCountJob();
}
// Set the flag indicating that resharding is needed.
shardingService.setReshardingFlag();
// Start the job monitor service.
monitorService.listen();
}
/**
* Releases the resources held by the job: closes the monitor service, stops the
* process-count job for data-flow jobs and removes the server status.
*/
public void releaseJobResource() {
monitorService.close();
if (JobType.DATA_FLOW.equals(configService.getJobType())) {
statisticsService.stopProcessCountJob();
}
serverService.removeServerStatus();
}
/**
* Returns the cron expression that defines when the job starts.
*
* @return the job's cron expression, as reported by the configuration service
*/
public String getCron() {
return configService.getCron();
}
/**
* Returns whether misfire is enabled.
*
* @return {@code true} if misfire is enabled, as reported by the configuration service
*/
public boolean isMisfire() {
return configService.isMisfire();
}
/**
* Creates a new job trigger listener.
*
* @return a new listener built from the execution service and the sharding service
*/
public JobTriggerListener newJobTriggerListener() {
return new JobTriggerListener(executionService, shardingService);
}
}
5.4 作业注册表
/**
 * Job registry: a singleton that caches job schedule controllers by job name.
 *
 * <p>The single instance is created lazily with double-checked locking so that
 * concurrent first calls to {@link #getInstance()} create it only once.
 *
 * @author zhangliang
 * @author caohao
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobRegistry {

    // volatile is required by the double-checked locking below. Without it, the reference
    // could become visible before the constructor's writes, and a thread performing the
    // unsynchronized first check could observe a non-null but not fully initialized instance.
    private static volatile JobRegistry instance;

    // All job schedule controllers, keyed by job name, cached in this singleton.
    private Map<String, JobScheduleController> schedulerMap = new ConcurrentHashMap<>();

    /**
     * Returns the job registry instance, creating it on first use.
     *
     * @return the job registry instance
     */
    public static JobRegistry getInstance() {
        JobRegistry registry = instance;
        if (null != registry) {
            // Fast path: already created, no locking needed.
            return registry;
        }
        synchronized (JobRegistry.class) {
            // Re-check under the lock: another thread may have created it meanwhile.
            registry = instance;
            if (null == registry) {
                registry = new JobRegistry();
                instance = registry;
            }
            return registry;
        }
    }

    /**
     * Adds a job schedule controller.
     *
     * @param jobName job name
     * @param jobScheduleController job schedule controller
     */
    public void addJobScheduleController(final String jobName, final JobScheduleController jobScheduleController) {
        schedulerMap.put(jobName, jobScheduleController);
    }

    /**
     * Looks up a job schedule controller.
     *
     * @param jobName job name
     * @return the controller stored under that name, or {@code null} if none is stored
     */
    public JobScheduleController getJobScheduleController(final String jobName) {
        return schedulerMap.get(jobName);
    }
}
网友评论