美文网首页
spring源码------`@Schedule`跟`@Sche

spring源码------`@Schedule`跟`@Sche

作者: szhlcy | 来源:发表于2019-12-02 23:11 被阅读0次

    @[toc]

    1.@Scheduled@EnableScheduling

    1.1 @Scheduled注解

     Spring在3.0版本的时候加入@Scheduled注解来完成对定时任务的支持,其底层是spring自己实现的一套定时任务的逻辑,所以功能比价简单,但是能够实现单机部署上的定时任务。这里对这个注解的内部的属性进行分析。

    @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Repeatable(Schedules.class)
    public @interface Scheduled {
    
        //一个特殊的cron表达式,表示的是禁用Trigger
        String CRON_DISABLED = ScheduledTaskRegistrar.CRON_DISABLED;
    
        //cron表达式
        String cron() default "";
    
        //时区 默认是的是取当前的服务器的时区
        String zone() default "";
    
        //在任务最后一次调用的结束和下一次调用的开始之间以毫秒为单位执行带注释的方法
        long fixedDelay() default -1;
    
        //在最后一次调用的结束和下一次调用的开始之间以毫秒为单位执行带注释的方法
        String fixedDelayString() default "";
    
        //在两次调用之间以毫秒为单位执行带注释的方法。
        long fixedRate() default -1;
    
        //在两次调用之间以毫秒为单位执行带注释的方法。
        String fixedRateString() default "";
    
        //在第一次执行任务之前延迟多少秒
        long initialDelay() default -1;
    
        //在第一次执行任务之前延迟多少秒
        String initialDelayString() default "";
    
    }
    

     在@Scheduled注解中主要定了定时任务相关的属性,包含的常用的cron表达式,还添加了其他的属性。可以指定是否使用触发器(Tigger),可以使用触发器的方式(注意使用触发器的方式的时候)
     在spring中还有另外的一个注解跟@Scheduled注解一样的作用,就是@Schedules注解,这个注解内部包含多个@Scheduled注解,可以表示一个方法可以存在多个调度设置

    @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface Schedules {
    
        Scheduled[] value();
    
    }
    

    1.2 @EnableScheduling注解

    @EnableScheduling注解是spring在3.1版本的时候加上的,作用就是开启spring对定时任务的支持。这个注解也是对@Import注解的一个扩展使用。关于@Import注解可以看看前面的一篇文章spring源码解析------@Import注解解析与ImportSelector,ImportBeanDefinitionRegistrar以及DeferredImportSelector区别
    进行了解。

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Import(SchedulingConfiguration.class)
    @Documented
    public @interface EnableScheduling {
    
    }
    

    2 @Scheduled@EnableScheduling注解的解析

     基于对@Import注解的了解,这里直接进入到@EnableScheduling注解上面指定的SchedulingConfiguration类进行查看。

    2.1 开启定时任务入口的SchedulingConfiguration

     在SchedulingConfiguration中代码很少,只是注册了一个bean到容器中。但是这个bean却是定时任务的解析与创建的核心类。进入到代码中查看

    @Configuration
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class SchedulingConfiguration {
    
        @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
            //注册ScheduledAnnotationBeanPostProcessor到容器中
            return new ScheduledAnnotationBeanPostProcessor();
        }
    }
    

     这里的代码不用进行解析了,只是进行了一个注解,接下来的才是重要的东西。

    2.2 解析@Scheduled@Schedules以及创建定时任务入口的ScheduledAnnotationBeanPostProcessor

     在解析ScheduledAnnotationBeanPostProcessor之前,我们需要先了解一下bean的生命周期最好了。因为这个类继承了实现了很多的spring的生命周期相关的接口,关于spring 的生命周期相关的可以看看Spring源码----Spring的Bean生命周期流程图及代码解释
    ,现在进行下一步的解析。

    2.2.1 ScheduledAnnotationBeanPostProcessor实现的生命周期的方法排序

     这里先吧ScheduledAnnotationBeanPostProcessor实现与继承的接口展示出来,然后说明主要的实现方法的调用顺序。

    public class ScheduledAnnotationBeanPostProcessor
            implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
            Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
            SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {
        ......
        @Override
        //实现Ordered接口,返回当前BeanPostProcessor的顺序
        public int getOrder() {
            return LOWEST_PRECEDENCE;
        }
        ......
        @Override
        //实现EmbeddedValueResolverAware接口
        public void setEmbeddedValueResolver(StringValueResolver resolver) {
            //设置string字段解析的对象StringValueResolver
            this.embeddedValueResolver = resolver;
        }
    
        @Override
        //实现BeanNameAware接口
        public void setBeanName(String beanName) {
            this.beanName = beanName;
        }
        
        @Override
        //实现BeanFactoryAware接口
        public void setBeanFactory(BeanFactory beanFactory) {
            this.beanFactory = beanFactory;
        }
        
        @Override
        //实现ApplicationContextAware接口
        public void setApplicationContext(ApplicationContext applicationContext) {
            //设置spring的上下文进来
        this.applicationContext = applicationContext;
            if (this.beanFactory == null) {
                this.beanFactory = applicationContext;
            }
        }
        
        @Override
        //实现了SmartInitializingSingleton接口,在所有的单例bean实例化之后
        public void afterSingletonsInstantiated() {
            // Remove resolved singleton classes from cache
            //移除缓存
            this.nonAnnotatedClasses.clear();
    
            if (this.applicationContext == null) {
                // Not running in an ApplicationContext -> register tasks early...
                // 不再上下文中运行,这里表示的是spring的上下文还没有初始化完成时候调用,一般都不会进入到这里
                finishRegistration();
            }
        }
    
        @Override
        //实现了ApplicationListener,这里监听的是上下文刷新完成事件
        public void onApplicationEvent(ContextRefreshedEvent event) {
            if (event.getApplicationContext() == this.applicationContext) {
                // Running in an ApplicationContext -> register tasks this late...
                // giving other ContextRefreshedEvent listeners a chance to perform
                // their work at the same time (e.g. Spring Batch's job registration).
                //注册定时任务
                finishRegistration();
            }
        }
        
        @Override
        //实现了MergedBeanDefinitionPostProcessor,这里是空实现的,
        public void postProcessMergedBeanDefinition(RootBeanDefinition beanDefinition, Class<?> beanType, String beanName) {
        }
    
        @Override
        //这里实现BeanPostProcessor,也是空实现,在bean初始化之前
        public Object postProcessBeforeInitialization(Object bean, String beanName) {
            return bean;
        }
    
        @Override
        //实现BeanPostProcessor,在bean初始化之后�
        public Object postProcessAfterInitialization(Object bean, String beanName) {
            ......
        }
    
        @Override
        //实现了DestructionAwareBeanPostProcessor,bean销毁之前调用
        public void postProcessBeforeDestruction(Object bean, String beanName) {
            Set<ScheduledTask> tasks;
            synchronized (this.scheduledTasks) {
                tasks = this.scheduledTasks.remove(bean);
            }
            if (tasks != null) {
                for (ScheduledTask task : tasks) {
                    task.cancel();
                }
            }
        }
    
        @Override
        //实现了DestructionAwareBeanPostProcessor,是否注册销毁方法的时候调用
        public boolean requiresDestruction(Object bean) {
            synchronized (this.scheduledTasks) {
                return this.scheduledTasks.containsKey(bean);
            }
        }
    
        @Override
        //实现DisposableBean,bean销毁的时候调用
        public void destroy() {
            synchronized (this.scheduledTasks) {
                Collection<Set<ScheduledTask>> allTasks = this.scheduledTasks.values();
                for (Set<ScheduledTask> tasks : allTasks) {
                    for (ScheduledTask task : tasks) {
                        task.cancel();
                    }
                }
                this.scheduledTasks.clear();
            }
            this.registrar.destroy();
        }
    }
    

     会发现ScheduledAnnotationBeanPostProcessor足足实现了9个生命周期有关的接口类。现在需要来整理一下实现的这些类的方法,后面才好进行分析。

    1. ApplicationContextAwaresetApplicationContext,EmbeddedValueResolverAwaresetEmbeddedValueResolver
    2. MergedBeanDefinitionPostProcessorpostProcessMergedBeanDefinition
    3. BeanNameAware,BeanFactoryAware
    4. BeanPostProcessorpostProcessAfterInitialization
    5. SmartInitializingSingletonafterSingletonsInstantiated
    6. DestructionAwareBeanPostProcessorrequiresDestruction
    7. ApplicationListener<ContextRefreshedEvent>的事件
    8. DestructionAwareBeanPostProcessorrequiresDestruction
    9. DisposableBeandestroy

     接下里按照这个顺序,只分析重点的方法。

    2.2.2 在bean初始化之后的@Scheduled以及@Schedules 注解的解析

     先看看一个bean在初始化之后步骤。

        @Override
        //实现BeanPostProcessor,在bean初始化之后�
        public Object postProcessAfterInitialization(Object bean, String beanName) {
            //bean是spring的aop,task,schedule相关的基础接口类的实现类,则进行过滤
            if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
                    bean instanceof ScheduledExecutorService) {
                // Ignore AOP infrastructure such as scoped proxies.
                return bean;
            }
            //获取当前bean的真正的目标Class
            Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
            //当前的目标类不再已缓存的不包含注解的类的集合中,并且当前目标类包含了Scheduled或者Schedules注解
            if (!this.nonAnnotatedClasses.contains(targetClass) &&
                    AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
                //从目标类中获取贴有Scheduled跟Schedules的方法
                Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                        (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                            Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                    method, Scheduled.class, Schedules.class);
                            return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                        });
                //如果不含有Scheduled跟Schedules的方法的方法,就把这个类加入到nonAnnotatedClasses集合
                if (annotatedMethods.isEmpty()) {
                    this.nonAnnotatedClasses.add(targetClass);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
                    }
                }
                else {
                    // Non-empty set of methods
                    //迭代这些需要定时执行的方法
                    annotatedMethods.forEach((method, scheduledMethods) ->
                            //迭代对应的方法上面的Scheduled跟Schedules注解
                            scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
                    if (logger.isTraceEnabled()) {
                        logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                                "': " + annotatedMethods);
                    }
                }
            }
            return bean;
        }
    

     这个方法的主要逻辑就是从已经初始化完毕之后的bean中寻找贴有@Scheduled以及@Schedules 注解的方法,然后迭代这些方法上的注解列表,并解析这些注解内部的属性并进一步的处理。处理的内部属性的方法在ScheduledAnnotationBeanPostProcessor的另外一个方法中。这里跟进继续分析

        protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
            try {
                //根据bean以及需要定时调用的方法创建一个Runnable
                Runnable runnable = createRunnable(bean, method);
                boolean processedSchedule = false;
                String errorMessage =
                        "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
    
                Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
    
                // Determine initial delay
                //检查这个方法是否配置了需要延迟执行
                long initialDelay = scheduled.initialDelay();
                //获取配置的延迟执行的时间
                String initialDelayString = scheduled.initialDelayString();
                //如果延迟执行的时间存在指
                if (StringUtils.hasText(initialDelayString)) {
                    Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
                    //获取延迟的时间
                    if (this.embeddedValueResolver != null) {
                        initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
                    }
                    if (StringUtils.hasLength(initialDelayString)) {
                        try {
                            initialDelay = parseDelayAsLong(initialDelayString);
                        }
                        catch (RuntimeException ex) {
                            throw new IllegalArgumentException(
                                    "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                        }
                    }
                }
    
                // Check cron expression
                //获取cron表达式
                String cron = scheduled.cron();
                //检查cron表达式是不是有值
                if (StringUtils.hasText(cron)) {
                    //获取指定的时区
                    String zone = scheduled.zone();
                    if (this.embeddedValueResolver != null) {
                        cron = this.embeddedValueResolver.resolveStringValue(cron);
                        zone = this.embeddedValueResolver.resolveStringValue(zone);
                    }
                    if (StringUtils.hasLength(cron)) {
                        //如果cron表达式不支持延迟
                        Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                        processedSchedule = true;
                        //如果指定了不实用cron的方式进行触发
                        if (!Scheduled.CRON_DISABLED.equals(cron)) {
                            TimeZone timeZone;
                            if (StringUtils.hasText(zone)) {
                                //解析时区
                                timeZone = StringUtils.parseTimeZoneString(zone);
                            }
                            else {
                                //没有指定时区,则使用默认的时区
                                timeZone = TimeZone.getDefault();
                            }
                            //将创建的ScheduledTask对象加入到集合中
                            tasks.add(
                                    //使用ScheduledTaskRegistrar创建一个ScheduledTask
                                    this.registrar.scheduleCronTask(
                                            //根绝需要执行的Runable对象以及触发器创建一个CronTask
                                            new CronTask(runnable,
                                                    //根据cron表达式以及时区创建一个触发器CronTrigger
                                                    new CronTrigger(cron, timeZone))));
                        }
                    }
                }
    
                // At this point we don't need to differentiate between initial delay set or not anymore
                //这里直接将延迟执行的时间改为0,因为小于0表示的是不需要进行延迟
                if (initialDelay < 0) {
                    initialDelay = 0;
                }
    
                // Check fixed delay
                //获取间隔调用的时间
                long fixedDelay = scheduled.fixedDelay();
                if (fixedDelay >= 0) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    //根据间隔调用时间创建任务
                    tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
                }
                String fixedDelayString = scheduled.fixedDelayString();
                if (StringUtils.hasText(fixedDelayString)) {
                    //解析字符串形式的延迟调用时间
                    if (this.embeddedValueResolver != null) {
                        fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                    }
                    if (StringUtils.hasLength(fixedDelayString)) {
                        Assert.isTrue(!processedSchedule, errorMessage);
                        processedSchedule = true;
                        try {
                            fixedDelay = parseDelayAsLong(fixedDelayString);
                        }
                        catch (RuntimeException ex) {
                            throw new IllegalArgumentException(
                                    "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                        }
                        //根据间隔调用时间创建任务
                        tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
                    }
                }
    
                // Check fixed rate
                //获取调用频率
                long fixedRate = scheduled.fixedRate();
                if (fixedRate >= 0) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    //根据调用频率创建定时调度任务
                    tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
                }
                //获取string形式的调用频率
                String fixedRateString = scheduled.fixedRateString();
                if (StringUtils.hasText(fixedRateString)) {
                    //解析string形式的调用频率
                    if (this.embeddedValueResolver != null) {
                        fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                    }
                    if (StringUtils.hasLength(fixedRateString)) {
                        Assert.isTrue(!processedSchedule, errorMessage);
                        processedSchedule = true;
                        try {
                            fixedRate = parseDelayAsLong(fixedRateString);
                        }
                        catch (RuntimeException ex) {
                            throw new IllegalArgumentException(
                                    "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                        }
                        //根据调用频率创建定时调度任务
                        tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
                    }
                }
    
                // Check whether we had any attribute set
                //检查是否有设置创建定时调度任务的参数('cron', 'fixedDelay(String)', or 'fixedRate(String)'),没有则需要报错
                Assert.isTrue(processedSchedule, errorMessage);
    
                // Finally register the scheduled tasks
                //最后注册定时调度任务
                synchronized (this.scheduledTasks) {
                    Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
                    regTasks.addAll(tasks);
                }
            }
            catch (IllegalArgumentException ex) {
                throw new IllegalStateException(
                        "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
            }
        }
    

     上面的这个方法可以分为两个大的步骤:1. 解析@Scheduled注解中的内部属性,然后创建定时任务,然后加入到任务列表中;2. 将任务保存在内部的一个bean与其相关的定时任务方法集合scheduledTasks中。而这里的第一步又分了4个属性的解析。

    1. initialDelay是否延迟执行任务的属性
    2. cron表达式的解析,这里需要注意的cron表达式不能跟延迟一起使用
    3. fixedDelay 在上一次调用完毕之后经过多久再次调用的属性
    4. fixedRate 方法调用的频率属性

     这里需要重点关注的两个位置在创建定时任务的时候的三个方法。1. createRunnable 方法 2. CronTrigger对象的创建 3.CronTask对象的创建 4.scheduleCronTask方法的调用。接下来就是分析这个方法

    2.2.3 定时任务的创建
    2.2.3.1 createRunnable 创建包含定时执行的ScheduledMethodRunnable对象

     直接进入到createRunnable 方法进行查看

        protected Runnable createRunnable(Object target, Method method) {
            //贴有@Scheduled注解的方法需要是无参方法
            Assert.isTrue(method.getParameterCount() == 0, "Only no-arg methods may be annotated with @Scheduled");
            //从目标类(bean)中寻找一个对应的可以调用的方法
            Method invocableMethod = AopUtils.selectInvocableMethod(method, target.getClass());
            //创建一个ScheduledMethodRunnable实现了Runable接口
            return new ScheduledMethodRunnable(target, invocableMethod);
        }
    

     发现就是获取需要定时调用的方法,然后创建一个ScheduledMethodRunnable对象。而这个对象又实现了Runnable接口,因此可以 定时的执行其内保存的调用方法。

    2.2.3.2 根据表达式创建触发器CronTrigger

     在创建CronTrigger对象的时候会把表达式跟时区这两个属性传入,在CronTrigger会创建一个CronSequenceGenerator对象,而这个对象在初始化的时候会解析对应的表达式以及对应的时区,然后保存对象的信息,在后面计算下次定时执行时间的时候用到

        public CronTrigger(String expression, TimeZone timeZone) {
            this.sequenceGenerator = new CronSequenceGenerator(expression, timeZone);
        }
    
        public CronSequenceGenerator(String expression, TimeZone timeZone) {
            this.expression = expression;
            this.timeZone = timeZone;
            //解析时间表达式
            parse(expression);
        }   
    
    2.2.3.3 创建包含触发器的定时任务对象CronTask

    CronTask对象中保存了触发器对象,用来在后面计算定时执行时间的时候用。然后将需要定时执行的ScheduledMethodRunnable对象保存了起来,在后面执行的时候使用。

        public CronTask(Runnable runnable, CronTrigger cronTrigger) {
            super(runnable, cronTrigger);
            this.expression = cronTrigger.getExpression();
        }
    
    2.2.3.4 执行定时任务

     接下来执行的逻辑也是比较冗长的。scheduleCronTask方法在ScheduledTaskRegistrar类中。这里直接进行分析。

        public ScheduledTask scheduleCronTask(CronTask task) {
            //从未解析的任务中获取是否有这个任务,有则移除
            ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
            boolean newTask = false;
            //不存在这个任务,则创建一个新的任务
            if (scheduledTask == null) {
                scheduledTask = new ScheduledTask(task);
                newTask = true;
            }
            //如果调度器不是空
            if (this.taskScheduler != null) {
                //对任务进行调度,这里的taskScheduler是ConcurrentTaskScheduler
                scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
            }
            else {//此时taskScheduler还没有初始化,这执行不了定时任务,需要先记录起来,后面taskScheduler初始化之后进行调用
    
                //增加到cronTasks中
                addCronTask(task);
                //保存到没有执行的
                this.unresolvedTasks.put(task, scheduledTask);
            }
            //返回scheduledTask或者null,这个时候任务已经在于jdk自带的ScheduledExecutorService中到后面会进行自动调用
            return (newTask ? scheduledTask : null);
        }
    

     可以看到这里有几个内部的字段,其中unresolvedTasks是用来保存没有解析过的任务集合,taskScheduler是指定定时任务的对象ConcurrentTaskScheduler,现在说一下这个对象的创建时机,ScheduledTaskRegistrar类实现了InitializingBean接口的afterPropertiesSet方法,而在afterPropertiesSet方法中会有实力化ConcurrentTaskScheduler类,而实例化的过程也是一个重要的过程,因为这个里面涉及到了定时任务调度用的线程池。

        @Override
        public void afterPropertiesSet() {
            scheduleTasks();
        }
    
        @SuppressWarnings("deprecation")
        protected void scheduleTasks() {
            //TaskScheduler还没创建则进行创建
            if (this.taskScheduler == null) {
                //创建用于定时调用的执行器,使用的是jdk自带的ScheduledExecutorService
                this.localExecutor = Executors.newSingleThreadScheduledExecutor();
                //创建ConcurrentTaskScheduler,用于执行任务的调度
                this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
            }
            if (this.triggerTasks != null) {
                for (TriggerTask task : this.triggerTasks) {
                    addScheduledTask(scheduleTriggerTask(task));
                }
            }
            //这里面放根据cron表达式创建的定时任务对象
            if (this.cronTasks != null) {
                for (CronTask task : this.cronTasks) {
                    addScheduledTask(scheduleCronTask(task));
                }
            }
            //根据fixRate属性创建的定时任务对象
            if (this.fixedRateTasks != null) {
                for (IntervalTask task : this.fixedRateTasks) {
                    addScheduledTask(scheduleFixedRateTask(task));
                }
            }
            //根据fixDelay属性创建的定时任务对象
            if (this.fixedDelayTasks != null) {
                for (IntervalTask task : this.fixedDelayTasks) {
                    addScheduledTask(scheduleFixedDelayTask(task));
                }
            }
        }
    

     到这里就可以知道,在Spring中用于进行定时任务调度用的执行器是jdk自带的ScheduledExecutorService类。我们同时看到对于根据@Schedule属性形成的不同的调度对象,会有不同的处理形式进行调度,但是在这些处理的方法中都会用到同一个对象进行调度,就是上面的ConcurrentTaskScheduler对象,这里就不例举代码了,只列举出共同的点

        public ScheduledTask scheduleCronTask(CronTask task) {
            .......
            scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
            .......
        }
    
        public ScheduledTask scheduleFixedRateTask(FixedRateTask task) {
            ......
            scheduledTask.future =this.taskScheduler.scheduleAtFixedRate(task.getRunnable(), task.getInterval());
            ......
        }
        public ScheduledTask scheduleFixedDelayTask(FixedDelayTask task) {
            ......
            scheduledTask.future =this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), startTime, task.getInterval());
            ......
        }
    

     因此这里有必要进入到ConcurrentTaskScheduler的创建方法中进行查看,

        static {
            try {
                managedScheduledExecutorServiceClass = ClassUtils.forName(
                        "javax.enterprise.concurrent.ManagedScheduledExecutorService",
                        ConcurrentTaskScheduler.class.getClassLoader());
            }
            catch (ClassNotFoundException ex) {
                // JSR-236 API not available...
                managedScheduledExecutorServiceClass = null;
            }
        }
    
        public ConcurrentTaskScheduler(ScheduledExecutorService scheduledExecutor) {
            //初始化父类
            super(scheduledExecutor);
            //初始化调度器
            this.scheduledExecutor = initScheduledExecutor(scheduledExecutor);
        }
    
        private ScheduledExecutorService initScheduledExecutor(@Nullable ScheduledExecutorService scheduledExecutor) {
            if (scheduledExecutor != null) {
                this.scheduledExecutor = scheduledExecutor;
                //初始化的时候会寻找ManagedScheduledExecutorService这个类,这个类是JSR236的规范跟ScheduledExecutorService一样的作用对其做了一些扩展,默认为false
                this.enterpriseConcurrentScheduler = (managedScheduledExecutorServiceClass != null &&
                        managedScheduledExecutorServiceClass.isInstance(scheduledExecutor));
            }
            else {
                this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
                this.enterpriseConcurrentScheduler = false;
            }
            return this.scheduledExecutor;
        }
    

     可以看到这里主要就是设置调度定时任务用的执行器。现在看看上面在ScheduledTaskRegistrar中调用ConcurrentTaskScheduler中的方法,这里就列举一个schedule方法,其他的方法逻辑大同小异。

        public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
            try {
                //存在ManagedScheduledExecutorService这个类的时候,这个是true
                if (this.enterpriseConcurrentScheduler) {
                    return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
                }
                else {
                    //创建错误处理器
                    ErrorHandler errorHandler =
                            (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
                    //创建一个ReschedulingRunnable然后进行调度,并返回一个ScheduledFuture
                    return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
                }
            }
            catch (RejectedExecutionException ex) {
                throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
            }
        }
    

     这里可以看到前面的enterpriseConcurrentScheduler起到被用到了,这里进入到下面的else分之,这里就是创建一个调度的时候出现异常的时候的异常处理器,然后会创建一个ReschedulingRunnable对象,然后调用这个对象的调度方法,进行任务的调用。

        public ReschedulingRunnable(
                Runnable delegate, Trigger trigger, ScheduledExecutorService executor, ErrorHandler errorHandler) {
            //设置调度的任务跟错误处理器
            super(delegate, errorHandler);
            this.trigger = trigger;
            this.executor = executor;
        }
    
        @Nullable
        public ScheduledFuture<?> schedule() {
            synchronized (this.triggerContextMonitor) {
                //计算出下一次的调用时间,这里会用到前面保存过的调用时间信息,然后进行计算。
                this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
                if (this.scheduledExecutionTime == null) {
                    return null;
                }
                long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
                //进行任务的调度,这里调用的不是根据需要调用的方法创建出来的调度对象,是这个类本身
                this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
                return this;
            }
        }
    
        public void run() {
            //获取当前的饿时间
            Date actualExecutionTime = new Date();
            //调用父类的run方法,父类的fun方法会调用需要调用的方法创建出来的调度对象,也就是我们贴有注解的方法
            super.run();
            Date completionTime = new Date();
            synchronized (this.triggerContextMonitor) {
                Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
                //更新调用时间
                this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
                //检查这个方法是否被取消了
                if (!obtainCurrentFuture().isCancelled()) {
                    //进行类本身的调用,这里主要是刷新下次的调用时间
                    schedule();
                }
            }
        }
    

     到这里整个调度的分析过程完成的差不多,这里会计算出来下一次的调度时间,然后同时会把ReschedulingRunnable对象本身给传到对应的ScheduledExecutorService中,而ReschedulingRunnable对象间接的实现了Runnable对象,因此这个对象的run方法对定时的被调用,而且这个run方法中必须包含对本身的schedule调用才能实现往复循环的调用。

    2.3 定时任务的取消

    2.3.1 第一种情况,贴有注解的bean销毁的时候取消任务

     到这里整个的贴有@Schedule@Schedules注解的方法的调用过程就分析完了,现在还剩下一点就是,定时任务的取消了。其实关于定时任务的取消从前面我们就能看出一点的端倪出来,因为ScheduledAnnotationBeanPostProcessor类实现了DestructionAwareBeanPostProcessor类就能看出来了。现在看看实现的这个接口的逻辑

        //实现了DestructionAwareBeanPostProcessor,bean销毁之前调用
        public void postProcessBeforeDestruction(Object bean, String beanName) {
            Set<ScheduledTask> tasks;
            synchronized (this.scheduledTasks) {
                //从任务列表中移除对应的bean,如果存在会则会取出来
                tasks = this.scheduledTasks.remove(bean);
            }
            if (tasks != null) {
                //迭代任务
                for (ScheduledTask task : tasks) {
                    //依次调用取消调度
                    task.cancel();
                }
            }
    

     可以看到任务的取消相比较,任务的创建还是简单的。

    2.3.2 容器销毁的时候取消任务

    ScheduledAnnotationBeanPostProcessor类实现了DisposableBean因此,在容器销毁的时候会调用其实现的destroy方法

        //实现DisposableBean,bean销毁的时候调用
        public void destroy() {
            synchronized (this.scheduledTasks) {
                //获取所有的任务
                Collection<Set<ScheduledTask>> allTasks = this.scheduledTasks.values();
                //一次取消
                for (Set<ScheduledTask> tasks : allTasks) {
                    for (ScheduledTask task : tasks) {
                        task.cancel();
                    }
                }
                //情况任务列表
                this.scheduledTasks.clear();
            }
            //销毁ScheduledTaskRegistrar中保存的任务
            this.registrar.destroy();
        }
    
    

     到此整个分析就结束了,过程可能有点复杂,最好是自己debug的时候进行看,也可以多看几次加深印象。

    相关文章

      网友评论

          本文标题:spring源码------`@Schedule`跟`@Sche

          本文链接:https://www.haomeiwen.com/subject/zjsqgctx.html