Spring's @EnableScheduling and @Schedu

Author: Joshua1919 | Published 2020-05-30 13:53

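    Let's start with the Javadoc of EnableScheduling:
    @EnableScheduling enables Spring's scheduled task execution capability, similar to what the <task:*> XML namespace provides. It is placed on a @Configuration class: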

    @Configuration
    @EnableScheduling
    public class AppConfig {
    // various @Bean definitions
    }
    

    This enables detection of @Scheduled annotations on any Spring-managed bean in the container. For example, given a class MyTask:

    package com.myco.tasks;
    public class MyTask {
        @Scheduled(fixedRate=1000)
        public void work() {
           // task execution logic
        }
    }
    

    the following configuration ensures that MyTask.work() is called once every second (1000 ms):

    @Configuration
    @EnableScheduling
    public class AppConfig {
        @Bean
        public MyTask task() {
            return new MyTask();
        }
    }
    

    Alternatively, if MyTask is annotated with @Component, the following configuration has the same effect:

    @Configuration
    @EnableScheduling
    @ComponentScan(basePackages="com.myco.tasks")
    public class AppConfig {
    }
    

    Methods annotated with @Scheduled may also be declared directly inside a @Configuration class:

    @Configuration
    @EnableScheduling
    public class AppConfig {
        @Scheduled(fixedRate=1000)
        public void work() {
           // task execution logic
        }
    }
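
To make the fixedRate attribute used in the examples above more concrete, here is a minimal plain-JDK sketch of fixed-rate execution. It does not use Spring at all: the class name FixedRateSketch and the method runFixedRate are hypothetical, and ScheduledExecutorService#scheduleAtFixedRate is only assumed to be a reasonable stand-in for what @Scheduled(fixedRate=...) asks a scheduler to do.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration only; FixedRateSketch and runFixedRate are hypothetical names.
class FixedRateSketch {

    /**
     * Runs a trivial task at a fixed rate and reports whether it fired
     * at least expectedRuns times before the timeout elapsed.
     */
    static boolean runFixedRate(long periodMillis, int expectedRuns, long timeoutMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(expectedRuns);
        try {
            // Fixed rate: the period is measured between successive start times,
            // not from the end of one run to the start of the next.
            executor.scheduleAtFixedRate(latch::countDown, 0, periodMillis, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Stop the periodic task, otherwise the executor thread keeps the JVM alive.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("fired 3 times: " + runFixedRate(50, 3, 5_000));
    }
}
```

The explicit shutdownNow() here plays the role that @Bean(destroyMethod="shutdown") plays for a container-managed executor.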
    

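    By default, Spring looks for a scheduler as follows: either a unique bean of type org.springframework.scheduling.TaskScheduler, or a TaskScheduler bean named taskScheduler. If neither is found, the same lookup is repeated for java.util.concurrent.ScheduledExecutorService: first a unique bean of that type, then one named taskScheduler. If nothing can be resolved, Spring creates a local single-threaded default scheduler.
    For more control, a bean can implement the SchedulingConfigurer interface, which gives access to the underlying ScheduledTaskRegistrar instance. The following example shows how to customize the Executor that runs the scheduled tasks: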

    @Configuration
    @EnableScheduling
    public class AppConfig implements SchedulingConfigurer {
        @Override
        public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
            taskRegistrar.setScheduler(taskExecutor());
        }
    
        @Bean(destroyMethod="shutdown")
        public Executor taskExecutor() {
            return Executors.newScheduledThreadPool(100);
        }
    }
    

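    Note the @Bean(destroyMethod="shutdown") in the code above: it ensures that the executor is shut down properly when the Spring container is closed.
    Implementing SchedulingConfigurer also allows fine-grained control over task registration through the ScheduledTaskRegistrar. For example, the following code runs a particular bean method whenever a custom Trigger implementation fires: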

    @Configuration
    @EnableScheduling
    public class AppConfig implements SchedulingConfigurer {
        @Override
        public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
            taskRegistrar.setScheduler(taskScheduler());
            taskRegistrar.addTriggerTask(
                new Runnable() {
                    public void run() {
                        myTask().work();
                    }
                },
                new CustomTrigger()
            );
        }
        @Bean(destroyMethod="shutdown")
        public Executor taskScheduler() {
            return Executors.newScheduledThreadPool(42);
        }
        @Bean
        public MyTask myTask() {
            return new MyTask();
        }
    }
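
The idea behind a trigger task can be sketched without Spring: instead of a fixed period, the next execution time is computed again after every run, and the schedule ends when no next time is returned. The following is only an illustration of that idea under stated assumptions; TriggerSketch, runWithTrigger and times are hypothetical names, and the IntFunction stands in loosely for a Trigger implementation such as the CustomTrigger above.

```java
import java.util.OptionalLong;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;

// Plain-JDK illustration only; all names in this class are hypothetical.
class TriggerSketch {

    /**
     * Before the first run and after every run, nextDelayMillis is asked
     * (given the number of completed runs) how long to wait. An empty answer
     * ends the schedule. Returns the number of executions that took place.
     */
    static int runWithTrigger(Runnable task, IntFunction<OptionalLong> nextDelayMillis, long timeoutMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger completed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        Runnable arm = new Runnable() {
            @Override
            public void run() {
                OptionalLong delay = nextDelayMillis.apply(completed.get());
                if (!delay.isPresent()) {
                    done.countDown(); // the "trigger" has no further execution time
                    return;
                }
                executor.schedule(() -> {
                    task.run();
                    completed.incrementAndGet();
                    this.run(); // consult the "trigger" again and re-arm
                }, delay.getAsLong(), TimeUnit.MILLISECONDS);
            }
        };
        arm.run();
        try {
            done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return completed.get();
    }

    /** A simple "trigger": run the given number of times with a constant delay, then stop. */
    static IntFunction<OptionalLong> times(int runs, long delayMillis) {
        return completedRuns -> completedRuns < runs ? OptionalLong.of(delayMillis) : OptionalLong.empty();
    }

    public static void main(String[] args) {
        int executions = runWithTrigger(() -> System.out.println("work"), times(3, 10), 5_000);
        System.out.println("executions: " + executions);
    }
}
```

Unlike a fixed-rate or fixed-delay task, the decision about when (and whether) to run again is made at run time.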
    

    For comparison, the following shows how to configure the tasks in XML:

    <beans>
        <task:annotation-driven scheduler="taskScheduler"/>
        <task:scheduler id="taskScheduler" pool-size="42"/>
        <task:scheduled-tasks scheduler="taskScheduler">
            <task:scheduled ref="myTask" method="work" fixed-rate="1000"/>
        </task:scheduled-tasks>
        <bean id="myTask" class="com.foo.MyTask"/>
    </beans>
    

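    The XML above is equivalent to the Java configuration, except that it uses a fixed-rate period instead of a custom Trigger, because the task: namespace cannot easily expose that kind of support. In this sense the Java-based approach is more flexible.
    Note: @EnableScheduling applies only to its own application context. Because contexts form a parent-child hierarchy, @EnableScheduling has to be declared again in each context that needs it, for example the root web application context and any separate DispatcherServlet context.
    Now for the source code: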

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    // The usual @Enable* pattern: @Import registers SchedulingConfiguration
    @Import(SchedulingConfiguration.class)
    @Documented
    public @interface EnableScheduling {
    }
    

    Next, SchedulingConfiguration:

    @Configuration
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class SchedulingConfiguration {
        @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
            return new ScheduledAnnotationBeanPostProcessor();
        }
    }
    

    Nothing special here: it only registers a bean of type ScheduledAnnotationBeanPostProcessor named org.springframework.context.annotation.internalScheduledAnnotationProcessor.
    Next, ScheduledAnnotationBeanPostProcessor:

    public class ScheduledAnnotationBeanPostProcessor
            implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
            Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
            SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean
    

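    The type hierarchy is simple: the class only implements a number of callback interfaces. MergedBeanDefinitionPostProcessor tells us that it is also a BeanPostProcessor. Does it use AOP internally? Read on.
    The constructor: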

    public ScheduledAnnotationBeanPostProcessor() {
        this.registrar = new ScheduledTaskRegistrar();
    }
    

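    ScheduledTaskRegistrar is the class that actually registers and schedules the tasks, so it is created first. The container then calls back ScheduledAnnotationBeanPostProcessor#postProcessAfterInitialization for each bean: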

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        ...
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass)) {
            // Find the methods annotated with @Scheduled or @Schedules
            Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                        Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, Scheduled.class, Schedules.class);
                        return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
                }
            }
            else { // @Scheduled methods were found: iterate and call processScheduled() for each
                // Non-empty set of methods
                annotatedMethods.forEach((method, scheduledMethods) ->
                        scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
                if (logger.isTraceEnabled()) {
                    logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                            "': " + annotatedMethods);
                }
            }
        }
        return bean;
    }
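
The scanning step above can be approximated with plain reflection. This is a simplified sketch, not Spring's implementation: @Every and @Everies are hypothetical stand-ins for @Scheduled and @Schedules, and only getDeclaredMethods() is inspected, whereas the real MethodIntrospector does more work (for instance, it also considers interfaces). The cache of non-annotated classes mirrors the nonAnnotatedClasses set in the excerpt.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Plain-JDK illustration only; all names in this class are hypothetical.
class AnnotationScanSketch {

    // Hypothetical stand-in for @Scheduled (repeatable) ...
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @Repeatable(Everies.class)
    @interface Every { long millis(); }

    // ... and for its container annotation @Schedules.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Everies { Every[] value(); }

    // Classes known to have no annotated methods, so they are not scanned twice.
    static final Set<Class<?>> NON_ANNOTATED = ConcurrentHashMap.newKeySet();

    /** Maps each annotated method name to the number of @Every annotations on it. */
    static Map<String, Integer> scan(Class<?> targetClass) {
        if (NON_ANNOTATED.contains(targetClass)) {
            return Collections.emptyMap();
        }
        Map<String, Integer> found = new TreeMap<>();
        for (Method method : targetClass.getDeclaredMethods()) {
            // Sees a single @Every as well as repeated ones wrapped in @Everies.
            Every[] schedules = method.getAnnotationsByType(Every.class);
            if (schedules.length > 0) {
                found.put(method.getName(), schedules.length);
            }
        }
        if (found.isEmpty()) {
            NON_ANNOTATED.add(targetClass);
        }
        return found;
    }

    // Example bean class with one singly and one doubly annotated method.
    static class Report {
        @Every(millis = 1000)
        void everySecond() { }

        @Every(millis = 1000)
        @Every(millis = 5000)
        void twice() { }

        void plain() { }
    }

    public static void main(String[] args) {
        System.out.println(scan(Report.class)); // {everySecond=1, twice=2}
        System.out.println(scan(String.class)); // {}
    }
}
```

Note that the bean itself is returned unchanged by the post-processor; the scan only collects methods to schedule.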
    

    Next, ScheduledAnnotationBeanPostProcessor#processScheduled, which turns the annotation attributes into tasks:

    protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
        try {
            Runnable runnable = createRunnable(bean, method);
            ...
            Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
            ...
            // Parse the cron expression, create a CronTask and add it to the registrar
            String cron = scheduled.cron();
            if (StringUtils.hasText(cron)) {
                String zone = scheduled.zone();
                if (this.embeddedValueResolver != null) {
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                    zone = this.embeddedValueResolver.resolveStringValue(zone);
                }
                if (StringUtils.hasLength(cron)) {
                    Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                    processedSchedule = true;
                    if (!Scheduled.CRON_DISABLED.equals(cron)) {
                        TimeZone timeZone;
                        if (StringUtils.hasText(zone)) {
                            timeZone = StringUtils.parseTimeZoneString(zone);
                        }
                        else {
                            timeZone = TimeZone.getDefault();
                        }
                        tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                    }
                }
            }
            // Parse fixedDelay, create a FixedDelayTask and add it to the registrar
            // Check fixed delay
            long fixedDelay = scheduled.fixedDelay();
            if (fixedDelay >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
            }
            String fixedDelayString = scheduled.fixedDelayString();
            if (StringUtils.hasText(fixedDelayString)) {
                if (this.embeddedValueResolver != null) {
                    fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                }
                if (StringUtils.hasLength(fixedDelayString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedDelay = parseDelayAsLong(fixedDelayString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                    }
                    tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
                }
            }
            // Parse fixedRate, create a FixedRateTask and add it to the registrar
            // Check fixed rate
            long fixedRate = scheduled.fixedRate();
            if (fixedRate >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
            }
            String fixedRateString = scheduled.fixedRateString();
            if (StringUtils.hasText(fixedRateString)) {
                if (this.embeddedValueResolver != null) {
                    fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                }
                if (StringUtils.hasLength(fixedRateString)) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    try {
                        fixedRate = parseDelayAsLong(fixedRateString);
                    }
                    catch (RuntimeException ex) {
                        throw new IllegalArgumentException(
                                "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                    }
                    tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
                }
            }
            // Finally, record all tasks created for this bean in the post-processor's own scheduledTasks map
            synchronized (this.scheduledTasks) {
                Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
                regTasks.addAll(tasks);
            }
        } catch (IllegalArgumentException ex) {
            throw new IllegalStateException(
                    "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
        }
    }
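
The processedSchedule flag in the excerpt means that only one of cron, fixedDelay and fixedRate may drive a given annotation, and that the special cron value "-" (Scheduled.CRON_DISABLED) counts as processed without registering a task. The sketch below isolates that check. It is an illustration only: ScheduleKindSketch, resolveKind and the error text are hypothetical, and unset values are assumed to be "" for cron and -1 for the numeric attributes.

```java
// Plain-JDK illustration only; all names and the error text are hypothetical.
class ScheduleKindSketch {

    static final String CRON_DISABLED = "-";
    static final String ERROR = "exactly one of cron, fixedDelay or fixedRate is required";

    /**
     * Returns which attribute drives the schedule: "cron", "fixedDelay",
     * "fixedRate", or "disabled" for the "-" cron value.
     * Throws IllegalArgumentException if none or more than one is set.
     */
    static String resolveKind(String cron, long fixedDelay, long fixedRate) {
        String kind = null;
        if (cron != null && !cron.trim().isEmpty()) {
            kind = CRON_DISABLED.equals(cron) ? "disabled" : "cron";
        }
        if (fixedDelay >= 0) {
            kind = claim(kind, "fixedDelay");
        }
        if (fixedRate >= 0) {
            kind = claim(kind, "fixedRate");
        }
        if (kind == null) {
            throw new IllegalArgumentException(ERROR);
        }
        return kind;
    }

    // Plays the role of Assert.isTrue(!processedSchedule, errorMessage) in the excerpt.
    private static String claim(String current, String next) {
        if (current != null) {
            throw new IllegalArgumentException(ERROR + " (both " + current + " and " + next + " were set)");
        }
        return next;
    }

    public static void main(String[] args) {
        System.out.println(resolveKind("", -1, 1000));           // fixedRate
        System.out.println(resolveKind("0 0 * * * *", -1, -1));  // cron
        System.out.println(resolveKind("-", -1, -1));            // disabled
    }
}
```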
    

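    Once all tasks have been parsed, they are ready to be scheduled. This happens in finishRegistration(), which the post-processor calls after the singletons have been created (inside an ApplicationContext, from its ContextRefreshedEvent listener):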

    private void finishRegistration() {
        // this.scheduler is null by default
        if (this.scheduler != null) {
            this.registrar.setScheduler(this.scheduler);
        }
        // Fetch every SchedulingConfigurer in the container and let each one customize the registrar
        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, SchedulingConfigurer> beans =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
            List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
            AnnotationAwareOrderComparator.sort(configurers);
            for (SchedulingConfigurer configurer : configurers) {
                configurer.configureTasks(this.registrar);
            }
        }
        // There are tasks but no scheduler yet, so look one up (code abridged)
        if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
            try {
                // First, look up a TaskScheduler by type
                this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
            }
            catch (NoUniqueBeanDefinitionException ex) {
                try {
                    // More than one TaskScheduler: look it up by name, "taskScheduler"
                    this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    ...
                }
            }
            catch (NoSuchBeanDefinitionException ex) {
                try {
                    // No TaskScheduler at all: look up a ScheduledExecutorService by type
                    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
                }
                catch (NoUniqueBeanDefinitionException ex2) {
                    try {
                        // More than one ScheduledExecutorService: look it up by name, "taskScheduler"
                        this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                    }
                    catch (NoSuchBeanDefinitionException ex3) {
                    }
                }
                catch (NoSuchBeanDefinitionException ex2) {
                }
            }
        }
        // Finally, call afterPropertiesSet(), which schedules the tasks
        this.registrar.afterPropertiesSet();
    }
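
The nested try/catch blocks above encode a fallback chain that is easier to see when written as a plain loop. The sketch below models the container as a Map from bean name to bean. It is a simplification under stated assumptions: FakeTaskScheduler and FakeExecutorService are hypothetical stand-ins for TaskScheduler and ScheduledExecutorService, and a null result means "let the registrar create its own default".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-JDK illustration only; all names in this class are hypothetical.
class SchedulerLookupSketch {

    interface FakeTaskScheduler { }      // stands in for TaskScheduler
    interface FakeExecutorService { }    // stands in for ScheduledExecutorService

    static final String DEFAULT_NAME = "taskScheduler";

    /** Returns the chosen bean, or null when the caller should fall back to its own default. */
    static Object resolve(Map<String, Object> beans) {
        for (Class<?> type : Arrays.asList(FakeTaskScheduler.class, FakeExecutorService.class)) {
            List<Object> matches = new ArrayList<>();
            for (Object bean : beans.values()) {
                if (type.isInstance(bean)) {
                    matches.add(bean);
                }
            }
            if (matches.size() == 1) {
                return matches.get(0);           // unique bean of this type
            }
            if (matches.size() > 1) {
                Object named = beans.get(DEFAULT_NAME);
                // Ambiguous by type: accept only the conventionally named bean, otherwise give up.
                return type.isInstance(named) ? named : null;
            }
            // No bean of this type at all: try the next type.
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, Object> beans = new LinkedHashMap<>();
        Object pool = new FakeExecutorService() { };
        beans.put("pool", pool);
        System.out.println(resolve(beans) == pool); // true: unique executor service is used
    }
}
```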
    

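    At this point the registrar has been customized and a scheduler has been looked up. The actual scheduling happens in ScheduledTaskRegistrar#scheduleTasks, which afterPropertiesSet() invokes: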

    protected void scheduleTasks() {
        // If no scheduler was set, create a ConcurrentTaskScheduler backed by a single-threaded executor
        if (this.taskScheduler == null) {
            this.localExecutor = Executors.newSingleThreadScheduledExecutor();
            this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
        }
        // Schedule each of the four kinds of tasks
        if (this.triggerTasks != null) {
            for (TriggerTask task : this.triggerTasks) {
                addScheduledTask(scheduleTriggerTask(task));
            }
        }
        if (this.cronTasks != null) {
            for (CronTask task : this.cronTasks) {
                addScheduledTask(scheduleCronTask(task));
            }
        }
        if (this.fixedRateTasks != null) {
            for (IntervalTask task : this.fixedRateTasks) {
                addScheduledTask(scheduleFixedRateTask(task));
            }
        }
        if (this.fixedDelayTasks != null) {
            for (IntervalTask task : this.fixedDelayTasks) {
                addScheduledTask(scheduleFixedDelayTask(task));
            }
        }
    }
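
The single-threaded fallback has a practical consequence that a small plain-JDK experiment can show: every task scheduled on Executors.newSingleThreadScheduledExecutor() runs on the same thread. The sketch below is only an illustration; SingleThreadSketch and threadsUsed are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Plain-JDK illustration only; SingleThreadSketch and threadsUsed are hypothetical names.
class SingleThreadSketch {

    /**
     * Schedules taskCount independent periodic tasks on a single-threaded
     * scheduler and returns the names of the threads that executed them.
     */
    static Set<String> threadsUsed(int taskCount, long timeoutMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        Set<String> names = ConcurrentHashMap.newKeySet();
        // Wait until the tasks have run a few times in total.
        CountDownLatch latch = new CountDownLatch(taskCount * 2);
        try {
            for (int i = 0; i < taskCount; i++) {
                executor.scheduleAtFixedRate(() -> {
                    names.add(Thread.currentThread().getName());
                    latch.countDown();
                }, 0, 10, TimeUnit.MILLISECONDS);
            }
            latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println("distinct threads: " + threadsUsed(3, 5_000).size());
    }
}
```

Because all tasks share one thread, a slow @Scheduled method under the default scheduler holds back the others. Supplying a larger pool through SchedulingConfigurer, as in the earlier configuration example, is one way around that.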
    

    The overall logic is straightforward. For a working demo, see schedule-demo under https://github.com/xjs1919/enumdemo.

    Please credit the source when reposting.

