美文网首页
SpringBoot中的定时任务详解

SpringBoot中的定时任务详解

作者: Yan雪杉 | 来源:发表于2021-11-16 11:25 被阅读0次

    在大多数项目应该不可避免会用到定时任务了,如果是单体项目的话,要实现一个定时任务还是比较简单的,可以通过Executors.newScheduledThreadPool(10)来实现,也可以通过SpringBootScheduled注解来实现。如果是分布式项目或者微服务的话,要实现一个定时任务就比较麻烦了,或者自己去实现,或者使用第三方的分布式定时任务框架,比如QuartzElastic-jobxxl-job等。

    在我们的几个项目中都会用到定时任务,而且用得也都比较频繁,在微服务项目中使用的是xxl-job,在单体项目中,由于SpringBoot自带了定时任务的实现,但是默认的实现不是很友好,加上我们对于定时任务的管理要比较灵活,可以自由地对定时任务进行增删改查,所以我们就利用Executors.newScheduledThreadPool(10)来实现了。

    首先,我们还是来看一下SpringBoot中的定时任务Scheduled是如何实现的。

    SpringBoot项目中,如果想要实现定时任务的话,首先需要在启动类上添加@EnableScheduling注解,然后在定时任务的方法上添加上@Scheduled注解,这样一个简单的定时任务就实现了。

    @EnableScheduling

    这个注解是SpringBoot项目实现定时任务的关键,我们首先来观察一下它的内部实现,点进去这个注解可以发现@Import(SchedulingConfiguration.class),可以看到它会导入一个叫做SchedulingConfiguration的配置类。

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Import(SchedulingConfiguration.class)
    @Documented
    public @interface EnableScheduling {
    
    }
    

    再点进去的话,就可以发现这个配置类做的事情非常简单,就是new出了一个ScheduledAnnotationBeanPostProcessor对象,这个对象就是实现定时任务的关键。

    @Configuration
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class SchedulingConfiguration {
    
        @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
            return new ScheduledAnnotationBeanPostProcessor();
        }
    
    }
    

    我们可以看下ScheduledAnnotationBeanPostProcessor的实现定义,发现它还是实现了非常多的接口的,其中有一个接口是MergedBeanDefinitionPostProcessor接口,而这个接口又继承了BeanPostProcessor接口,BeanPostProcessor这个接口有两个方法需要去实现,分别为postProcessBeforeInitializationpostProcessAfterInitialization方法,分别在bean的初始化前和初始化后调用。

    那么我们就来关注一下postProcessAfterInitialization方法的实现,这个方法其实就是去扫描被@Scheduled注解标记的定时任务,当扫描到之后,会对每个定时任务调用processScheduled方法,而processScheduled方法就是对@Scheduled注解中的参数进行解析,比如fixedDelaycron等等,解析完成之后再把它添加到定时任务的集合中。

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) {
        if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
                bean instanceof ScheduledExecutorService) {
            // Ignore AOP infrastructure such as scoped proxies.
            return bean;
        }
    
        Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
        if (!this.nonAnnotatedClasses.contains(targetClass) &&
                AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
            Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                    (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                        Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                method, Scheduled.class, Schedules.class);
                        return (!scheduledMethods.isEmpty() ? scheduledMethods : null);
                    });
            if (annotatedMethods.isEmpty()) {
                this.nonAnnotatedClasses.add(targetClass);
                if (logger.isTraceEnabled()) {
                    logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
                }
            }
            else {
                // Non-empty set of methods
                annotatedMethods.forEach((method, scheduledMethods) ->
                        scheduledMethods.forEach(scheduled -> processScheduled(scheduled, method, bean)));
                if (logger.isTraceEnabled()) {
                    logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                            "': " + annotatedMethods);
                }
            }
        }
        return bean;
    }
    

    除了上述的接口以外,还有一个接口是ApplicationListener<ContextRefreshedEvent>,它会去监听ContextRefreshedEvent事件,当所有的bean都初始化完成并且装载完成的话,就会触发该事件,实现了这个接口的类就可以监听到这个事件,从而去实现自己的逻辑,这个接口只有一个方法定义onApplicationEvent(E event),所以当监听到ContextRefreshedEvent事件的时候,就会执行onApplicationEvent方法。

    public class ScheduledAnnotationBeanPostProcessor
        implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
        Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
        SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {}
    

    onApplicationEvent方法里面做的事也非常简单,就是调用内部的一个方法finishRegistrationfinishRegistraion方法的逻辑就比较复杂了,我们一一来看下

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        if (event.getApplicationContext() == this.applicationContext) {
            // Running in an ApplicationContext -> register tasks this late...
            // giving other ContextRefreshedEvent listeners a chance to perform
            // their work at the same time (e.g. Spring Batch's job registration).
            finishRegistration();
        }
    }
    
    private void finishRegistration() {
        // scheduler可以自己去实现,这个scheduler就是执行定时任务的线程池,可以自己去实现TaskScheduler,也就是使用jdk自带的ScheduledExecutorService
        // 具体可以看下setScheduler这个方法
        if (this.scheduler != null) {
            this.registrar.setScheduler(this.scheduler);
        }
    
        // 查找SchedulingConfigurer配置类,然后加载配置,这个配置类也可以自己去实现,在这个配置类中也可以去指定定时任务的线程池
        if (this.beanFactory instanceof ListableBeanFactory) {
            Map<String, SchedulingConfigurer> beans =
                    ((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);
            List<SchedulingConfigurer> configurers = new ArrayList<>(beans.values());
            AnnotationAwareOrderComparator.sort(configurers);
            for (SchedulingConfigurer configurer : configurers) {
                configurer.configureTasks(this.registrar);
            }
        }
    
        // 这个registrar中就保存了被@Scheduled注解标注的定时任务集合,之后会讲到如何从其中获取定时任务集合,并且进行任务的取消
        // 如果存在被@Scheduled注解标记的定时任务,但是scheduler为null的话,就会尝试去搜索TaskScheduler,没有找到的话就抛出异常
        if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");
            try {
                // Search for TaskScheduler bean...
                this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, false));
            }
            catch (NoUniqueBeanDefinitionException ex) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Could not find unique TaskScheduler bean - attempting to resolve by name: " +
                            ex.getMessage());
                }
                try {
                    this.registrar.setTaskScheduler(resolveSchedulerBean(this.beanFactory, TaskScheduler.class, true));
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    if (logger.isInfoEnabled()) {
                        logger.info("More than one TaskScheduler bean exists within the context, and " +
                                "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                ex.getBeanNamesFound());
                    }
                }
            }
            catch (NoSuchBeanDefinitionException ex) {
                if (logger.isTraceEnabled()) {
                    logger.trace("Could not find default TaskScheduler bean - attempting to find ScheduledExecutorService: " +
                            ex.getMessage());
                }
                // Search for ScheduledExecutorService bean next...
                try {
                    this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, false));
                }
                catch (NoUniqueBeanDefinitionException ex2) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Could not find unique ScheduledExecutorService bean - attempting to resolve by name: " +
                                ex2.getMessage());
                    }
                    try {
                        this.registrar.setScheduler(resolveSchedulerBean(this.beanFactory, ScheduledExecutorService.class, true));
                    }
                    catch (NoSuchBeanDefinitionException ex3) {
                        if (logger.isInfoEnabled()) {
                            logger.info("More than one ScheduledExecutorService bean exists within the context, and " +
                                    "none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +
                                    "(possibly as an alias); or implement the SchedulingConfigurer interface and call " +
                                    "ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +
                                    ex2.getBeanNamesFound());
                        }
                    }
                }
                catch (NoSuchBeanDefinitionException ex2) {
                    if (logger.isTraceEnabled()) {
                        logger.trace("Could not find default ScheduledExecutorService bean - falling back to default: " +
                                ex2.getMessage());
                    }
                    // Giving up -> falling back to default scheduler within the registrar...
                    logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");
                }
            }
        }
    
        // 最后会执行这个方法
        this.registrar.afterPropertiesSet();
    }
    
    @Override
    public void afterPropertiesSet() {
        scheduleTasks();
    }
    
    protected void scheduleTasks() {
    
        // 在这个方法里面,可以发现,如果taskScheduler不存在的话,就会创建出一个执行器,这个执行器应该不陌生了
        // 它就是一个corePoolSize为单线程,maxPoolSize为Integer.MAX_VALUE,队列为DelayedWorkQueue的执行器
        // 当存在很多个定时任务同时执行的时候,只会有一个定时任务被执行,其他的定时任务会被扔进DelayedWorkQueue队列中
        if (this.taskScheduler == null) {
            this.localExecutor = Executors.newSingleThreadScheduledExecutor();
            this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
        }
        // 下面的这几个判断就是将被@Scheduled注解标记的定时任务添加到任务集合中
        if (this.triggerTasks != null) {
            for (TriggerTask task : this.triggerTasks) {
                addScheduledTask(scheduleTriggerTask(task));
            }
        }
        // 注意以下这个cron表达式的定时任务添加,后续我们去实现动态地对定时任务进行管理会用到
        if (this.cronTasks != null) {
            for (CronTask task : this.cronTasks) {
                // 这里的scheduleCronTask还是值得关注的
                addScheduledTask(scheduleCronTask(task));
            }
        }
        if (this.fixedRateTasks != null) {
            for (IntervalTask task : this.fixedRateTasks) {
                addScheduledTask(scheduleFixedRateTask(task));
            }
        }
        if (this.fixedDelayTasks != null) {
            for (IntervalTask task : this.fixedDelayTasks) {
                addScheduledTask(scheduleFixedDelayTask(task));
            }
        }
    }
    

    到这里呢,被@Scheduled注解标记的方法就会被作为定时任务添加到定时任务集合中了。

    从上面我们可以发现,对于默认的定时任务的实现,执行定时任务的线程池并不是很友好,我们可以去自定义实现执行定时任务的线程池,可以去实现TaskScheduler,也可以去创建ScheduledExecutorService,还可以去实现配置类SchedulingConfigurer

    @Configuration
    public class TestConfig {
    
        @Bean
        public TaskScheduler taskScheduler() {
            ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
            taskScheduler.setPoolSize(10);
            taskScheduler.setRemoveOnCancelPolicy(false);
            taskScheduler.initialize();
            return taskScheduler;
        }
    }
    

    如何获取定义的定时任务集合

    在之前的描述中,我们可以发现在服务启动的时候,IOC容器中会注入一个ScheduledAnnotationBeanPostProcessor的Bean对象,这个Bean对象就是来对定时任务进行管理的,那么我们就可以从这个类中获取到定时任务的集合,并且将定时任务都打印出来看一下内容都是什么,可以发现ScheduledTasktoString()方法就是定时任务的全类名加上方法名,比如com.yan.shiyue.Task.task,这样的话,我们就可以将这些定时任务给保存起来,作为一个Map,key就是定时任务的名字,value就是ScheduledTask,然后我们就可以动态地对这些任务进行取消了,因为ScheduledTask提供了一个cancel方法来取消定时任务的执行。

    @Slf4j
    @Component
    public class ScheduledTaskConfig implements CommandLineRunner {
    
        @Autowired
        private ScheduledAnnotationBeanPostProcessor scheduledAnnotationBeanPostProcessor;
    
        @Override
        public void run(String... args) {
            Set<ScheduledTask> tasks = scheduledAnnotationBeanPostProcessor.getScheduledTasks();
            for (ScheduledTask task : tasks) {
                log.error(task.toString());
            }
        }
    }
    

    如何动态地创建定时任务

    我们可以发现SpringBoot提供的定时任务并不是很灵活,我们没法动态地对定时任务进行增删改查,那么基于SpringBoot的定时任务的实现,我们可以自己来实现定时任务的动态操作。

    在接下来的操作中,就以cron表达式类型的定时任务进行动态地增删改查,在实现之前我们回顾一下SpringBoot中的cron表达式类型的定时任务时如何被添加到任务集合中的。

    protected void scheduleTasks() {
    
        // 在这个方法里面,可以发现,如果taskScheduler不存在的话,就会创建出一个执行器,这个执行器应该不陌生了
        // 它就是一个corePoolSize为单线程,maxPoolSize为Integer.MAX_VALUE,队列为DelayedWorkQueue的执行器
        // 当存在很多个定时任务同时执行的时候,只会有一个定时任务被执行,其他的定时任务会被扔进DelayedWorkQueue队列中
        if (this.taskScheduler == null) {
            this.localExecutor = Executors.newSingleThreadScheduledExecutor();
            this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
        }
        // 下面的这几个判断就是将被@Scheduled注解标记的定时任务添加到任务集合中
        if (this.triggerTasks != null) {
            for (TriggerTask task : this.triggerTasks) {
                addScheduledTask(scheduleTriggerTask(task));
            }
        }
        // 注意以下这个cron表达式的定时任务添加,后续我们去实现动态地对定时任务进行管理会用到
        if (this.cronTasks != null) {
            for (CronTask task : this.cronTasks) {
                // 这里的scheduleCronTask还是值得关注的
                addScheduledTask(scheduleCronTask(task));
            }
        }
        if (this.fixedRateTasks != null) {
            for (IntervalTask task : this.fixedRateTasks) {
                addScheduledTask(scheduleFixedRateTask(task));
            }
        }
        if (this.fixedDelayTasks != null) {
            for (IntervalTask task : this.fixedDelayTasks) {
                addScheduledTask(scheduleFixedDelayTask(task));
            }
        }
    }
    

    可以发现,SpringBoot对几种定时任务都实现了对应的Task,比如cron表达式类型的CronTask,固定频率类型的IntervalTask等等,那么我们如果要动态地添加一个cron表达式类型的定时任务的话,就可以实现CronTask了。

    那么,我们自己创建好一个CronTask之后该如何执行呢,之前有提到过SpringBoot执行定时任务的执行器可以自定义,那么我们在自定义好执行器TaskScheduler之后,就可以调用其中的schedule方法来执行定时任务了。

    首先,我们需要创建好一个任务,需要实现Runnable接口。

    public class TestTask implements Runnable {
        
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + "shiyue");
        }
    }
    

    然后,我们可以去实现一个接口,来动态地管理这个定时任务。

    @RestController
    public class TestController {
    
        @Autowired
        private TaskScheduler taskScheduler;
    
        @Autowired
        private ScheduledAnnotationBeanPostProcessor scheduledAnnotationBeanPostProcessor;
    
    
        private final Map<Integer, ScheduledFuture> taskMap = new ConcurrentHashMap<>();
    
        /**
         * 添加一个定时任务
         *
         * @return
         */
        @GetMapping("/task")
        public String addTask() {
            // 这里为了方便,cron表达式写死了,其实可以由外部传入
            CronTask cronTask = new CronTask(new TestTask(), "*/5 * * * * ?");
            ScheduledFuture scheduledFuture = taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
            // 同时,这里也是为了方便,使用Map来保存定时任务的信息,其实可以将定时任务持久化到MySQL中
            taskMap.put(1, scheduledFuture);
            return "shiyue";
        }
    
        /**
         * 更新一个定时任务,更新一个定时任务可以看做是将原来的定时给取消掉,然后新增一个新的定时任务
         *
         * @return
         */
        @GetMapping("/task/update/{id}")
        public String updateTask(@PathVariable Integer id, @RequestParam String cron) {
            ScheduledFuture scheduledFuture = taskMap.get(id);
            scheduledFuture.cancel(true);
    
            // 添加
            CronTask cronTask = new CronTask(new TestTask(), cron);
            ScheduledFuture scheduledFuture1 = taskScheduler.schedule(cronTask.getRunnable(), cronTask.getTrigger());
            taskMap.put(id, scheduledFuture1);
            return "Success";
        }
    
        @GetMapping("/task/list")
        public String taskList() {
            Set<ScheduledTask> tasks = scheduledAnnotationBeanPostProcessor.getScheduledTasks();
            for (ScheduledTask task : tasks) {
                System.out.println(task);
            }
            return "qiyue";
        }
    }
    

    相关文章

      网友评论

          本文标题:SpringBoot中的定时任务详解

          本文链接:https://www.haomeiwen.com/subject/tkyctrtx.html