How Spring's @Scheduled Works

Author: 土豆肉丝盖浇饭 | Published: 2018-12-07 18:56

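    Preface

    Annotate a method with @Scheduled, add the necessary configuration, and once the Spring container has started the method runs periodically or after a delay, as the annotation specifies. This article explains how Spring makes that happen.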

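    Source Code Analysis

    Scanning Tasks

    Anyone familiar with Spring knows the BeanPostProcessor callback interface. Spring finds every method annotated with @Scheduled through an implementation of this interface.

    The implementation is ScheduledAnnotationBeanPostProcessor. Its postProcessAfterInitialization method extracts the @Scheduled-annotated methods from each bean and passes each one to processScheduled.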

        public Object postProcessAfterInitialization(final Object bean, String beanName) {
            Class<?> targetClass = AopUtils.getTargetClass(bean);
            if (!this.nonAnnotatedClasses.contains(targetClass)) {
                // Scan the bean for methods annotated with @Scheduled
                Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                        new MethodIntrospector.MetadataLookup<Set<Scheduled>>() {
                            @Override
                            public Set<Scheduled> inspect(Method method) {
                                Set<Scheduled> scheduledMethods =
                                        AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);
                                return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                            }
                        });
                if (annotatedMethods.isEmpty()) {
                    // No annotated methods on this class: cache it, since one class may back several beans
                    this.nonAnnotatedClasses.add(targetClass);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass());
                    }
                }
                else {
                    // Non-empty set of methods
                    for (Map.Entry<Method, Set<Scheduled>> entry : annotatedMethods.entrySet()) {
                        Method method = entry.getKey();
                        for (Scheduled scheduled : entry.getValue()) {
                            // Process each @Scheduled annotation found on the method
                            processScheduled(scheduled, method, bean);
                        }
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                                "': " + annotatedMethods);
                    }
                }
            }
            return bean;
        }
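
    To make the scanning step concrete, here is a minimal sketch of the same idea using plain Java reflection. It is not Spring's code: Every is a hypothetical stand-in for @Scheduled, and AnnotationScanSketch, Reporter and Plain are names invented for illustration. Unlike the real method, the sketch ignores proxies, inherited methods and repeatable annotations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for @Scheduled, so the sketch needs no Spring dependency.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Every {
    long millis();
}

class AnnotationScanSketch {

    // Classes already known to have no annotated methods (plays the role of nonAnnotatedClasses).
    private final Set<Class<?>> nonAnnotatedClasses = ConcurrentHashMap.newKeySet();

    // Returns the names of all @Every methods declared on the bean's class.
    List<String> scan(Object bean) {
        Class<?> type = bean.getClass();
        List<String> found = new ArrayList<>();
        if (nonAnnotatedClasses.contains(type)) {
            return found; // skip reflection for classes that were already ruled out
        }
        for (Method method : type.getDeclaredMethods()) {
            if (method.isAnnotationPresent(Every.class)) {
                found.add(method.getName());
            }
        }
        if (found.isEmpty()) {
            nonAnnotatedClasses.add(type);
        }
        return found;
    }

    boolean isCachedAsNonAnnotated(Class<?> type) {
        return nonAnnotatedClasses.contains(type);
    }

    static class Reporter {
        @Every(millis = 1000)
        void report() { }

        void helper() { }
    }

    static class Plain {
        void nothing() { }
    }

    public static void main(String[] args) {
        AnnotationScanSketch scanner = new AnnotationScanSketch();
        System.out.println(scanner.scan(new Reporter())); // names of the annotated methods
        System.out.println(scanner.scan(new Plain()));    // empty, and Plain is now cached
    }
}
```

    The negative cache matters because the post-processor is called once per bean, and many beans can share one class.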
    

    Registering Tasks

    processScheduled turns the method and its annotation into a Task and registers it with the ScheduledTaskRegistrar, which manages the Task's lifecycle.

        protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
            try {
                Assert.isTrue(method.getParameterTypes().length == 0,
                        "Only no-arg methods may be annotated with @Scheduled");
    
                Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
                Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
                boolean processedSchedule = false;
                String errorMessage =
                        "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
    
                Set<ScheduledTask> tasks = new LinkedHashSet<ScheduledTask>(4);
    
                // Determine initial delay
                long initialDelay = scheduled.initialDelay();
                String initialDelayString = scheduled.initialDelayString();
                if (StringUtils.hasText(initialDelayString)) {
                    Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
                    if (this.embeddedValueResolver != null) {
                        initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
                    }
                    try {
                        initialDelay = Long.parseLong(initialDelayString);
                    }
                    catch (NumberFormatException ex) {
                        throw new IllegalArgumentException(
                                "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into integer");
                    }
                }
    
                // Check cron expression
                String cron = scheduled.cron();
                if (StringUtils.hasText(cron)) {
                    Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                    processedSchedule = true;
                    String zone = scheduled.zone();
                    if (this.embeddedValueResolver != null) {
                        cron = this.embeddedValueResolver.resolveStringValue(cron);
                        zone = this.embeddedValueResolver.resolveStringValue(zone);
                    }
                    TimeZone timeZone;
                    if (StringUtils.hasText(zone)) {
                        timeZone = StringUtils.parseTimeZoneString(zone);
                    }
                    else {
                        timeZone = TimeZone.getDefault();
                    }
                    tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                }
    
                // At this point we don't need to differentiate between initial delay set or not anymore
                if (initialDelay < 0) {
                    initialDelay = 0;
                }
    
                // Check fixed delay
                long fixedDelay = scheduled.fixedDelay();
                if (fixedDelay >= 0) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    tasks.add(this.registrar.scheduleFixedDelayTask(new IntervalTask(runnable, fixedDelay, initialDelay)));
                }
                String fixedDelayString = scheduled.fixedDelayString();
                if (StringUtils.hasText(fixedDelayString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    if (this.embeddedValueResolver != null) {
                        fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                    }
                    try {
                        fixedDelay = Long.parseLong(fixedDelayString);
                    }
                    catch (NumberFormatException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into integer");
                    }
                    tasks.add(this.registrar.scheduleFixedDelayTask(new IntervalTask(runnable, fixedDelay, initialDelay)));
                }
    
                // Check fixed rate
                long fixedRate = scheduled.fixedRate();
                if (fixedRate >= 0) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    tasks.add(this.registrar.scheduleFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay)));
                }
                String fixedRateString = scheduled.fixedRateString();
                if (StringUtils.hasText(fixedRateString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    if (this.embeddedValueResolver != null) {
                        fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                    }
                    try {
                        fixedRate = Long.parseLong(fixedRateString);
                    }
                    catch (NumberFormatException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into integer");
                    }
                    tasks.add(this.registrar.scheduleFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay)));
                }
    
                // Check whether we had any attribute set
                Assert.isTrue(processedSchedule, errorMessage);
    
                // Finally register the scheduled tasks
                synchronized (this.scheduledTasks) {
                    Set<ScheduledTask> registeredTasks = this.scheduledTasks.get(bean);
                    if (registeredTasks == null) {
                        registeredTasks = new LinkedHashSet<ScheduledTask>(4);
                        this.scheduledTasks.put(bean, registeredTasks);
                    }
                    registeredTasks.addAll(tasks);
                }
            }
            catch (IllegalArgumentException ex) {
                throw new IllegalStateException(
                        "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
            }
        }
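
    The validation rules buried in the method above can be condensed into a small sketch. This is an illustration only, not Spring's code: ScheduleAttributeCheck, triggerKind and parseMillis are invented names. The sketch assumes the annotation defaults of an empty string for cron and -1 for the numeric attributes, meaning "not set".

```java
class ScheduleAttributeCheck {

    private static final String ERROR =
            "Exactly one of 'cron', 'fixedDelay' or 'fixedRate' is required";

    // Returns which trigger kind is configured; "" and -1 mean "not set".
    static String triggerKind(String cron, long fixedDelay, long fixedRate) {
        String kind = null;
        if (cron != null && !cron.trim().isEmpty()) {
            kind = "cron";
        }
        if (fixedDelay >= 0) {
            check(kind == null); // a second trigger attribute is an error
            kind = "fixedDelay";
        }
        if (fixedRate >= 0) {
            check(kind == null);
            kind = "fixedRate";
        }
        check(kind != null);     // no trigger attribute at all is also an error
        return kind;
    }

    // Parses a *String attribute as plain milliseconds, as the listing above does.
    static long parseMillis(String name, String value) {
        try {
            return Long.parseLong(value);
        } catch (NumberFormatException ex) {
            throw new IllegalArgumentException("Invalid " + name + " value \"" + value + "\"");
        }
    }

    private static void check(boolean condition) {
        if (!condition) {
            throw new IllegalArgumentException(ERROR);
        }
    }

    public static void main(String[] args) {
        System.out.println(triggerKind("", 5000, -1));
        System.out.println(parseMillis("fixedDelayString", "5000"));
        try {
            triggerKind("0 0 * * * *", 5000, -1);
        } catch (IllegalArgumentException ex) {
            System.out.println("rejected: " + ex.getMessage());
        }
    }
}
```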
    

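    Running Tasks

    Once the tasks are registered with the ScheduledTaskRegistrar, they need to be started. This happens in one of the two callbacks below.
    afterSingletonsInstantiated, as its name suggests, fires after all singleton beans have been instantiated. It starts the tasks only when the processor is not running inside an ApplicationContext.
    onApplicationEvent fires after the ApplicationContext has finished its refresh.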

        @Override
        public void afterSingletonsInstantiated() {
            if (this.applicationContext == null) {
                // Not running in an ApplicationContext -> register tasks early...
                finishRegistration();
            }
        }
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            if (event.getApplicationContext() == this.applicationContext) {
                // Running in an ApplicationContext -> register tasks this late...
                // giving other ContextRefreshedEvent listeners a chance to perform
                // their work at the same time (e.g. Spring Batch's job registration).
                finishRegistration();
            }
        }
    

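    finishRegistration first initializes the ScheduledTaskRegistrar by handing it the scheduler (thread pool) defined in the container. If none is configured, the ScheduledTaskRegistrar creates a single-threaded one itself. After initialization, it calls the ScheduledTaskRegistrar's afterPropertiesSet method to start the registered tasks.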

        private void finishRegistration() {
            if (this.scheduler != null) {
                this.registrar.setScheduler(this.scheduler);
            }
    
            if (this.beanFactory instanceof ListableBeanFactory) {
                Map<String, SchedulingConfigurer> configurers =
                        ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
                for (SchedulingConfigurer configurer : configurers.values()) {
                    configurer.configureTasks(this.registrar);
                }
            }
    
            if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
                Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
                try {
                    // Search for TaskScheduler bean...
                    this.registrar.setTaskScheduler(this.beanFactory.getBean(TaskScheduler.class));
                }
                catch (NoUniqueBeanDefinitionException ex) {
                    try {
                        this.registrar.setTaskScheduler(
                                this.beanFactory.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, TaskScheduler.class));
                    }
                    catch (NoSuchBeanDefinitionException ex2) {
                        if (logger.isInfoEnabled()) {
                            logger.info("More than one TaskScheduler bean exists within the context, and " +
                                    "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                    "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                    "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                    ex.getBeanNamesFound());
                        }
                    }
                }
                catch (NoSuchBeanDefinitionException ex) {
                    logger.debug("Could not find default TaskScheduler bean", ex);
                    // Search for ScheduledExecutorService bean next...
                    try {
                        this.registrar.setScheduler(this.beanFactory.getBean(ScheduledExecutorService.class));
                    }
                    catch (NoUniqueBeanDefinitionException ex2) {
                        try {
                            this.registrar.setScheduler(
                                    this.beanFactory.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, ScheduledExecutorService.class));
                        }
                        catch (NoSuchBeanDefinitionException ex3) {
                            if (logger.isInfoEnabled()) {
                                logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                        "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                        "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                        "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                        ex2.getBeanNamesFound());
                            }
                        }
                    }
                    catch (NoSuchBeanDefinitionException ex2) {
                        logger.debug("Could not find default ScheduledExecutorService bean", ex2);
                        // Giving up -> falling back to default scheduler within the registrar...
                        logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
                    }
                }
            }
    
            this.registrar.afterPropertiesSet();
        }
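
    The nested try/catch blocks above encode a lookup order that is easier to see in isolation. The sketch below models it with a plain map instead of a BeanFactory. All names are invented for illustration: FakeTaskScheduler and FakeExecutorService stand in for TaskScheduler and ScheduledExecutorService beans. As a simplifying assumption, the sketch ignores primary beans, which a real by-type lookup would also take into account.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SchedulerLookupSketch {

    static final String DEFAULT_NAME = "taskScheduler";

    // Hypothetical stand-ins for TaskScheduler and ScheduledExecutorService beans.
    static class FakeTaskScheduler { }
    static class FakeExecutorService { }

    // Returns the bean the registrar would receive, or null when it must use its own default.
    static Object pick(Map<String, Object> beans) {
        List<Object> schedulers = ofType(beans, FakeTaskScheduler.class);
        if (!schedulers.isEmpty()) {
            // At least one TaskScheduler-like bean: the executor type is never consulted.
            return choose(beans, schedulers, FakeTaskScheduler.class);
        }
        return choose(beans, ofType(beans, FakeExecutorService.class), FakeExecutorService.class);
    }

    private static Object choose(Map<String, Object> beans, List<Object> candidates, Class<?> type) {
        if (candidates.size() == 1) {
            return candidates.get(0);
        }
        // Several candidates: only the one registered as "taskScheduler" is accepted.
        Object named = beans.get(DEFAULT_NAME);
        return (candidates.size() > 1 && type.isInstance(named)) ? named : null;
    }

    private static List<Object> ofType(Map<String, Object> beans, Class<?> type) {
        List<Object> matches = new ArrayList<>();
        for (Object bean : beans.values()) {
            if (type.isInstance(bean)) {
                matches.add(bean);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        beans.put("a", new FakeTaskScheduler());
        beans.put("b", new FakeTaskScheduler());
        System.out.println(pick(beans) == null); // ambiguous and none named "taskScheduler"
        beans.put(DEFAULT_NAME, new FakeTaskScheduler());
        System.out.println(pick(beans) == beans.get(DEFAULT_NAME));
    }
}
```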
    
    

    ScheduledTaskRegistrar

    ScheduledTaskRegistrar holds a thread pool and the tasks added to it, and is responsible for starting and stopping those tasks.

        private TaskScheduler taskScheduler;
    
        private ScheduledExecutorService localExecutor;
    
        private List<TriggerTask> triggerTasks;
    
        private List<CronTask> cronTasks;
    
        private List<IntervalTask> fixedRateTasks;
    
        private List<IntervalTask> fixedDelayTasks;
    
        private final Map<Task, ScheduledTask> unresolvedTasks = new HashMap<Task, ScheduledTask>(16);
    
        private final Set<ScheduledTask> scheduledTasks = new LinkedHashSet<ScheduledTask>(16);
    
    Task Registration

    Tasks are registered through the matching add method, for example:

        public void addFixedRateTask(IntervalTask task) {
            if (this.fixedRateTasks == null) {
                this.fixedRateTasks = new ArrayList<IntervalTask>();
            }
            this.fixedRateTasks.add(task);
        }
    
    Task Execution

    ScheduledAnnotationBeanPostProcessor starts task execution by calling ScheduledTaskRegistrar's afterPropertiesSet.

        public void afterPropertiesSet() {
            scheduleTasks();
        }
    

    afterPropertiesSet simply calls scheduleTasks:

        protected void scheduleTasks() {
            if (this.taskScheduler == null) {
                this.localExecutor = Executors.newSingleThreadScheduledExecutor();
                this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
            }
            if (this.triggerTasks != null) {
                for (TriggerTask task : this.triggerTasks) {
                    addScheduledTask(scheduleTriggerTask(task));
                }
            }
            if (this.cronTasks != null) {
                for (CronTask task : this.cronTasks) {
                    addScheduledTask(scheduleCronTask(task));
                }
            }
            if (this.fixedRateTasks != null) {
                for (IntervalTask task : this.fixedRateTasks) {
                    addScheduledTask(scheduleFixedRateTask(task));
                }
            }
            if (this.fixedDelayTasks != null) {
                for (IntervalTask task : this.fixedDelayTasks) {
                    addScheduledTask(scheduleFixedDelayTask(task));
                }
            }
        }
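
    One practical consequence of the fallback at the top of scheduleTasks: when no scheduler was supplied, every task shares a single thread, so a slow task delays all the others. The sketch below shows this with the JDK alone, without Spring. SingleThreadFallbackDemo and threadsUsed are names invented for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SingleThreadFallbackDemo {

    // Runs two periodic tasks on the same kind of executor the registrar falls back to
    // and returns the names of the threads that executed them.
    static Set<String> threadsUsed() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        Set<String> threads = ConcurrentHashMap.newKeySet();
        CountDownLatch runs = new CountDownLatch(4);
        Runnable task = () -> {
            threads.add(Thread.currentThread().getName());
            runs.countDown();
        };
        try {
            executor.scheduleAtFixedRate(task, 0, 10, TimeUnit.MILLISECONDS);
            executor.scheduleWithFixedDelay(task, 0, 10, TimeUnit.MILLISECONDS);
            runs.await(5, TimeUnit.SECONDS); // wait until the tasks have run a few times
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return threads;
    }

    public static void main(String[] args) {
        // Both tasks report the same thread name.
        System.out.println(threadsUsed());
    }
}
```

    If tasks must not block one another, the usual remedy is to supply a scheduler with more than one thread, so that the fallback is never reached.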
    

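    scheduleTasks starts each task through the schedule method for its type. That method returns a ScheduledTask, which mainly holds the task's Future so that the task can be cancelled later. addScheduledTask then stores the ScheduledTask.
    Here is the schedule method for one task type as an example: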

        public ScheduledTask scheduleFixedDelayTask(IntervalTask task) {
            ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
            boolean newTask = false;
            if (scheduledTask == null) {
                scheduledTask = new ScheduledTask();
                newTask = true;
            }
            if (this.taskScheduler != null) {
                if (task.getInitialDelay() > 0) {
                    Date startTime = new Date(System.currentTimeMillis() + task.getInitialDelay());
                    scheduledTask.future =
                            this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), startTime, task.getInterval());
                }
                else {
                    scheduledTask.future =
                            this.taskScheduler.scheduleWithFixedDelay(task.getRunnable(), task.getInterval());
                }
            }
            else {
                addFixedDelayTask(task);
                this.unresolvedTasks.put(task, scheduledTask);
            }
            return (newTask ? scheduledTask : null);
        }
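
    The unresolvedTasks bookkeeping above boils down to one pattern: schedule immediately if a scheduler is available, otherwise park the task and schedule it later. Here is a minimal JDK-only sketch of that pattern. DeferredRegistrarSketch and Pending are invented names, and creating a fallback scheduler inside start() is an assumption modelled on scheduleTasks, not a copy of Spring's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

class DeferredRegistrarSketch {

    // A task that arrived before any scheduler was available.
    private static final class Pending {
        final Runnable task;
        final long delayMillis;

        Pending(Runnable task, long delayMillis) {
            this.task = task;
            this.delayMillis = delayMillis;
        }
    }

    private ScheduledExecutorService scheduler; // stays null until start() is called
    private final List<Pending> pending = new ArrayList<>();
    private final List<ScheduledFuture<?>> futures = new ArrayList<>();

    // Schedules right away if a scheduler exists, otherwise parks the task.
    void scheduleFixedDelay(Runnable task, long delayMillis) {
        if (scheduler != null) {
            futures.add(scheduler.scheduleWithFixedDelay(
                    task, delayMillis, delayMillis, TimeUnit.MILLISECONDS));
        } else {
            pending.add(new Pending(task, delayMillis));
        }
    }

    // Creates a fallback scheduler, then schedules everything that was parked.
    void start() {
        if (scheduler == null) {
            scheduler = Executors.newSingleThreadScheduledExecutor();
        }
        List<Pending> toStart = new ArrayList<>(pending);
        pending.clear();
        for (Pending p : toStart) {
            scheduleFixedDelay(p.task, p.delayMillis);
        }
    }

    // Cancels every scheduled task and shuts the scheduler down.
    void stop() {
        for (ScheduledFuture<?> future : futures) {
            future.cancel(true);
        }
        if (scheduler != null) {
            scheduler.shutdownNow();
        }
    }

    int pendingCount() {
        return pending.size();
    }

    int scheduledCount() {
        return futures.size();
    }

    public static void main(String[] args) {
        DeferredRegistrarSketch registrar = new DeferredRegistrarSketch();
        registrar.scheduleFixedDelay(() -> System.out.println("tick"), 1000);
        System.out.println("pending=" + registrar.pendingCount());
        registrar.start();
        System.out.println("scheduled=" + registrar.scheduledCount());
        registrar.stop();
    }
}
```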
    
    Task Destruction

    ScheduledTaskRegistrar implements the DisposableBean interface. When the bean is destroyed, it cancels the tasks.

        public void destroy() {
            for (ScheduledTask task : this.scheduledTasks) {
                task.cancel();
            }
            if (this.localExecutor != null) {
                this.localExecutor.shutdownNow();
            }
        }
    

    destroy calls ScheduledTask's cancel method to cancel each task. Here is its implementation:

    public final class ScheduledTask {
    
        volatile ScheduledFuture<?> future;
    
    
        ScheduledTask() {
        }
    
    
        /**
         * Trigger cancellation of this scheduled task.
         */
        public void cancel() {
            ScheduledFuture<?> future = this.future;
            if (future != null) {
                future.cancel(true);
            }
        }
    
    }
    
    

    Cancellation works through the task's stored future.
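
    The same cancellation mechanism can be tried with the JDK alone. The sketch below starts a periodic task, cancels it through its ScheduledFuture, and checks that no further runs happen. CancelByFutureDemo and stopsAfterCancel are names invented for illustration, and the sleep durations are arbitrary.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CancelByFutureDemo {

    // Returns true if the future reports cancellation and the task stops running afterwards.
    static boolean stopsAfterCancel() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future =
                    executor.scheduleAtFixedRate(runs::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
            Thread.sleep(100);   // let the task run a few times
            future.cancel(true); // the same call ScheduledTask.cancel() makes
            Thread.sleep(50);    // let a run that was already in flight finish
            int afterCancel = runs.get();
            Thread.sleep(100);   // no further runs should be counted from here on
            return future.isCancelled() && runs.get() == afterCancel;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(stopsAfterCancel());
    }
}
```

    Cancelling the future only stops that one task. The executor itself keeps running until it is shut down, which is why destroy also calls shutdownNow on the locally created executor.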


        Article link: https://www.haomeiwen.com/subject/uzudcqtx.html