美文网首页Spring技术Spring源码分析
二十二、Spring Boot使用Spring Task进行任务

二十二、Spring Boot使用Spring Task进行任务

作者: 木石前盟Caychen | 来源:发表于2019-04-25 16:04 被阅读1次

    1、Spring中的定时任务

    1.1、使用xml形式

    任务类MyScheduler:

    public class MyScheduler {
    
        public void print(){
            System.out.println("MyScheduler:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS")));
        }
    }
    

    spring配置spring-task.xml:

    <task:annotation-driven/>
        
    <bean id="taskScheduler" class="org.com.cay.scheduler.MyScheduler"/>
    
    <task:scheduled-tasks>
        <task:scheduled ref="taskScheduler" method="print" cron="0/5 * * * * ?"/>
    </task:scheduled-tasks>
    

    运行项目即可实现每隔5s会输出一次信息。

    1.2、使用注解形式

    @Component
    public class OtherScheduler {
    
        @Scheduled(cron="0/10 * * * * ?")
        public void show(){
            System.out.println("OtherScheduler:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS")));
        }
    }
    

    运行项目即可实现每隔10s会输出一次信息。

    2、task命名空间

    task命名空间使用 TaskNamespaceHandler 来处理:

    //{@code NamespaceHandler} for the 'task' namespace.
    public class TaskNamespaceHandler extends NamespaceHandlerSupport {
    
        @Override
        public void init() {
            this.registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
            this.registerBeanDefinitionParser("executor", new ExecutorBeanDefinitionParser());
            this.registerBeanDefinitionParser("scheduled-tasks", new ScheduledTasksBeanDefinitionParser());
            this.registerBeanDefinitionParser("scheduler", new SchedulerBeanDefinitionParser());
        }
    }
    

    而其中:

    • AnnotationDrivenBeanDefinitionParser用于解析<task:annotation-driven />标签:

      <task:annotation-driven scheduler="" exception-handler="" executor="" mode="" proxy-target-class=""/>
      

    • SchedulerBeanDefinitionParser用于解析<task:scheduler />标签:

      <task:scheduler id="调度器id" pool-size="线程池大小" />
      

    • ScheduledTasksBeanDefinitionParser用于解析<task:scheduled-tasks />标签:

      <task:scheduled-tasks scheduler="指定使用调度器的id">
              <task:scheduled ref="任务bean" method="任务bean的方法" cron="" fixed-delay="" fixed-rate="" initial-delay="" trigger=""/>
      </task:scheduled-tasks>
      

    • ExecutorBeanDefinitionParser用于解析<task:executor />标签:

      <task:executor id="" pool-size="" keep-alive="" queue-capacity="" rejection-policy="" />
      

    3、源码分析

    3.1、SchedulerBeanDefinitionParser:

    直接看类定义:

    public class SchedulerBeanDefinitionParser extends AbstractSingleBeanDefinitionParser {
    
        @Override
        protected String getBeanClassName(Element element) {
            return "org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler";
        }
    
        @Override
        protected void doParse(Element element, BeanDefinitionBuilder builder) {
            String poolSize = element.getAttribute("pool-size");
            if (StringUtils.hasText(poolSize)) {
                builder.addPropertyValue("poolSize", poolSize);
            }
        }
    
    }
    

    可以想象,一旦配置了

    <task:scheduler id="调度器id" pool-size="线程池大小" />
    

    ,就会向Spring容器中注册一个Bean,id为指定的id值,值为org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler类的实例。

    至于我为什么会先讲这个类,后面会解释。

    3.2、ExecutorBeanDefinitionParser:

    直接看类定义:

    public class ExecutorBeanDefinitionParser extends AbstractSingleBeanDefinitionParser {
    
        @Override
        protected String getBeanClassName(Element element) {
            return "org.springframework.scheduling.config.TaskExecutorFactoryBean";
        }
    
        @Override
        protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
            String keepAliveSeconds = element.getAttribute("keep-alive");
            if (StringUtils.hasText(keepAliveSeconds)) {
                builder.addPropertyValue("keepAliveSeconds", keepAliveSeconds);
            }
            String queueCapacity = element.getAttribute("queue-capacity");
            if (StringUtils.hasText(queueCapacity)) {
                builder.addPropertyValue("queueCapacity", queueCapacity);
            }
            configureRejectionPolicy(element, builder);
            String poolSize = element.getAttribute("pool-size");
            if (StringUtils.hasText(poolSize)) {
                builder.addPropertyValue("poolSize", poolSize);
            }
        }
        
        //configureRejectionPolicy函数...
    }
    

    同上节一样,一旦配置了

    <task:executor id="" pool-size="" keep-alive="" queue-capacity="" rejection-policy="" />
    

    ,就会向Spring容器中注册一个Bean,值为org.springframework.scheduling.config.TaskExecutorFactoryBean类的实例对象。

    3.3、AnnotationDrivenBeanDefinitionParser

    该类用于解析使用注解形式的任务调度,看源码(部分):

    public class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {
        
        private static final String ASYNC_EXECUTION_ASPECT_CLASS_NAME =
                "org.springframework.scheduling.aspectj.AnnotationAsyncExecutionAspect";
    
        @Override
        public BeanDefinition parse(Element element, ParserContext parserContext) {
            Object source = parserContext.extractSource(element);
    
            // Register component for the surrounding <task:annotation-driven> element.
            CompositeComponentDefinition compDefinition = new CompositeComponentDefinition(element.getTagName(), source);
            parserContext.pushContainingComponent(compDefinition);
    
            // Nest the concrete post-processor bean in the surrounding component.
            BeanDefinitionRegistry registry = parserContext.getRegistry();
    
            String mode = element.getAttribute("mode");
            if ("aspectj".equals(mode)) {
                // mode="aspectj"
                registerAsyncExecutionAspect(element, parserContext);
            }
            else {
                // mode="proxy"
                if (registry.containsBeanDefinition(TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)) {
                    parserContext.getReaderContext().error(
                            "Only one AsyncAnnotationBeanPostProcessor may exist within the context.", source);
                }
                else {
                    BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(
                            "org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor");
                    builder.getRawBeanDefinition().setSource(source);
                    String executor = element.getAttribute("executor");
                    if (StringUtils.hasText(executor)) {
                        builder.addPropertyReference("executor", executor);
                    }
                    String exceptionHandler = element.getAttribute("exception-handler");
                    if (StringUtils.hasText(exceptionHandler)) {
                        builder.addPropertyReference("exceptionHandler", exceptionHandler);
                    }
                    if (Boolean.valueOf(element.getAttribute(AopNamespaceUtils.PROXY_TARGET_CLASS_ATTRIBUTE))) {
                        builder.addPropertyValue("proxyTargetClass", true);
                    }
                    registerPostProcessor(parserContext, builder, TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME);
                }
            }
    
            if (registry.containsBeanDefinition(TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)) {
                parserContext.getReaderContext().error(
                        "Only one ScheduledAnnotationBeanPostProcessor may exist within the context.", source);
            }
            else {
                BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(
                        "org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor");
                builder.getRawBeanDefinition().setSource(source);
                String scheduler = element.getAttribute("scheduler");
                if (StringUtils.hasText(scheduler)) {
                    builder.addPropertyReference("scheduler", scheduler);
                }
                registerPostProcessor(parserContext, builder, TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME);
            }
    
            // Finally register the composite component.
            parserContext.popAndRegisterContainingComponent();
    
            return null;
        }
        
        //other code...
        
    }
    

    AnnotationDrivenBeanDefinitionParserd源码中可以看到,它注册了两个后置处理器,分别是:

    • org.springframework.scheduling.annotation.AsyncAnnotationBeanPostProcessor

    • org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor

    前者用于异步调用的处理器,而后者用于任务调度的处理器。

    本文主要讲解任务调度,所以来看ScheduledAnnotationBeanPostProcessor处理器。

    首先来看看ScheduledAnnotationBeanPostProcessor类定义:

    public class ScheduledAnnotationBeanPostProcessor
            implements MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
            Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
            SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {}
    

    从类定义来看,ScheduledAnnotationBeanPostProcessor实现了ApplicationListener接口,用于监听ContextRefreshedEvent事件。

    先来看初始化后的后置处理方法ScheduledAnnotationBeanPostProcessor#postProcessAfterInitialization方法:

    @Override
    public Object postProcessAfterInitialization(final Object bean, String beanName) {
        Class<?> targetClass = AopUtils.getTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass)) {
            //首先获取所有带有注解(@Scheduled和@Schedules)的方法
            Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass, 
    new MethodIntrospector.MetadataLookup<Set<Scheduled>>() {
                @Override                                                                                     public Set<Scheduled> inspect(Method method) {
                    Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);
                                                                                                                return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                }
            });
            //如果注解方法集合为空
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass());
                }
            }
            else {
                //带有注解的方法集合不为空时
                for (Map.Entry<Method, Set<Scheduled>> entry : annotatedMethods.entrySet()) {
                    Method method = entry.getKey();
                    for (Scheduled scheduled : entry.getValue()) {
                        //依次遍历处理注解方法,重点
                        processScheduled(scheduled, method, bean);
                    }
                }
                if (logger.isDebugEnabled()) {
                    logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                                 "': " + annotatedMethods);
                }
            }
        }
        return bean;
    }
    

    再来看处理定时任务的方法ScheduledAnnotationBeanPostProcessor#processScheduled方法:

    protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
        try {
            Assert.isTrue(method.getParameterTypes().length == 0,
                          "Only no-arg methods may be annotated with @Scheduled");
    
            Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());
            //创建ScheduledMethodRunnable(父接口Runnable)对象,主要是用来反射定时方法
            Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);
            boolean processedSchedule = false;
            String errorMessage =
                "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
    
            // 初始化一个定时任务集合 
            Set<ScheduledTask> tasks = new LinkedHashSet<ScheduledTask>(4);
    
            // Determine initial delay
            //判断注解属性中是否带有initialDelay和initialDelayString
            long initialDelay = scheduled.initialDelay();
            String initialDelayString = scheduled.initialDelayString();
            if (StringUtils.hasText(initialDelayString)) {
                Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
                if (this.embeddedValueResolver != null) {
                    initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
                }
                try {
                    initialDelay = Long.parseLong(initialDelayString);
                }
                catch (NumberFormatException ex) {
                    throw new IllegalArgumentException(
                        "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into integer");
                }
            }
    
            // Check cron expression
            //判断是否是cron表达式
            String cron = scheduled.cron();
            if (StringUtils.hasText(cron)) {
                Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                processedSchedule = true;
                //获取注解属性中的zone时区值
                String zone = scheduled.zone();
                if (this.embeddedValueResolver != null) {
                    cron = this.embeddedValueResolver.resolveStringValue(cron);
                    zone = this.embeddedValueResolver.resolveStringValue(zone);
                }
                TimeZone timeZone;
                if (StringUtils.hasText(zone)) {
                    //解析时区
                    timeZone = StringUtils.parseTimeZoneString(zone);
                }
                else {
                    //创建默认时区
                    timeZone = TimeZone.getDefault();
                }
                //将表达式解析成cron任务计划添加到任务集合中
                //调用ScheduledTaskRegistrar对象的scheduleCronTask(CronTask cronTask)方法
                tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
            }
    
            // At this point we don't need to differentiate between initial delay set or not anymore
            if (initialDelay < 0) {
                initialDelay = 0;
            }
    
            // Check fixed delay
            //判断是否有fixedDelay属性
            long fixedDelay = scheduled.fixedDelay();
            if (fixedDelay >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                //根据属性值创建Interval任务然后添加到任务集合中
                //调用ScheduledTaskRegistrar对象的scheduleFixedDelayTask(IntervalTask task)方法
                tasks.add(this.registrar.scheduleFixedDelayTask(new IntervalTask(runnable, fixedDelay, initialDelay)));
            }
            //判断是否有fixedDelayString属性
            String fixedDelayString = scheduled.fixedDelayString();
            if (StringUtils.hasText(fixedDelayString)) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                if (this.embeddedValueResolver != null) {
                    fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                }
                try {
                    fixedDelay = Long.parseLong(fixedDelayString);
                }
                catch (NumberFormatException ex) {
                    throw new IllegalArgumentException(
                        "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into integer");
                }
                //根据属性值创建Interval任务然后添加到任务集合中
                //调用ScheduledTaskRegistrar对象的scheduleFixedDelayTask(IntervalTask task)方法
                tasks.add(this.registrar.scheduleFixedDelayTask(new IntervalTask(runnable, fixedDelay, initialDelay)));
            }
    
            // Check fixed rate
            //判断是否有fixedRate属性
            long fixedRate = scheduled.fixedRate();
            if (fixedRate >= 0) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                //根据属性值创建Interval任务然后添加到任务集合中
                //调用ScheduledTaskRegistrar对象的scheduleFixedRateTask(IntervalTask task)方法
                tasks.add(this.registrar.scheduleFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay)));
            }
            //判断是否有fixedRateString属性
            String fixedRateString = scheduled.fixedRateString();
            if (StringUtils.hasText(fixedRateString)) {
                Assert.isTrue(!processedSchedule, errorMessage);
                processedSchedule = true;
                if (this.embeddedValueResolver != null) {
                    fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                }
                try {
                    fixedRate = Long.parseLong(fixedRateString);
                }
                catch (NumberFormatException ex) {
                    throw new IllegalArgumentException(
                        "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into integer");
                }
                //根据属性值创建Interval任务然后添加到任务集合中
                //调用ScheduledTaskRegistrar对象的scheduleFixedRateTask(IntervalTask task)方法
                tasks.add(this.registrar.scheduleFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay)));
            }
    
            // Check whether we had any attribute set
            Assert.isTrue(processedSchedule, errorMessage);
    
            // Finally register the scheduled tasks
            synchronized (this.scheduledTasks) {
                Set<ScheduledTask> registeredTasks = this.scheduledTasks.get(bean);
                if (registeredTasks == null) {
                    registeredTasks = new LinkedHashSet<ScheduledTask>(4);
                    this.scheduledTasks.put(bean, registeredTasks);
                }
                registeredTasks.addAll(tasks);
            }
        }
        catch (IllegalArgumentException ex) {
            throw new IllegalStateException(
                "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
        }
    }
    

    上述源码中讲到了ScheduledMethodRunnable类,该类实现了Runnable接口:

    public class ScheduledMethodRunnable implements Runnable {
    
        private final Object target;
        private final Method method;
    
        //constructor and getter/setter...
    
        @Override
        public void run() {
            try {
                //定时任务的反射机制
                ReflectionUtils.makeAccessible(this.method);
                this.method.invoke(this.target);
            }
            catch (InvocationTargetException ex) {
                ReflectionUtils.rethrowRuntimeException(ex.getTargetException());
            }
            catch (IllegalAccessException ex) {
                throw new UndeclaredThrowableException(ex);
            }
        }
    }
    

    processScheduled方法中,对于每种不同属性的情况,会创建不同类型的Task。此处我们以CronTask为例,其余会在后续中提到,接下来咱们来看ScheduledTaskRegistrar#scheduleCronTask方法:

    public ScheduledTask scheduleCronTask(CronTask task) {
       ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
       boolean newTask = false;
       if (scheduledTask == null) {
          scheduledTask = new ScheduledTask();
          newTask = true;
       }
        //重点
       if (this.taskScheduler != null) {
          scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
       }
       else {
          addCronTask(task);
          this.unresolvedTasks.put(task, scheduledTask);
       }
       return (newTask ? scheduledTask : null);
    }
    

    因为AnnotationDrivenBeanDefinitionParser是基于注解形式,无法在注解上指定使用的TaskScheduler任务调度器,所以在上述源码中有taskScheduler变量,在Spring启动过程中,父接口ApplicationListener在Spring容器未完全启动之前,即在ContextRefreshedEvent事件未发布之前,该taskScheduler变量仍为null值。

    ContextRefreshedEvent事件发布后,监听到ContextRefreshedEvent事件,则会触发方法:

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext() == this.applicationContext) {
            //完成注册后
            finishRegistration();
        }
    }
    

    ScheduledAnnotationBeanPostProcessor#finishRegistration方法:

    private void finishRegistration() {
        if (this.scheduler != null) {
            this.registrar.setScheduler(this.scheduler);
        }
    
        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, SchedulingConfigurer> beans =
                ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
            List<SchedulingConfigurer> configurers = new ArrayList<SchedulingConfigurer>(beans.values());
            AnnotationAwareOrderComparator.sort(configurers);
            for (SchedulingConfigurer configurer : configurers) {
                configurer.configureTasks(this.registrar);
            }
        }
    
        //如果有任务,但是scheduler任务调度器为null时
        if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
            try {
                // 开始查找TaskScheduler类型的Bean
                this.registrar.setTaskScheduler(resolveSchedulerBean(TaskScheduler.class, false));
            }
            catch (NoUniqueBeanDefinitionException ex) {
                logger.debug("Could not find unique TaskScheduler bean", ex);
                try {
                    //根据名字和类来查找
                    this.registrar.setTaskScheduler(resolveSchedulerBean(TaskScheduler.class, true));
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    //logger
                }
            }
            catch (NoSuchBeanDefinitionException ex) {
                logger.debug("Could not find default TaskScheduler bean", ex);
                // Search for ScheduledExecutorService bean next...
                try {
                    // 如果没有TaskScheduler类型的Bean,则查找ScheduledExecutorService类型的Bean
                    this.registrar.setScheduler(resolveSchedulerBean(ScheduledExecutorService.class, false));
                }
                catch (NoUniqueBeanDefinitionException ex2) {
                    logger.debug("Could not find unique ScheduledExecutorService bean", ex2);
                    try {
                        //根据名字和类来查找
                        this.registrar.setScheduler(resolveSchedulerBean(ScheduledExecutorService.class, true));
                    }
                    catch (NoSuchBeanDefinitionException ex3) {
                        //logger
                    }
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    //logger
                }
            }
        }
    
        this.registrar.afterPropertiesSet();
    }
    

    从上面源码中,会从BeanFactory中试图获取TaskSchedulerScheduledExecutorService的Bean。

    public static final String DEFAULT_TASK_SCHEDULER_BEAN_NAME = "taskScheduler";
    
    private <T> T resolveSchedulerBean(Class<T> schedulerType, boolean byName) {
       if (byName) {
           //根据name和类来查找
          T scheduler = this.beanFactory.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, schedulerType);
          if (this.beanFactory instanceof ConfigurableBeanFactory) {
             ((ConfigurableBeanFactory) this.beanFactory).registerDependentBean(
                   DEFAULT_TASK_SCHEDULER_BEAN_NAME, this.beanName);
          }
          return scheduler;
       }
        //根据类来查找
       else if (this.beanFactory instanceof AutowireCapableBeanFactory) {
          NamedBeanHolder<T> holder = ((AutowireCapableBeanFactory) this.beanFactory).resolveNamedBean(schedulerType);
          if (this.beanFactory instanceof ConfigurableBeanFactory) {
             ((ConfigurableBeanFactory) this.beanFactory).registerDependentBean(
                   holder.getBeanName(), this.beanName);
          }
          return holder.getBeanInstance();
       }
       else {
          return this.beanFactory.getBean(schedulerType);
       }
    }
    

    这里就要解释下在第3.1、SchedulerBeanDefinitionParser小节留下的疑惑:

    • 如果配置了<task:scheduler id="调度器id" pool-size="线程池大小" />,就会注册一个Bean,该Bean的值为org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler类的实例,而这个类又实现了TaskScheduler接口。所以如果配置了,就能从BeanFactory中获取该Bean的值,然后使用ScheduledTaskRegistrarsetTaskScheduler方法设置TaskScheduler对象的值,此时taskScheduler就不再为null了,而是ThreadPoolTaskScheduler类型的任务调度器。
    • 如果未配置<task:scheduler id="调度器id" pool-size="线程池大小" />,则无法从BeanFactory中获取对应class或者name的Bean,此时taskScheduler仍为null。

    接着看ScheduledTaskRegistrar#afterPropertiesSet和scheduleTasks方法:

    @Override
    public void afterPropertiesSet() {
        scheduleTasks();
    }
    
    protected void scheduleTasks() {
        //判断taskScheduler是否为null,如果配置了<task:scheduler id="" pool-size=""/>标签,则taskScheduler就不为null了,直接跳过创建调度器的代码
        if (this.taskScheduler == null) {
            //则创建一个单线程的线程池,和一个ConcurrentTaskScheduler类型的任务调度器
            this.localExecutor = Executors.newSingleThreadScheduledExecutor();
            this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
        }
        //将任务添加到任务调度器中进行执行
        if (this.triggerTasks != null) {
            for (TriggerTask task : this.triggerTasks) {
                addScheduledTask(scheduleTriggerTask(task));
            }
        }
        if (this.cronTasks != null) {
            for (CronTask task : this.cronTasks) {
                addScheduledTask(scheduleCronTask(task));
            }
        }
        if (this.fixedRateTasks != null) {
            for (IntervalTask task : this.fixedRateTasks) {
                addScheduledTask(scheduleFixedRateTask(task));
            }
        }
        if (this.fixedDelayTasks != null) {
            for (IntervalTask task : this.fixedDelayTasks) {
                addScheduledTask(scheduleFixedDelayTask(task));
            }
        }
    }
    
    private void addScheduledTask(ScheduledTask task) {
        if (task != null) {
            this.scheduledTasks.add(task);
        }
    }
    

    又回归到了ScheduledTaskRegistrar#scheduleCronTask方法,在进入scheduleTasks方法之后,会先判断taskScheduler是否为null值,如果是null,则创建一个ConcurrentTaskScheduler类型的TaskScheduler;如果不为null(即ThreadPoolTaskScheduler类似),则跳过创建过程。所以在scheduleCronTask方法中的taskScheduler已经不为null了,所以可以执行调度器scheduler的schedule方法:

    public ScheduledTask scheduleCronTask(CronTask task) {
        ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
        boolean newTask = false;
        if (scheduledTask == null) {
            scheduledTask = new ScheduledTask();
            newTask = true;
        }
        //此时taskScheduler不为null
        if (this.taskScheduler != null) {
            //执行TaskScheduler的schedule方法
            scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
        }
        else {
            addCronTask(task);
            this.unresolvedTasks.put(task, scheduledTask);
        }
        return (newTask ? scheduledTask : null);
    }
    

    TaskScheduler是个接口,主要有ThreadPoolTaskSchedulerConcurrentTaskScheduler两个实现类。

    ThreadPoolTaskScheduler#schedule方法:

    @Override
    public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
        ScheduledExecutorService executor = getScheduledExecutor();
        try {
            ErrorHandler errorHandler =
                (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
            return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
        }
        catch (RejectedExecutionException ex) {
            throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
        }
    }
    

    再看ConcurrentTaskScheduler#schedule方法:

    @Override
    public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
        try {
            if (this.enterpriseConcurrentScheduler) {
                return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
            }
            else {
                ErrorHandler errorHandler =
                    (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
                //调用ReschedulingRunnable对象的schedule方法,触发run方法
                return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
            }
        }
        catch (RejectedExecutionException ex) {
            throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
        }
    }
    

    其实两者类似,最后都是调用了ReschedulingRunnable类的schedule方法,而schedule方法会触发Runnable接口实现类的run方法的执行,从而导致自定义的任务方法触发执行。

    来看看ReschedulingRunnable类的构造函数和执行方法:

    class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {
        //构造函数
        public ReschedulingRunnable(Runnable delegate, Trigger trigger, ScheduledExecutorService executor, ErrorHandler errorHandler) {
            super(delegate, errorHandler);
            this.trigger = trigger;
            this.executor = executor;
        }
    
        public ScheduledFuture<?> schedule() {
            synchronized (this.triggerContextMonitor) {
                //计算下一次执行时间
                this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
                if (this.scheduledExecutionTime == null) {
                    return null;
                }
                long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
                this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
                return this;
            }
        }
    
        @Override
        public void run() {
            Date actualExecutionTime = new Date();
            //调用父类的run方法
            super.run();
            Date completionTime = new Date();
            synchronized (this.triggerContextMonitor) {
                this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
                if (!this.currentFuture.isCancelled()) {
                    // 继续无限调度
                    schedule();
                }
            }
        }
    }
    

    super.run();会调用父类DelegatingErrorHandlingRunnable的run方法,父类run方法中最重要的一句:this.delegate.run();即调用Runnable的run方法,而此时Runnable对象实际上是上述ScheduledMethodRunnable对象,其中封装了自定义的任务方法。最后归根结底,就是通过一层层封装来执行自定义的任务方法。

    3.4、ScheduledTasksBeanDefinitionParser

    该类用于解析使用xml配置形式的任务调度,看源码(部分):

    public class ScheduledTasksBeanDefinitionParser extends AbstractSingleBeanDefinitionParser {
        
        @Override
        protected String getBeanClassName(Element element) {
            return "org.springframework.scheduling.config.ContextLifecycleScheduledTaskRegistrar";
        }
        
        //other code...
        @Override
        protected void doParse(Element element, ParserContext parserContext, BeanDefinitionBuilder builder) {
            builder.setLazyInit(false); // lazy scheduled tasks are a contradiction in terms -> force to false
            //创建四种集合,分别用来管理四种不同参数或者属性的Task任务
            ManagedList<RuntimeBeanReference> cronTaskList = new ManagedList<RuntimeBeanReference>();
            ManagedList<RuntimeBeanReference> fixedDelayTaskList = new ManagedList<RuntimeBeanReference>();
            ManagedList<RuntimeBeanReference> fixedRateTaskList = new ManagedList<RuntimeBeanReference>();
            ManagedList<RuntimeBeanReference> triggerTaskList = new ManagedList<RuntimeBeanReference>();
            //解析xml节点
            NodeList childNodes = element.getChildNodes();
            for (int i = 0; i < childNodes.getLength(); i++) {
                Node child = childNodes.item(i);
                //判断是否是<task:scheduled />节点
                if (!isScheduledElement(child, parserContext)) {
                    continue;
                }
                Element taskElement = (Element) child;
                //获取节点属性
                String ref = taskElement.getAttribute("ref");
                String method = taskElement.getAttribute("method");
    
                // Check that 'ref' and 'method' are specified
                if (!StringUtils.hasText(ref) || !StringUtils.hasText(method)) {
                    parserContext.getReaderContext().error("Both 'ref' and 'method' are required", taskElement);
                    // Continue with the possible next task element
                    continue;
                }
    
                //获取节点属性值
                String cronAttribute = taskElement.getAttribute("cron");
                String fixedDelayAttribute = taskElement.getAttribute("fixed-delay");
                String fixedRateAttribute = taskElement.getAttribute("fixed-rate");
                String triggerAttribute = taskElement.getAttribute("trigger");
                String initialDelayAttribute = taskElement.getAttribute("initial-delay");
    
                boolean hasCronAttribute = StringUtils.hasText(cronAttribute);
                boolean hasFixedDelayAttribute = StringUtils.hasText(fixedDelayAttribute);
                boolean hasFixedRateAttribute = StringUtils.hasText(fixedRateAttribute);
                boolean hasTriggerAttribute = StringUtils.hasText(triggerAttribute);
                boolean hasInitialDelayAttribute = StringUtils.hasText(initialDelayAttribute);
    
                if (!(hasCronAttribute || hasFixedDelayAttribute || hasFixedRateAttribute || hasTriggerAttribute)) {
                    parserContext.getReaderContext().error(
                            "one of the 'cron', 'fixed-delay', 'fixed-rate', or 'trigger' attributes is required", taskElement);
                    continue; // with the possible next task element
                }
    
                if (hasInitialDelayAttribute && (hasCronAttribute || hasTriggerAttribute)) {
                    parserContext.getReaderContext().error(
                            "the 'initial-delay' attribute may not be used with cron and trigger tasks", taskElement);
                    continue; // with the possible next task element
                }
    
                //创建Runnable对象,并返回该对象的name,见下文#1
                String runnableName =
                        runnableReference(ref, method, taskElement, parserContext).getBeanName();
    
                //以下四种类型的Task,将不同属性和Runnable对象实例绑定对应的Task类上,见下文#2以CronTask为例
                if (hasFixedDelayAttribute) {
                    fixedDelayTaskList.add(intervalTaskReference(runnableName,
                            initialDelayAttribute, fixedDelayAttribute, taskElement, parserContext));
                }
                if (hasFixedRateAttribute) {
                    fixedRateTaskList.add(intervalTaskReference(runnableName,
                            initialDelayAttribute, fixedRateAttribute, taskElement, parserContext));
                }
                if (hasCronAttribute) {
                    cronTaskList.add(cronTaskReference(runnableName, cronAttribute,
                            taskElement, parserContext));
                }
                if (hasTriggerAttribute) {
                    String triggerName = new RuntimeBeanReference(triggerAttribute).getBeanName();
                    triggerTaskList.add(triggerTaskReference(runnableName, triggerName,
                            taskElement, parserContext));
                }
            }
            String schedulerRef = element.getAttribute("scheduler");
            if (StringUtils.hasText(schedulerRef)) {
                //如果有节点上scheduler属性,则进行绑定
                builder.addPropertyReference("taskScheduler", schedulerRef);
            }
            builder.addPropertyValue("cronTasksList", cronTaskList);
            builder.addPropertyValue("fixedDelayTasksList", fixedDelayTaskList);
            builder.addPropertyValue("fixedRateTasksList", fixedRateTaskList);
            builder.addPropertyValue("triggerTasksList", triggerTaskList);
        }
        
        //other code...
    }
    

    使用ScheduledTasksBeanDefinitionParser解析器,会创建一个类型为org.springframework.scheduling.config.ContextLifecycleScheduledTaskRegistrar的Bean。

    doParse源码的#1处

    ScheduledTasksBeanDefinitionParser#runnableReference方法:

    private RuntimeBeanReference runnableReference(String ref, String method, Element taskElement, ParserContext parserContext) {
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(
            "org.springframework.scheduling.support.ScheduledMethodRunnable");
        builder.addConstructorArgReference(ref);
        builder.addConstructorArgValue(method);
        return beanReference(taskElement, parserContext, builder);
    }
    

    再看看ScheduledTasksBeanDefinitionParser#beanReference方法定义:

    private RuntimeBeanReference beanReference(Element taskElement,
                ParserContext parserContext, BeanDefinitionBuilder builder) {
        // Extract the source of the current task
        builder.getRawBeanDefinition().setSource(parserContext.extractSource(taskElement));
        String generatedName = parserContext.getReaderContext().generateBeanName(builder.getRawBeanDefinition());
        parserContext.registerBeanComponent(new BeanComponentDefinition(builder.getBeanDefinition(), generatedName));
        return new RuntimeBeanReference(generatedName);
    }
    

    上述源码中可以看出,它声明了一个ScheduledMethodRunnable对象的实例,并把任务类和任务的方法绑定到该对象上,并进行注册。

    doParse源码的#2处,以CronTask为例:

    ScheduledTasksBeanDefinitionParser#cronTaskReference方法:

    private RuntimeBeanReference cronTaskReference(String runnableBeanName,
                String cronExpression, Element taskElement, ParserContext parserContext) {
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(
            "org.springframework.scheduling.config.CronTask");
        builder.addConstructorArgReference(runnableBeanName);
        builder.addConstructorArgValue(cronExpression);
        return beanReference(taskElement, parserContext, builder);
    }
    

    runnableReference方法一样,在cronTaskReference方法中,同样声明了一个CronTask对象实例,然后将Runnable对象的name以及cron表达式绑定到CronTask对象实例上,并进行注册。

    最后将所有的任务加入到不同的任务集合中,并把任务集合绑定到类型为org.springframework.scheduling.config.ContextLifecycleScheduledTaskRegistrar的Bean上。

    看看ContextLifecycleScheduledTaskRegistrar的类定义:

    public class ContextLifecycleScheduledTaskRegistrar extends ScheduledTaskRegistrar implements SmartInitializingSingleton {
    
        @Override
        public void afterPropertiesSet() {
            // no-op
        }
    
        @Override
        public void afterSingletonsInstantiated() {
            scheduleTasks();
        }
    
    }
    

    在处理ContextLifecycleScheduledTaskRegistrar的Bean的时候,会触发父类的scheduleTasks方法。

    此时又回归到了ScheduledTaskRegistrarscheduleTasks方法了。

    ScheduledTaskRegistrar#scheduleTasks方法:

    protected void scheduleTasks() {
        if (this.taskScheduler == null) {
            this.localExecutor = Executors.newSingleThreadScheduledExecutor();
            this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
        }
        if (this.triggerTasks != null) {
            for (TriggerTask task : this.triggerTasks) {
                addScheduledTask(scheduleTriggerTask(task));
            }
        }
        if (this.cronTasks != null) {
            for (CronTask task : this.cronTasks) {
                addScheduledTask(scheduleCronTask(task));
            }
        }
        if (this.fixedRateTasks != null) {
            for (IntervalTask task : this.fixedRateTasks) {
                addScheduledTask(scheduleFixedRateTask(task));
            }
        }
        if (this.fixedDelayTasks != null) {
            for (IntervalTask task : this.fixedDelayTasks) {
                addScheduledTask(scheduleFixedDelayTask(task));
            }
        }
    }
    

    ScheduledTasksBeanDefinitionParserdoParse方法中有几行代码:

    String schedulerRef = element.getAttribute("scheduler");
    if (StringUtils.hasText(schedulerRef)) {
        //如果有节点上scheduler属性,则进行绑定
        builder.addPropertyReference("taskScheduler", schedulerRef);
    }
    

    正如我注释所言:

    • 如果节点上指定了scheduler属性,则会把scheduler的引用Bean绑定到ContextLifecycleScheduledTaskRegistrar(ScheduledTaskRegistrar子类)类实例上,则父类ScheduledTaskRegistrar的taskScheduler的值就会赋值为指定的scheduler任务调度器的值,此时的taskScheduler通常为ThreadPoolTaskScheduler类型。
    • 如果节点上未指定scheduler属性,即使配置了<task:scheduler id="" pool-size="" />,也不会把当前任务绑定到之前配置的scheduler上去,因为此时taskScheduler为null,它会自己创建一个ConcurrentTaskScheduler类型的TaskScheduler,而不是实际配置的ThreadPoolTaskScheduler类型的TaskScheduler。

    最后执行的步骤跟使用注解的方式就一样了,这里就不再累赘了。请参看第3.3小节。

    4、其他

    ScheduledTaskRegistrar类中定义了根据三种不同类型的Task执行的四个方法:

    Task类型有:

    • CronTask
    • TriggerTask
    • IntervalTask

    方法:

    • ScheduledTask scheduleCronTask(CronTask task);
      • ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
      • ScheduledFuture<?> schedule(Runnable task, Date startTime);
    • ScheduledTask scheduleTriggerTask(TriggerTask task);
      • ScheduledFuture<?> schedule(Runnable task, Trigger trigger);
      • ScheduledFuture<?> schedule(Runnable task, Date startTime);
    • ScheduledTask scheduleFixedRateTask(IntervalTask task);
      • ScheduledFuture<?> scheduleAtFixedRate(Runnable task, long period);
      • ScheduledFuture<?> scheduleAtFixedRate(Runnable task, Date startTime, long period);
    • ScheduledTask scheduleFixedDelayTask(IntervalTask task);
      • ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long delay);
      • ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, Date startTime, long delay);

    其中重载方法取决于有没有设置initial-delay属性。

    对于schedule的两个重载方法,

    • 如果设置了initial-delay属性,则执行ScheduledExecutorService对象的schedule方法;
    • 如果未设置则执行ReschedulingRunnable对象的schedule方法

    对于另外四个方法,都是执行ScheduledExecutorService各自的scheduleXxx同名方法。

    5、Spring Boot中使用Schedule

    只要在主入口类上加入@EnableScheduling 即可启动任务调度了。

    @Import(SchedulingConfiguration.class)
    public @interface EnableScheduling {}
    

    其中导入了SchedulingConfiguration类。

    @Configuration
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class SchedulingConfiguration {
    
        @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
            return new ScheduledAnnotationBeanPostProcessor();
        }
    }
    

    SchedulingConfiguration类中,创建了一个ScheduledAnnotationBeanPostProcessor的Bean。看过上文的小伙伴肯定就熟悉这个类了,在讲解使用注解的时候,在AnnotationDrivenBeanDefinitionParser解析<task:annotation-driven />的类中会创建一个ScheduledAnnotationBeanPostProcessor的Bean,后续的过程就同使用注解的方式一样了。

    相关文章

      网友评论

        本文标题:二十二、Spring Boot使用Spring Task进行任务

        本文链接:https://www.haomeiwen.com/subject/rsyzgqtx.html