美文网首页
elastic-job源码浅析(任务执行过程)

elastic-job源码浅析(任务执行过程)

作者: 飞盏 | 来源:发表于2018-04-25 14:42 被阅读0次

    1.启动过程流程图

    启动过程

    流程图详细地描述了各个作业细节的执行过程,看上去流程非常复杂,其主要的功能点为:判断作业是否可执行,判断作业是否分片执行,作业执行状态监听,作业失效转移等。下面我们结合代码一步步窥探他的执行过程。

    2.核心源码分析

    2.1 作业入口

    /**
     * Elastic Job Lite提供的Quartz封装作业.
     *
     * @author zhangliang
     */
    public class LiteJob implements Job {
        
        @Setter
        private ElasticJob elasticJob;
        
        @Override
        public void execute(final JobExecutionContext context) throws JobExecutionException {
            elasticJob.execute();
        }
    }
    

    LiteJob实现了Quartz的Job接口,并且持有elasticJob的实现类,通过代理的方式实现了ElasticJob与Quartz的无缝衔接;

    【亮点】这是一种典型的代理模式,其好处在于体验上完全与Quartz的Job一致,并且遵循了代码的开闭原则,使得代码具有很好地拓展性:例如ElasticJob接口有SimpleJob,DataFlowJob或者用户自定义的多种实现类,因此具有很好地拓展性。

    2.2 AbstractElasticJob抽象类及其原理

    /**
     * 弹性化分布式作业的基类.
     * 
     * @author zhangliang
     * @author caohao
     */
    @Slf4j
    public abstract class AbstractElasticJob implements ElasticJob {
        
        //具体的业务实现放在jobFacade门面类中实现,简化代码复杂度
        private JobFacade jobFacade;
        
        @Override
        public final void execute() {
            log.trace("Elastic job: job execute begin.");
            //判断与注册中心时间差是否在允许范围内
            jobFacade.checkMaxTimeDiffSecondsTolerable();
            //获取分片上下文
            JobExecutionMultipleShardingContext shardingContext = jobFacade.getShardingContext();
            //若前面的任务仍在执行,则设置错过执行标记,延迟执行
            if (jobFacade.misfireIfNecessary(shardingContext.getShardingItems())) {
                log.debug("Elastic job: previous job is still running, new job will start after previous job completed. Misfired job had recorded.");
                return;
            }
            //清除作业上次执行的信息
            jobFacade.cleanPreviousExecutionInfo();
            try {
                //各监听器执行job执行前方法
                jobFacade.beforeJobExecuted(shardingContext);
                //CHECKSTYLE:OFF
            } catch (final Throwable cause) {
                //CHECKSTYLE:ON
                handleJobExecutionException(new JobException(cause));
            }
            //执行具体的job业务逻辑
            executeJobInternal(shardingContext);
            log.trace("Elastic job: execute normal completed, sharding context:{}.", shardingContext);
            while (jobFacade.isExecuteMisfired(shardingContext.getShardingItems())) {
                log.trace("Elastic job: execute misfired job, sharding context:{}.", shardingContext);
                jobFacade.clearMisfire(shardingContext.getShardingItems());
                executeJobInternal(shardingContext);
                log.trace("Elastic job: misfired job completed, sharding context:{}.", shardingContext);
            }
            //按需失效转移
            jobFacade.failoverIfNecessary();
            try {
                //执行监听后事件
                jobFacade.afterJobExecuted(shardingContext);
                //CHECKSTYLE:OFF
            } catch (final Throwable cause) {
                //CHECKSTYLE:ON
                handleJobExecutionException(new JobException(cause));
            }
            log.trace("Elastic job: execute all completed.");
        }
        
        private void executeJobInternal(final JobExecutionMultipleShardingContext shardingContext) {
            if (shardingContext.getShardingItems().isEmpty()) {
                log.trace("Elastic job: sharding item is empty, job execution context:{}.", shardingContext);
                return;
            }
            //注册任务执行信息
            jobFacade.registerJobBegin(shardingContext);
            try {
                executeJob(shardingContext);
            //CHECKSTYLE:OFF
            } catch (final Throwable cause) {
            //CHECKSTYLE:ON
                handleJobExecutionException(new JobException(cause));
            } finally {
                // TODO 考虑增加作业失败的状态,并且考虑如何处理作业失败的整体回路
                jobFacade.registerJobCompleted(shardingContext);
            }
        }
        
        protected abstract void executeJob(final JobExecutionMultipleShardingContext shardingContext);
        
        @Override
        public void handleJobExecutionException(final JobException jobException) {
            log.error("Elastic job: exception occur in job processing...", jobException.getCause());
        }
        
        @Override
        public final JobFacade getJobFacade() {
            return jobFacade;
        }
        
        @Override
        public final void setJobFacade(final JobFacade jobFacade) {
            this.jobFacade = jobFacade;
        }
    

    【亮点】外观模式传送门
    上面的代码中应用到了外观模式(Facade),AbstractElasticJob持有jobFacade对象,Elasticjob负责统筹整体的job执行流程但无需关注业务的具体实现,转而将复杂的业务处理逻辑交由jobFacade中的方法进行处理,从而将job与具体的业务逻辑抽离出来方便阅读和拓展。

    相关文章

      网友评论

          本文标题:elastic-job源码浅析(任务执行过程)

          本文链接:https://www.haomeiwen.com/subject/nsielftx.html