美文网首页
定时任务原理以及自定义定时任务的管理

定时任务原理以及自定义定时任务的管理

作者: guessguess | 来源:发表于2024-05-05 16:47 被阅读0次

    概述

    由于一些定时任务调度,可能会存在调整的问题。
    本文主要是讲述一下定时任务的实现原理。
    

    定时任务的用法

    @Component
    public class TestTask {
    
        @Scheduled(cron = "0/1 * * * * ?")
        public void cron() {
            System.out.println("nothing to do ");
        }
        
        @Scheduled(fixedDelay = 1000)
        public void fixedDelay() {
            System.out.println("nothing to do ");
        }
        
        @Scheduled(fixedRate = 1000)
        public void fixedRate() {
            System.out.println("nothing to do ");
        }
    }
    
    主要分为三种。cron以及fixedDelay,fixRate。
    这三者的区别吧。
    cron其实是定时执行-到某个点马上执行,当然这里是线程资源足够的前提。
    fixedDelay,固定延迟
    fixRate,固定频率延迟,如果没有执行完,下次任务会马上执行。网上说会有并发问题。我阅读源码没有发现这个问题。因为所有下次的执行都是依赖上一次执行完毕的。
    

    spring如何实现定时任务原理

    通过导入配置类

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Import(SchedulingConfiguration.class)
    @Documented
    public @interface EnableScheduling {
    
    }
    

    配置类做了什么?

    其实配置类比较简单。通过注入了一个内置的后置处理器。
    这个后置处理器主要是在bean初始化时,做了一些操作。
    @Configuration(proxyBeanMethods = false)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public class SchedulingConfiguration {
    
        @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
        @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
        public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
            return new ScheduledAnnotationBeanPostProcessor();
        }
    
    }
    
    后置处理器做了什么呢?很简单。
    bean初始化的时候进行了扫描.
    针对方法的Schedule注解进行扫描。
    
    public class ScheduledAnnotationBeanPostProcessor
            implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
            Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
            SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {
    
        @Override
        public Object postProcessAfterInitialization(Object bean, String beanName) {
            if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
                    bean instanceof ScheduledExecutorService) {
                // Ignore AOP infrastructure such as scoped proxies.
                return bean;
            }
    
            Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
            if (!this.nonAnnotatedClasses.contains(targetClass) &&
                    AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
                Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                        (MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
                            Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
                                    method, Scheduled.class, Schedules.class);
                            return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
                        });
                if (annotatedMethods.isEmpty()) {
                    this.nonAnnotatedClasses.add(targetClass);
                    if (logger.isTraceEnabled()) {
                        logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
                    }
                }
                else {
                    // Non-empty set of methods
                    annotatedMethods.forEach((method, scheduledAnnotations) ->
                            scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
                    if (logger.isTraceEnabled()) {
                        logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
                                "': " + annotatedMethods);
                    }
                }
            }
            return bean;
        }
    }
    
    扫描后做什么呢?
    那就是进行注册,源码如下.其实就是利用了registrar进行注册。
        protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
            try {
                Runnable runnable = createRunnable(bean, method);
                boolean processedSchedule = false;
                String errorMessage =
                        "Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
    
                Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
    
                // Determine initial delay
                long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());
                String initialDelayString = scheduled.initialDelayString();
                if (StringUtils.hasText(initialDelayString)) {
                    Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
                    if (this.embeddedValueResolver != null) {
                        initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
                    }
                    if (StringUtils.hasLength(initialDelayString)) {
                        try {
                            initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit());
                        }
                        catch (RuntimeException ex) {
                            throw new IllegalArgumentException(
                                    "Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
                        }
                    }
                }
    
                // Check cron expression
                String cron = scheduled.cron();
                if (StringUtils.hasText(cron)) {
                    String zone = scheduled.zone();
                    if (this.embeddedValueResolver != null) {
                        cron = this.embeddedValueResolver.resolveStringValue(cron);
                        zone = this.embeddedValueResolver.resolveStringValue(zone);
                    }
                    if (StringUtils.hasLength(cron)) {
                        Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
                        processedSchedule = true;
                        if (!Scheduled.CRON_DISABLED.equals(cron)) {
                            TimeZone timeZone;
                            if (StringUtils.hasText(zone)) {
                                timeZone = StringUtils.parseTimeZoneString(zone);
                            }
                            else {
                                timeZone = TimeZone.getDefault();
                            }
                            tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
                        }
                    }
                }
    
                // At this point we don't need to differentiate between initial delay set or not anymore
                if (initialDelay < 0) {
                    initialDelay = 0;
                }
    
                // Check fixed delay
                long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());
                if (fixedDelay >= 0) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
                }
    
                String fixedDelayString = scheduled.fixedDelayString();
                if (StringUtils.hasText(fixedDelayString)) {
                    if (this.embeddedValueResolver != null) {
                        fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
                    }
                    if (StringUtils.hasLength(fixedDelayString)) {
                        Assert.isTrue(!processedSchedule, errorMessage);
                        processedSchedule = true;
                        try {
                            fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit());
                        }
                        catch (RuntimeException ex) {
                            throw new IllegalArgumentException(
                                    "Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
                        }
                        tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
                    }
                }
    
                // Check fixed rate
                long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());
                if (fixedRate >= 0) {
                    Assert.isTrue(!processedSchedule, errorMessage);
                    processedSchedule = true;
                    tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
                }
                String fixedRateString = scheduled.fixedRateString();
                if (StringUtils.hasText(fixedRateString)) {
                    if (this.embeddedValueResolver != null) {
                        fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
                    }
                    if (StringUtils.hasLength(fixedRateString)) {
                        Assert.isTrue(!processedSchedule, errorMessage);
                        processedSchedule = true;
                        try {
                            fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit());
                        }
                        catch (RuntimeException ex) {
                            throw new IllegalArgumentException(
                                    "Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
                        }
                        tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
                    }
                }
    
                // Check whether we had any attribute set
                Assert.isTrue(processedSchedule, errorMessage);
    
                // Finally register the scheduled tasks
                synchronized (this.scheduledTasks) {
                    Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
                    regTasks.addAll(tasks);
                }
            }
            catch (IllegalArgumentException ex) {
                throw new IllegalStateException(
                        "Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
            }
        }
    

    那为什么注册之后,任务就可以运行了

    其实在注册之后,便会将任务,放入线程池。等待被执行。
    class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {
        @Nullable
        public ScheduledFuture<?> schedule() {
            synchronized (this.triggerContextMonitor) {
                            //通过触发器拿到下次执行任务的时间。
                this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
                if (this.scheduledExecutionTime == null) {
                    return null;
                }
                            //计算出延迟的时间,放入队列。
                long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();
                this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
                return this;
            }
        }
    }
    

    那么如何定时被执行呢?

    class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {
        @Override
        public void run() {
            Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());
                    //执行任务代码
            super.run();
            Date completionTime = new Date(this.triggerContext.getClock().millis());
            synchronized (this.triggerContextMonitor) {
                Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
                this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
                if (!obtainCurrentFuture().isCancelled()) {
                                    //执行完如果任务没有被取消。开始套娃。
                    schedule();
                }
            }
        }
    }
    

    fixDelay&fixRate

    其实二者的实现也是一样的,套娃调度。
    public class ScheduledThreadPoolExecutor
            extends ThreadPoolExecutor
            implements ScheduledExecutorService {
    
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (period <= 0)
                throw new IllegalArgumentException();
            //构建成需要的任务结构
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(period));
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            //延时执行
            delayedExecute(t);
            return t;
        }
    
        private class ScheduledFutureTask<V>
                extends FutureTask<V> implements RunnableScheduledFuture<V> {
    
            public void run() {
                boolean periodic = isPeriodic();
                if (!canRunInCurrentRunState(periodic))
                    cancel(false);
                else if (!periodic)
                    ScheduledFutureTask.super.run();
                else if (ScheduledFutureTask.super.runAndReset()) {
                    //执行完设置下次的执行时间
                    setNextRunTime();
                    //将任务放入队列
                    reExecutePeriodic(outerTask);
                }
            }
    }
    
    因此目前得知。
    其实spring也没有将定时任务转成特定的bean.
    因此我们只要能够机械能任务的注册即可。
    另外则是,定时任务每次的执行,都是需要上一次完成才可以。所以不存在所谓的并发问题。
    另外则是下一次的执行时间
    

    脚手架中如何实现定时任务

    @ConditionalOnBean(value = ScheduleConfiguration.class)
    @Configuration
    public class TaskConfiguration implements SchedulingConfigurer{
        
        //用于保存定时任务注册器
        private static ScheduledTaskRegistrar scheduledTaskRegistrar;
    
        private static Map<String, ScheduledTask> TASK_MAP = new HashMap<String, ScheduledTask>();
    
        public static Map<String, ScheduledTask> getTASK_MAP() {
            return TASK_MAP;
        }
        
        public static ScheduledTaskRegistrar getScheduledTaskRegistrar() {
            return scheduledTaskRegistrar;
        }
    
        @Override
        public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
            TaskConfiguration.scheduledTaskRegistrar = taskRegistrar;
        }
    }
    
    
    
    比较简单。实现接口,并且注入容器即可
    首先这里有2个接口
    第一个接口则是用于cron表达式类型的定时任务。
    public abstract class CronTask implements Task{.
            //获取cron表达式,默认从容器中获取
        public String getCron() {
            return ApplicationContextUtils.getPropertyStrValue(getCronKey());
        }
            //设置任务名称
        public abstract String getName();
        //设置cron表达式的key
        public abstract String getCronKey();
    }
    
    
    第二个接口则是用于延迟类型的定时任务。
    public abstract class FixDelayTask implements Task{
        public abstract String getFixDelayKey();
        public abstract String getInitialDelayKey();
        //获取任务的初始延时间隔
        public long getInitialDelay() {
            return ApplicationContextUtils.getPropertyValue(getInitialDelayKey(), Long.class);
        };
        //获取任务的延时间隔
        public long getFixedDelay() {
            return ApplicationContextUtils.getPropertyValue(getFixDelayKey(), Long.class);
        }
        //是否固定频率。如果为true则为fixRate, false则为fixDelay
        public abstract boolean isFixedRate();
    }
    
    
    下面是几个demo.
    
    @Component
    public class CustomCronTask extends CronTask{
        @Override
        public void run() {
            System.out.println(this.getName() + "nothing to do");
        }
    
        @Override
        public String getName() {
            return "cron";
        }
    
        @Override
        public String getCronKey() {
            return "custom.task.alarm.cron";
        }
    }
    
    @Component
    public class CustomFixDelayTask extends FixDelayTask{
    
        @Override
        public String getName() {
            return "delay";
        }
    
        @Override
        public void run() {
            System.out.println(this.getName() + "nothing to do");
        }
    
        @Override
        public String getFixDelayKey() {
            return "custom.task.alarm.fixdelay";
        }
    
        @Override
        public String getInitialDelayKey() {
            return "custom.task.alarm.fix-initdelay";
        }
    
        @Override
        public boolean isFixedRate() {
            return false;
        }
    
    }
    
    @Component
    public class CustomFixRateTask extends FixDelayTask{
    
        @Override
        public String getName() {
            return "rate";
        }
    
        @Override
        public void run() {
            System.out.println(this.getName() + "nothing to do");
        }
    
        @Override
        public String getFixDelayKey() {
            return "custom.task.alarm.fixrate";
        }
    
        @Override
        public String getInitialDelayKey() {
            return "custom.task.alarm.fixrate-initdelay";
        }
    
        @Override
        public boolean isFixedRate() {
            return true;
        }
    }
    

    脚手架如何实现对定时任务的管理

    其实定时任务的核心点在于,获取下次任务的执行时间。
    spring的定时任务本身,提供了触发器可以去解决这个问题。
    因此我们的核心点在于,去重新实现对应的触发器。
    目前支持的触发器类型有俩种。
    一种是定时触发器CronTrigger,另外一种则是周期触发器PeriodicTrigger.
    
    
    覆写的源码如下。
    public class CustomCronTrigger extends CronTrigger{
        
        public CustomCronTrigger(CronTask cronTask) {
            //其实只是初始化时有点用,后面没用。统一从容器获取
            super(cronTask.getCron());
            this.cronTask = cronTask;
        }
    
        private CronTask cronTask;
    
        @Override
        public Date nextExecutionTime(TriggerContext triggerContext) {
            Date date = triggerContext.lastCompletionTime();
            if (date != null) {
                Date scheduled = triggerContext.lastScheduledExecutionTime();
                if (scheduled != null && date.before(scheduled)) {
                    date = scheduled;
                }
            }
            else {
                date = new Date(triggerContext.getClock().millis());
            }
            ZonedDateTime dateTime = ZonedDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault());
            //由于cron只会初始化一次。因此,改为从容器中读取。主要是支持配置的动态刷新
            ZonedDateTime next = CronExpression.parse(this.getExpression()).next(dateTime);
            return (next != null ? Date.from(next.toInstant()) : null); 
        }
    
        @Override
        public String getExpression() {
            return cronTask.getCron();
        }
    }
    
    public class CustomFixDelayTrigger extends PeriodicTrigger{
        private FixDelayTask ft;
        public CustomFixDelayTrigger(FixDelayTask ft) {
            super(ft.getFixedDelay());
            this.ft = ft;
        }
    
        @Override
        public long getPeriod() {
            return ft.getFixedDelay();
        }
    
        @Override
        public long getInitialDelay() {
            return ft.getInitialDelay();
        }
    
        
    
        @Override
        public boolean isFixedRate() {
            return ft.isFixedRate();
        }
    
        @Override
        public Date nextExecutionTime(TriggerContext triggerContext) {
            Date lastExecution = triggerContext.lastScheduledExecutionTime();
            Date lastCompletion = triggerContext.lastCompletionTime();
            if (lastExecution == null || lastCompletion == null) {
                return new Date(triggerContext.getClock().millis() + this.getInitialDelay());
            }
            if (this.isFixedRate()) {
                return new Date(lastExecution.getTime() + this.getPeriod());
            }
            return new Date(lastCompletion.getTime() + this.getPeriod());
        }
    }
    

    脚手架-定时任务的注册

    时机比较简单。针对服务启动时即可。
    因此这里使用哨兵对服务进行初始化
    @Component
    public class ContextStandGuard  implements ApplicationRunner, DisposableBean {
    
        @Autowired
        private TaskService taskService;
        
        @Override
        public void destroy() {
            TaskConfiguration.getTASK_MAP().values().forEach(task -> {
                task.cancel(true);
            });
            System.out.println("容器摧毁时,顺便摧毁定时任务");
        }
    
        @Override
        public void run(ApplicationArguments args) throws Exception {
            System.out.println("容器启动");
            taskService.init();
        }
    }
    

    脚手架-定时任务的注册实现

    原理比较简单。
    从容器中获取bean。
    然后构建对应的触发器即可。
    
    @Component
    public class TaskServiceImpl implements TaskService{
        
        @Override
        public void init() {
            List<Task> tasks = ApplicationContextUtils.getBeanList(Task.class);
            if (CollectionUtils.isEmpty(tasks)) {
                return;
            }
            for (Task task : tasks) {
                TriggerTask triggerTask = getTriggerTask(task);
                if(triggerTask == null) {
                    continue;
                }
                ScheduledTask st = TaskConfiguration.getScheduledTaskRegistrar().scheduleTriggerTask(triggerTask);
                TaskConfiguration.getTASK_MAP().put(task.getName(), st);
            }
        }
    
        private TriggerTask getTriggerTask(Task task) {
            Trigger tg = genTrigger(task);
            if(tg == null) {
                return null;
            }
            return new TriggerTask(task, tg);
        }
        
        private Trigger genTrigger(Task task) {
            if(CronTask.class.isAssignableFrom(task.getClass())) {
                CronTask ct = (CronTask)task;
                return new CustomCronTrigger(ct);
            }else if(FixDelayTask.class.isAssignableFrom(task.getClass())) {
                FixDelayTask ft = (FixDelayTask)task;
                return new CustomFixDelayTrigger(ft);
            }
            return null;
        }
    
        @Override
        public boolean destory(String taskname) {
            TaskConfiguration.getTASK_MAP().get(taskname).cancel();
            return true;
        }
    
        @Override
        public boolean register(String taskname) {
            return true;
        }
    }
    
    

    脚手架-定时任务的取消

    这里首先回到源码。
    如果一个任务正在执行,这会被取消了,会有什么影响吗?
    没有任何影响,但是下次任务就没办法执行了。
    
    class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {
    
        @Override
        public void run() {
            Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());
            super.run();
            Date completionTime = new Date(this.triggerContext.getClock().millis());
            synchronized (this.triggerContextMonitor) {
                Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
                this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
                            从源码来看-只要isCancelled为true就不会执行了,那么定时任务就会套娃失败。任务就被移除了。
                if (!obtainCurrentFuture().isCancelled()) {
                    schedule();
                }
            }
        }
    
        @Override
        public boolean isCancelled() {
            synchronized (this.triggerContextMonitor) {
                return obtainCurrentFuture().isCancelled();
            }
        }
    
    }
    
    所以脚手架对于定时任务的取消。也比较简单了。就是直接简单的移除即可
        @Override
        public boolean destory(String taskname) {
            TaskConfiguration.getTASK_MAP().get(taskname).cancel();
            return true;
        }
    
    

    相关文章

      网友评论

          本文标题:定时任务原理以及自定义定时任务的管理

          本文链接:https://www.haomeiwen.com/subject/gsbkfjtx.html