美文网首页
elastic-job源码浅析(任务初始化)

elastic-job源码浅析(任务初始化)

作者: 飞盏 | 来源:发表于2018-04-25 11:17 被阅读0次

1. 作业过程源码分析

作业初始化

5.1 核心入口:JobScheduler作业调度器

/**
 * Job scheduler: the entry point that binds an elastic job to its own Quartz scheduler.
 *
 * @author zhangliang
 * @author caohao
 */
public class JobScheduler {

    private static final String SCHEDULER_INSTANCE_NAME_SUFFIX = "Scheduler";

    private static final String CRON_TRIGGER_IDENTITY_SUFFIX = "Trigger";

    /** Job executor that holds the job instance and the scheduler facade. */
    private final JobExecutor jobExecutor;

    /**
     * Creates a scheduler for a single job.
     *
     * @param regCenter registry center handed to the job executor
     * @param jobConfig job configuration handed to the job executor
     * @param elasticJobListeners optional job listeners handed to the job executor
     */
    public JobScheduler(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
        jobExecutor = new JobExecutor(regCenter, jobConfig, elasticJobListeners);
    }

    /**
     * Initializes the job: initializes the executor, builds the Quartz job detail,
     * schedules it with the configured cron expression and registers the resulting
     * controller in the {@code JobRegistry}.
     *
     * @throws JobException if Quartz fails to create the scheduler or schedule the job
     */
    public void init() {
        // The executor has to be initialized before anything is scheduled.
        jobExecutor.init();
        final String jobName = jobExecutor.getJobName();
        // Build the Quartz job detail (builder pattern), identified by the job name.
        final JobDetail jobDetail = JobBuilder.newJob(LiteJob.class).withIdentity(jobName).build();
        // Hand the elastic job instance over through the Quartz job data map.
        jobDetail.getJobDataMap().put("elasticJob", jobExecutor.getElasticJob());
        final JobScheduleController controller;
        try {
            final Scheduler scheduler = initializeScheduler(jobDetail.getKey().toString());
            final String triggerIdentity = Joiner.on("_").join(jobName, CRON_TRIGGER_IDENTITY_SUFFIX);
            // Create the schedule controller and schedule the job with its cron expression.
            controller = new JobScheduleController(scheduler, jobDetail, jobExecutor.getSchedulerFacade(), triggerIdentity);
            controller.scheduleJob(jobExecutor.getSchedulerFacade().getCron());
        } catch (final SchedulerException ex) {
            throw new JobException(ex);
        }
        // Make the controller reachable by job name through the job registry.
        JobRegistry.getInstance().addJobScheduleController(jobName, controller);
    }

    /**
     * Creates the Quartz scheduler for this job and attaches the job trigger listener.
     *
     * @param jobName name used to derive the Quartz scheduler instance name
     * @return the configured Quartz scheduler
     * @throws SchedulerException if Quartz cannot initialize or return the scheduler
     */
    private Scheduler initializeScheduler(final String jobName) throws SchedulerException {
        // The factory is initialized with per-job properties instead of a properties file.
        final StdSchedulerFactory schedulerFactory = new StdSchedulerFactory();
        schedulerFactory.initialize(getBaseQuartzProperties(jobName));
        final Scheduler scheduler = schedulerFactory.getScheduler();
        // Register the trigger listener supplied by the scheduler facade.
        scheduler.getListenerManager().addTriggerListener(jobExecutor.getSchedulerFacade().newJobTriggerListener());
        return scheduler;
    }

    /**
     * Builds the base Quartz properties for this job's scheduler.
     *
     * @param jobName name used to derive the Quartz scheduler instance name
     * @return Quartz properties, after {@link #prepareEnvironments(Properties)} has been applied
     */
    private Properties getBaseQuartzProperties(final String jobName) {
        final Properties quartzProps = new Properties();
        quartzProps.setProperty("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
        // A single worker thread: per the original author's note, this keeps executions of the
        // job serialized so a long-running run is not overlapped by the next trigger.
        quartzProps.setProperty("org.quartz.threadPool.threadCount", "1");
        quartzProps.setProperty("org.quartz.scheduler.instanceName", Joiner.on("_").join(jobName, SCHEDULER_INSTANCE_NAME_SUFFIX));
        final boolean misfireEnabled = jobExecutor.getSchedulerFacade().isMisfire();
        if (!misfireEnabled) {
            // Misfire handling is switched off in the job configuration.
            quartzProps.setProperty("org.quartz.jobStore.misfireThreshold", "1");
        }
        prepareEnvironments(quartzProps);
        return quartzProps;
    }

    /**
     * Hook for subclasses to adjust the Quartz properties; does nothing by default.
     *
     * @param props Quartz properties about to be used to initialize the scheduler factory
     */
    protected void prepareEnvironments(final Properties props) {
    }
}

5.2 JobExecutor作业启动器

/**
 * Job executor: creates the job instance and prepares it for scheduling.
 *
 * @author zhangliang
 */
@Slf4j
@Getter
public class JobExecutor {

    private final String jobName;

    /** Registry center used for coordination. */
    private final CoordinatorRegistryCenter regCenter;

    /** The concrete job instance that will be executed. */
    private final ElasticJob elasticJob;

    /** Facade offering internal services to the scheduler. */
    private final SchedulerFacade schedulerFacade;

    /**
     * Creates the executor, instantiates the job class and wires up its collaborators.
     *
     * @param regCenter registry center
     * @param jobConfig job configuration; supplies the job name and the job class
     * @param elasticJobListeners optional job listeners
     * @throws JobException if the job class cannot be instantiated
     */
    public JobExecutor(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final ElasticJobListener... elasticJobListeners) {
        this.jobName = jobConfig.getJobName();
        // regCenter must be assigned before createElasticJob, which reads the field.
        this.regCenter = regCenter;
        final List<ElasticJobListener> listeners = Arrays.asList(elasticJobListeners);
        setGuaranteeServiceForElasticJobListeners(regCenter, jobConfig, listeners);
        this.elasticJob = createElasticJob(jobConfig, listeners);
        this.schedulerFacade = new SchedulerFacade(regCenter, jobConfig, listeners);
    }

    /**
     * Initializes the job: clears the previous server status, adds the job's node to the
     * registry center cache and registers the start-up information.
     */
    public void init() {
        log.debug("Elastic job: job controller init, job name is: {}.", jobName);
        // Clear the status left over from the previous run.
        schedulerFacade.clearPreviousServerStatus();
        // Cache this job's root node ("/" + job name) in the registry center.
        final String jobRootNode = "/" + jobName;
        regCenter.addCacheData(jobRootNode);
        // Register the Elastic-Job start-up information.
        schedulerFacade.registerStartUpInfo();
    }

    /**
     * Injects one shared {@code GuaranteeService} into every listener that is an
     * {@code AbstractDistributeOnceElasticJobListener}; other listeners are left untouched.
     */
    private void setGuaranteeServiceForElasticJobListeners(final CoordinatorRegistryCenter regCenter, final JobConfiguration jobConfig, final List<ElasticJobListener> elasticJobListeners) {
        final GuaranteeService sharedGuaranteeService = new GuaranteeService(regCenter, jobConfig);
        for (final ElasticJobListener listener : elasticJobListeners) {
            if (!(listener instanceof AbstractDistributeOnceElasticJobListener)) {
                continue;
            }
            ((AbstractDistributeOnceElasticJobListener) listener).setGuaranteeService(sharedGuaranteeService);
        }
    }

    /**
     * Instantiates the configured job class through its no-argument constructor and
     * attaches a new {@code JobFacade} to it.
     *
     * @throws JobException if the job class cannot be instantiated or accessed
     */
    private ElasticJob createElasticJob(final JobConfiguration jobConfig, final List<ElasticJobListener> elasticJobListenerList) {
        final ElasticJob job;
        try {
            job = (ElasticJob) jobConfig.getJobClass().newInstance();
        } catch (final InstantiationException | IllegalAccessException ex) {
            throw new JobException(ex);
        }
        final JobFacade jobFacade = new JobFacade(regCenter, jobConfig, elasticJobListenerList);
        job.setJobFacade(jobFacade);
        return job;
    }
}

5.3 SchedulerFacade门面类

/**
 * Facade that exposes the internal services needed by the scheduler.
 *
 * @author zhangliang
 */
public class SchedulerFacade {

    private final ConfigurationService configService;

    private final LeaderElectionService leaderElectionService;

    private final ServerService serverService;

    private final ShardingService shardingService;

    private final ExecutionService executionService;

    private final StatisticsService statisticsService;

    private final MonitorService monitorService;

    private final ListenerManager listenerManager;

    /**
     * Creates every internal service from the same registry center and job configuration.
     *
     * @param coordinatorRegistryCenter registry center shared by all services
     * @param jobConfiguration job configuration shared by all services
     * @param elasticJobListeners job listeners passed to the listener manager
     */
    public SchedulerFacade(final CoordinatorRegistryCenter coordinatorRegistryCenter, final JobConfiguration jobConfiguration, final List<ElasticJobListener> elasticJobListeners) {
        this.configService = new ConfigurationService(coordinatorRegistryCenter, jobConfiguration);
        this.leaderElectionService = new LeaderElectionService(coordinatorRegistryCenter, jobConfiguration);
        this.serverService = new ServerService(coordinatorRegistryCenter, jobConfiguration);
        this.shardingService = new ShardingService(coordinatorRegistryCenter, jobConfiguration);
        this.executionService = new ExecutionService(coordinatorRegistryCenter, jobConfiguration);
        this.statisticsService = new StatisticsService(coordinatorRegistryCenter, jobConfiguration);
        this.monitorService = new MonitorService(coordinatorRegistryCenter, jobConfiguration);
        this.listenerManager = new ListenerManager(coordinatorRegistryCenter, jobConfiguration, elasticJobListeners);
    }

    /**
     * Clears the status of the previous run; called before each job start.
     */
    public void clearPreviousServerStatus() {
        this.serverService.clearPreviousServerStatus();
    }

    /**
     * Registers the Elastic-Job start-up information.
     *
     * <p>The steps below run strictly in the listed order.
     */
    public void registerStartUpInfo() {
        // Start all listeners.
        this.listenerManager.startAllListeners();
        // Force a leader election.
        this.leaderElectionService.leaderForceElection();
        // Persist the job configuration.
        this.configService.persistJobConfiguration();
        // Persist the information that this job server is online.
        this.serverService.persistServerOnline();
        // Clear the "job paused" flag.
        this.serverService.clearJobPausedStatus();
        final boolean dataFlowJob = JobType.DATA_FLOW == this.configService.getJobType();
        if (dataFlowJob) {
            // Only data-flow jobs start the job that counts processed data.
            this.statisticsService.startProcessCountJob();
        }
        // Flag that sharding has to be redone.
        this.shardingService.setReshardingFlag();
        // Start the job monitor service.
        this.monitorService.listen();
    }

    /**
     * Releases the resources held by the job.
     */
    public void releaseJobResource() {
        this.monitorService.close();
        final boolean dataFlowJob = JobType.DATA_FLOW.equals(this.configService.getJobType());
        if (dataFlowJob) {
            // Counterpart of the process-count job started in registerStartUpInfo().
            this.statisticsService.stopProcessCountJob();
        }
        this.serverService.removeServerStatus();
    }

    /**
     * Returns the cron expression that defines when the job fires.
     *
     * @return the job's cron expression
     */
    public String getCron() {
        final String cron = this.configService.getCron();
        return cron;
    }

    /**
     * Tells whether misfire handling is enabled for the job.
     *
     * @return {@code true} if misfire is enabled
     */
    public boolean isMisfire() {
        final boolean misfire = this.configService.isMisfire();
        return misfire;
    }

    /**
     * Creates a new job trigger listener backed by the execution and sharding services.
     *
     * @return a new job trigger listener
     */
    public JobTriggerListener newJobTriggerListener() {
        final JobTriggerListener triggerListener = new JobTriggerListener(this.executionService, this.shardingService);
        return triggerListener;
    }
}

5.4 作业注册表

/**
 * Job registry: a lazily created singleton that maps job names to their schedule controllers.
 *
 * <p>The singleton is created with double-checked locking so that concurrent first
 * calls to {@link #getInstance()} still produce a single instance.
 *
 * @author zhangliang
 * @author caohao
 */
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class JobRegistry {

    // volatile is required by double-checked locking: without it the reference could become
    // visible to another thread before the constructor has finished, so that thread could
    // observe a partially constructed instance on the unsynchronized first check.
    private static volatile JobRegistry instance;

    /** All job schedule controllers of this process, keyed by job name. */
    private Map<String, JobScheduleController> schedulerMap = new ConcurrentHashMap<>();

    /**
     * Returns the job registry singleton, creating it on first use.
     *
     * @return the job registry instance
     */
    public static JobRegistry getInstance() {
        // Fast path: a single volatile read once the instance exists.
        JobRegistry current = instance;
        if (null != current) {
            return current;
        }
        synchronized (JobRegistry.class) {
            // Re-check under the lock: another thread may have created the instance meanwhile.
            current = instance;
            if (null == current) {
                current = new JobRegistry();
                instance = current;
            }
            return current;
        }
    }

    /**
     * Registers a job schedule controller under the given job name.
     *
     * @param jobName job name
     * @param jobScheduleController job schedule controller
     */
    public void addJobScheduleController(final String jobName, final JobScheduleController jobScheduleController) {
        this.schedulerMap.put(jobName, jobScheduleController);
    }

    /**
     * Looks up the job schedule controller registered under the given job name.
     *
     * @param jobName job name
     * @return the job schedule controller, or {@code null} if none is registered under that name
     */
    public JobScheduleController getJobScheduleController(final String jobName) {
        final JobScheduleController controller = this.schedulerMap.get(jobName);
        return controller;
    }
}

相关文章

网友评论

      本文标题:elastic-job源码浅析(任务初始化)

      本文链接:https://www.haomeiwen.com/subject/kbyelftx.html