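Overview
Scheduled tasks sometimes need their schedule adjusted after deployment.
This article explains how Spring implements scheduled tasks.
Using scheduled tasks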
@Component
public class TestTask {
@Scheduled(cron = "0/1 * * * * ?")
public void cron() {
System.out.println("nothing to do ");
}
@Scheduled(fixedDelay = 1000)
public void fixedDelay() {
System.out.println("nothing to do ");
}
@Scheduled(fixedRate = 1000)
public void fixedRate() {
System.out.println("nothing to do ");
}
}
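There are three kinds: cron, fixedDelay, and fixedRate.
They differ as follows.
cron runs at fixed points in time: the task starts as soon as the scheduled moment arrives, provided a thread is available.
fixedDelay uses a fixed delay: the next run starts a fixed interval after the previous run completes.
fixedRate uses a fixed rate: the next run is scheduled a fixed interval after the previous scheduled start, so if a run overruns the interval, the next one starts immediately after it finishes. Some articles online claim this causes concurrent executions. I did not find that in the source code, because every subsequent run is scheduled only after the previous run has completed.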
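To make the difference concrete, here is a small simulation; it is not Spring code. It assumes that runs of one task never overlap (which is how java.util.concurrent.ScheduledThreadPoolExecutor behaves), that every run takes the same time, and that time is counted in milliseconds from 0. The class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ScheduleTimeline {

    // fixedDelay: the next run starts `delay` ms after the previous run COMPLETES.
    static List<Long> fixedDelayStarts(long delay, long duration, int runs) {
        List<Long> starts = new ArrayList<>();
        long start = 0;
        for (int i = 0; i < runs; i++) {
            starts.add(start);
            start = start + duration + delay;
        }
        return starts;
    }

    // fixedRate: the next run is DUE `period` ms after the previous due time,
    // but it cannot start before the previous run has finished.
    static List<Long> fixedRateStarts(long period, long duration, int runs) {
        List<Long> starts = new ArrayList<>();
        long due = 0;        // when the run is scheduled to start
        long freeAt = 0;     // when the previous run finishes
        for (int i = 0; i < runs; i++) {
            long start = Math.max(due, freeAt);
            starts.add(start);
            freeAt = start + duration;
            due += period;
        }
        return starts;
    }
}
```

With an interval of 1000 ms and a task that takes 1500 ms, fixedDelayStarts(1000, 1500, 3) gives starts at 0, 2500 and 5000, while fixedRateStarts(1000, 1500, 3) gives 0, 1500 and 3000: the overdue fixed-rate runs start back to back instead of overlapping.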
How Spring implements scheduled tasks
By importing a configuration class
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(SchedulingConfiguration.class)
@Documented
public @interface EnableScheduling {
}
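What does the configuration class do?
It is quite simple: it registers a built-in bean post-processor.
This post-processor does some work each time a bean is initialized.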
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {
@Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
return new ScheduledAnnotationBeanPostProcessor();
}
}
What does the post-processor do? It is simple.
It scans each bean as the bean is initialized,
looking for the @Scheduled annotation on its methods.
public class ScheduledAnnotationBeanPostProcessor
implements ScheduledTaskHolder, MergedBeanDefinitionPostProcessor, DestructionAwareBeanPostProcessor,
Ordered, EmbeddedValueResolverAware, BeanNameAware, BeanFactoryAware, ApplicationContextAware,
SmartInitializingSingleton, ApplicationListener<ContextRefreshedEvent>, DisposableBean {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) {
if (bean instanceof AopInfrastructureBean || bean instanceof TaskScheduler ||
bean instanceof ScheduledExecutorService) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass) &&
AnnotationUtils.isCandidateClass(targetClass, Arrays.asList(Scheduled.class, Schedules.class))) {
Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<Scheduled>>) method -> {
Set<Scheduled> scheduledAnnotations = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, Scheduled.class, Schedules.class);
return (!scheduledAnnotations.isEmpty() ? scheduledAnnotations : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @Scheduled annotations found on bean class: " + targetClass);
}
}
else {
// Non-empty set of methods
annotatedMethods.forEach((method, scheduledAnnotations) ->
scheduledAnnotations.forEach(scheduled -> processScheduled(scheduled, method, bean)));
if (logger.isTraceEnabled()) {
logger.trace(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
}
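The scan above boils down to reflecting over a bean's methods and collecting those that carry an annotation. The following is a stripped-down sketch of that idea using only the JDK. The @Every annotation, DemoBean and scan are hypothetical stand-ins; Spring's real code additionally handles proxies, repeatable annotations and caching of non-annotated classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

class AnnotationScanSketch {

    // Hypothetical stand-in for @Scheduled(fixedDelay = ...).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Every {
        long millis();
    }

    static class DemoBean {
        @Every(millis = 1000)
        public void tick() { }

        public void notScheduled() { }
    }

    // Returns method name -> interval for every annotated method of the class.
    static Map<String, Long> scan(Class<?> type) {
        Map<String, Long> found = new TreeMap<>();
        for (Method method : type.getDeclaredMethods()) {
            Every every = method.getAnnotation(Every.class);
            if (every != null) {
                found.put(method.getName(), every.millis());
            }
        }
        return found;
    }
}
```

Calling AnnotationScanSketch.scan(AnnotationScanSketch.DemoBean.class) returns a map with the single entry tick=1000; the unannotated method is ignored.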
What happens after the scan?
The tasks are registered; the source is shown below. Registration is simply delegated to the registrar.
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {
try {
Runnable runnable = createRunnable(bean, method);
boolean processedSchedule = false;
String errorMessage =
"Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";
Set<ScheduledTask> tasks = new LinkedHashSet<>(4);
// Determine initial delay
long initialDelay = convertToMillis(scheduled.initialDelay(), scheduled.timeUnit());
String initialDelayString = scheduled.initialDelayString();
if (StringUtils.hasText(initialDelayString)) {
Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");
if (this.embeddedValueResolver != null) {
initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);
}
if (StringUtils.hasLength(initialDelayString)) {
try {
initialDelay = convertToMillis(initialDelayString, scheduled.timeUnit());
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into long");
}
}
}
// Check cron expression
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {
String zone = scheduled.zone();
if (this.embeddedValueResolver != null) {
cron = this.embeddedValueResolver.resolveStringValue(cron);
zone = this.embeddedValueResolver.resolveStringValue(zone);
}
if (StringUtils.hasLength(cron)) {
Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");
processedSchedule = true;
if (!Scheduled.CRON_DISABLED.equals(cron)) {
TimeZone timeZone;
if (StringUtils.hasText(zone)) {
timeZone = StringUtils.parseTimeZoneString(zone);
}
else {
timeZone = TimeZone.getDefault();
}
tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
}
}
// At this point we don't need to differentiate between initial delay set or not anymore
if (initialDelay < 0) {
initialDelay = 0;
}
// Check fixed delay
long fixedDelay = convertToMillis(scheduled.fixedDelay(), scheduled.timeUnit());
if (fixedDelay >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
String fixedDelayString = scheduled.fixedDelayString();
if (StringUtils.hasText(fixedDelayString)) {
if (this.embeddedValueResolver != null) {
fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);
}
if (StringUtils.hasLength(fixedDelayString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedDelay = convertToMillis(fixedDelayString, scheduled.timeUnit());
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedDelayTask(new FixedDelayTask(runnable, fixedDelay, initialDelay)));
}
}
// Check fixed rate
long fixedRate = convertToMillis(scheduled.fixedRate(), scheduled.timeUnit());
if (fixedRate >= 0) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
String fixedRateString = scheduled.fixedRateString();
if (StringUtils.hasText(fixedRateString)) {
if (this.embeddedValueResolver != null) {
fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);
}
if (StringUtils.hasLength(fixedRateString)) {
Assert.isTrue(!processedSchedule, errorMessage);
processedSchedule = true;
try {
fixedRate = convertToMillis(fixedRateString, scheduled.timeUnit());
}
catch (RuntimeException ex) {
throw new IllegalArgumentException(
"Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into long");
}
tasks.add(this.registrar.scheduleFixedRateTask(new FixedRateTask(runnable, fixedRate, initialDelay)));
}
}
// Check whether we had any attribute set
Assert.isTrue(processedSchedule, errorMessage);
// Finally register the scheduled tasks
synchronized (this.scheduledTasks) {
Set<ScheduledTask> regTasks = this.scheduledTasks.computeIfAbsent(bean, key -> new LinkedHashSet<>(4));
regTasks.addAll(tasks);
}
}
catch (IllegalArgumentException ex) {
throw new IllegalStateException(
"Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());
}
}
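One rule that is easy to miss in the long method above is that exactly one of cron, fixedDelay and fixedRate may be set on a single @Scheduled annotation. The sketch below isolates that rule. It is a simplified illustration, not Spring's code: it ignores the String variants and placeholder resolution, and the class and method names are invented. As in the annotation, -1 means "not set".

```java
class ScheduleAttributeCheck {

    // Returns which attribute is in effect, or throws if zero or several are set.
    static String resolveKind(String cron, long fixedDelay, long fixedRate) {
        int set = 0;
        String kind = null;
        if (cron != null && !cron.isBlank()) {
            set++;
            kind = "cron";
        }
        if (fixedDelay >= 0) {
            set++;
            kind = "fixedDelay";
        }
        if (fixedRate >= 0) {
            set++;
            kind = "fixedRate";
        }
        if (set != 1) {
            throw new IllegalArgumentException(
                    "Exactly one of 'cron', 'fixedDelay' or 'fixedRate' is required");
        }
        return kind;
    }
}
```

For example, resolveKind("", 1000, -1) returns "fixedDelay", while passing both a cron expression and a fixed delay throws IllegalArgumentException.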
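Why does a task start running once it has been registered?
Because registration hands the task to the thread pool, where it waits to be executed.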
class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {
@Nullable
public ScheduledFuture<?> schedule() {
synchronized (this.triggerContextMonitor) {
// Ask the trigger for the next execution time.
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
if (this.scheduledExecutionTime == null) {
return null;
}
// Compute the delay until then and put the task into the executor's queue.
long initialDelay = this.scheduledExecutionTime.getTime() - this.triggerContext.getClock().millis();
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}
}
So how does it keep running on schedule?
class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {
@Override
public void run() {
Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());
// Run the task code.
super.run();
Date completionTime = new Date(this.triggerContext.getClock().millis());
synchronized (this.triggerContextMonitor) {
Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
if (!obtainCurrentFuture().isCancelled()) {
// If the task was not cancelled after this run, schedule itself again (self-rescheduling).
schedule();
}
}
}
}
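The two excerpts above form a loop: schedule() submits a one-shot task, and run() calls schedule() again when it finishes. Below is a minimal JDK-only sketch of the same pattern. It is an illustration under simplifying assumptions, not Spring's ReschedulingRunnable: the "trigger" is reduced to a LongSupplier that returns the next delay in milliseconds (negative means stop), and there is no error handling or cancellation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

class SelfRescheduling implements Runnable {
    private final ScheduledExecutorService executor;
    private final Runnable task;
    private final LongSupplier nextDelayMillis; // hypothetical stand-in for a Trigger

    SelfRescheduling(ScheduledExecutorService executor, Runnable task, LongSupplier nextDelayMillis) {
        this.executor = executor;
        this.task = task;
        this.nextDelayMillis = nextDelayMillis;
    }

    // Ask the "trigger" for the next delay and submit a one-shot execution.
    void schedule() {
        long delay = nextDelayMillis.getAsLong();
        if (delay < 0) {
            return; // no further runs
        }
        executor.schedule(this, delay, TimeUnit.MILLISECONDS);
    }

    @Override
    public void run() {
        task.run();
        schedule(); // the next run is only scheduled after this one has finished
    }

    // Runs a counter task `runs` times, 10 ms apart, and returns how often it ran.
    static int demo(int runs) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger count = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        LongSupplier trigger = () -> {
            if (count.get() >= runs) {
                done.countDown();
                return -1;
            }
            return 10;
        };
        new SelfRescheduling(executor, count::incrementAndGet, trigger).schedule();
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return count.get();
    }
}
```

Because the delay is recomputed before every submission, changing what the supplier returns changes the schedule without re-registering the task; this is the property the custom triggers later in the article rely on.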
fixedDelay & fixedRate
These two are implemented the same way: the task reschedules itself after each run.
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
// Wrap the command in the task structure the executor needs.
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
// Submit for delayed execution.
delayedExecute(t);
return t;
}
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
// After the run completes, compute the next execution time.
setNextRunTime();
// Put the task back into the queue.
reExecutePeriodic(outerTask);
}
}
}
}
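What we know so far:
Spring does not turn scheduled tasks into special beans.
So all we need is to be able to register tasks ourselves.
Also, each run of a scheduled task is scheduled only after the previous run has finished, so the supposed concurrency problem does not exist for a single task.
The remaining question is how the next execution time is determined.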
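The no-overlap claim can be checked directly against the JDK executor. The sketch below schedules a task at a fixed rate of 10 ms even though each run sleeps for 30 ms, on a pool with several threads, and records the highest number of simultaneously active runs. The class and method names are invented for the experiment, and it only covers a single task on a plain ScheduledThreadPoolExecutor, not every Spring scheduler configuration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateOverlapCheck {

    // Returns the maximum number of runs of the same task that were active at once.
    static int maxConcurrentRuns(int runs) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(4);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        pool.scheduleAtFixedRate(() -> {
            int now = active.incrementAndGet();
            maxActive.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(30); // deliberately longer than the 10 ms period
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            active.decrementAndGet();
            done.countDown();
        }, 0, 10, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return maxActive.get();
    }
}
```

The result is 1 even with four pool threads, which matches the documented contract of scheduleAtFixedRate: late executions start late, but do not run concurrently. Different tasks can of course still run in parallel with each other.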
How the scaffold implements scheduled tasks
@ConditionalOnBean(value = ScheduleConfiguration.class)
@Configuration
public class TaskConfiguration implements SchedulingConfigurer{
// Holds the scheduled task registrar
private static ScheduledTaskRegistrar scheduledTaskRegistrar;
private static Map<String, ScheduledTask> TASK_MAP = new HashMap<String, ScheduledTask>();
public static Map<String, ScheduledTask> getTASK_MAP() {
return TASK_MAP;
}
public static ScheduledTaskRegistrar getScheduledTaskRegistrar() {
return scheduledTaskRegistrar;
}
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
TaskConfiguration.scheduledTaskRegistrar = taskRegistrar;
}
}
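This is fairly simple: implement the interface and register the class in the container.
For the tasks themselves there are two abstract base classes.
The first is for scheduled tasks driven by a cron expression.
public abstract class CronTask implements Task{
// Get the cron expression; by default it is read from the container's properties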
public String getCron() {
return ApplicationContextUtils.getPropertyStrValue(getCronKey());
}
// Name of the task
public abstract String getName();
// Property key under which the cron expression is configured
public abstract String getCronKey();
}
The second is for delay-based scheduled tasks.
public abstract class FixDelayTask implements Task{
public abstract String getFixDelayKey();
public abstract String getInitialDelayKey();
// Get the task's initial delay
public long getInitialDelay() {
return ApplicationContextUtils.getPropertyValue(getInitialDelayKey(), Long.class);
}
// Get the task's delay interval
public long getFixedDelay() {
return ApplicationContextUtils.getPropertyValue(getFixDelayKey(), Long.class);
}
// Whether the task runs at a fixed rate: true means fixedRate, false means fixedDelay
public abstract boolean isFixedRate();
}
Here are a few demos.
@Component
public class CustomCronTask extends CronTask{
@Override
public void run() {
System.out.println(this.getName() + "nothing to do");
}
@Override
public String getName() {
return "cron";
}
@Override
public String getCronKey() {
return "custom.task.alarm.cron";
}
}
@Component
public class CustomFixDelayTask extends FixDelayTask{
@Override
public String getName() {
return "delay";
}
@Override
public void run() {
System.out.println(this.getName() + "nothing to do");
}
@Override
public String getFixDelayKey() {
return "custom.task.alarm.fixdelay";
}
@Override
public String getInitialDelayKey() {
return "custom.task.alarm.fix-initdelay";
}
@Override
public boolean isFixedRate() {
return false;
}
}
@Component
public class CustomFixRateTask extends FixDelayTask{
@Override
public String getName() {
return "rate";
}
@Override
public void run() {
System.out.println(this.getName() + "nothing to do");
}
@Override
public String getFixDelayKey() {
return "custom.task.alarm.fixrate";
}
@Override
public String getInitialDelayKey() {
return "custom.task.alarm.fixrate-initdelay";
}
@Override
public boolean isFixedRate() {
return true;
}
}
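How the scaffold manages scheduled tasks
The core of a scheduled task is working out when the next run should happen.
Spring's scheduling already provides triggers for exactly this purpose.
So the key is to re-implement the corresponding triggers.
Two trigger types are currently supported:
the cron-based CronTrigger and the periodic PeriodicTrigger.
The overriding source code is as follows.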
public class CustomCronTrigger extends CronTrigger{
public CustomCronTrigger(CronTask cronTask) {
// Only used during initialization; afterwards the expression is always read from the container
super(cronTask.getCron());
this.cronTask = cronTask;
}
private CronTask cronTask;
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
Date date = triggerContext.lastCompletionTime();
if (date != null) {
Date scheduled = triggerContext.lastScheduledExecutionTime();
if (scheduled != null && date.before(scheduled)) {
date = scheduled;
}
}
else {
date = new Date(triggerContext.getClock().millis());
}
ZonedDateTime dateTime = ZonedDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault());
// The parent parses the cron expression only once, so read it from the container each time to support dynamic config refresh
ZonedDateTime next = CronExpression.parse(this.getExpression()).next(dateTime);
return (next != null ? Date.from(next.toInstant()) : null);
}
@Override
public String getExpression() {
return cronTask.getCron();
}
}
public class CustomFixDelayTrigger extends PeriodicTrigger{
private FixDelayTask ft;
public CustomFixDelayTrigger(FixDelayTask ft) {
super(ft.getFixedDelay());
this.ft = ft;
}
@Override
public long getPeriod() {
return ft.getFixedDelay();
}
@Override
public long getInitialDelay() {
return ft.getInitialDelay();
}
@Override
public boolean isFixedRate() {
return ft.isFixedRate();
}
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
Date lastExecution = triggerContext.lastScheduledExecutionTime();
Date lastCompletion = triggerContext.lastCompletionTime();
if (lastExecution == null || lastCompletion == null) {
return new Date(triggerContext.getClock().millis() + this.getInitialDelay());
}
if (this.isFixedRate()) {
return new Date(lastExecution.getTime() + this.getPeriod());
}
return new Date(lastCompletion.getTime() + this.getPeriod());
}
}
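The point of both custom triggers is that the schedule parameters are looked up every time the next execution time is computed, rather than fixed at construction. The following JDK-only sketch isolates that idea for the periodic case. It mirrors the branching of nextExecutionTime above but is not the scaffold's code: times are plain epoch milliseconds, the suppliers are hypothetical stand-ins for the property lookups, and a null argument means "no previous run".

```java
import java.util.function.BooleanSupplier;
import java.util.function.LongSupplier;

class DynamicPeriodSketch {
    private final LongSupplier period;        // re-read on every call
    private final LongSupplier initialDelay;  // re-read on every call
    private final BooleanSupplier fixedRate;  // true = fixedRate, false = fixedDelay

    DynamicPeriodSketch(LongSupplier period, LongSupplier initialDelay, BooleanSupplier fixedRate) {
        this.period = period;
        this.initialDelay = initialDelay;
        this.fixedRate = fixedRate;
    }

    long nextExecutionTime(Long lastScheduled, Long lastCompletion, long now) {
        if (lastScheduled == null || lastCompletion == null) {
            // First run: only the initial delay applies.
            return now + initialDelay.getAsLong();
        }
        if (fixedRate.getAsBoolean()) {
            // Fixed rate: measure from the previous scheduled start.
            return lastScheduled + period.getAsLong();
        }
        // Fixed delay: measure from the previous completion.
        return lastCompletion + period.getAsLong();
    }
}
```

If the period supplier starts returning 3000 instead of 1000 between two calls, the very next computed time already uses the new value, with no need to cancel and re-register the task.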
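Scaffold - registering scheduled tasks
The timing is simple: register the tasks when the service starts.
A guard component is therefore used to initialize the service.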
@Component
public class ContextStandGuard implements ApplicationRunner, DisposableBean {
@Autowired
private TaskService taskService;
@Override
public void destroy() {
TaskConfiguration.getTASK_MAP().values().forEach(task -> {
task.cancel(true);
});
System.out.println("Container shutting down; cancelling scheduled tasks as well");
}
@Override
public void run(ApplicationArguments args) throws Exception {
System.out.println("Container started");
taskService.init();
}
}
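Scaffold - implementation of task registration
The principle is simple.
Fetch the task beans from the container,
then build the matching trigger for each one.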
@Component
public class TaskServiceImpl implements TaskService{
@Override
public void init() {
List<Task> tasks = ApplicationContextUtils.getBeanList(Task.class);
if (CollectionUtils.isEmpty(tasks)) {
return;
}
for (Task task : tasks) {
TriggerTask triggerTask = getTriggerTask(task);
if(triggerTask == null) {
continue;
}
ScheduledTask st = TaskConfiguration.getScheduledTaskRegistrar().scheduleTriggerTask(triggerTask);
TaskConfiguration.getTASK_MAP().put(task.getName(), st);
}
}
private TriggerTask getTriggerTask(Task task) {
Trigger tg = genTrigger(task);
if(tg == null) {
return null;
}
return new TriggerTask(task, tg);
}
private Trigger genTrigger(Task task) {
if(CronTask.class.isAssignableFrom(task.getClass())) {
CronTask ct = (CronTask)task;
return new CustomCronTrigger(ct);
}else if(FixDelayTask.class.isAssignableFrom(task.getClass())) {
FixDelayTask ft = (FixDelayTask)task;
return new CustomFixDelayTrigger(ft);
}
return null;
}
@Override
public boolean destory(String taskname) {
TaskConfiguration.getTASK_MAP().get(taskname).cancel();
return true;
}
@Override
public boolean register(String taskname) {
return true;
}
}
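Scaffold - cancelling scheduled tasks
First, back to the source code.
If a task is in the middle of a run when it gets cancelled, is there any impact?
The current run is not rolled back (although cancelling with interruption does interrupt the worker thread, which matters if the task blocks on interruptible calls), but the next run is never scheduled.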
class ReschedulingRunnable extends DelegatingErrorHandlingRunnable implements ScheduledFuture<Object> {
@Override
public void run() {
Date actualExecutionTime = new Date(this.triggerContext.getClock().millis());
super.run();
Date completionTime = new Date(this.triggerContext.getClock().millis());
synchronized (this.triggerContextMonitor) {
Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
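// From the source: once isCancelled() returns true, schedule() is not called again, so the self-rescheduling chain breaks and the task is effectively removed.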
if (!obtainCurrentFuture().isCancelled()) {
schedule();
}
}
}
@Override
public boolean isCancelled() {
synchronized (this.triggerContextMonitor) {
return obtainCurrentFuture().isCancelled();
}
}
}
So cancelling a scheduled task in the scaffold is simple: just cancel it directly.
@Override
public boolean destory(String taskname) {
TaskConfiguration.getTASK_MAP().get(taskname).cancel();
return true;
}
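The effect of cancelling during a run can be reproduced with the plain JDK executor. The sketch below is an experiment, not the scaffold's code: it starts a fixed-delay task, cancels its future with cancel(false) while the first run is still sleeping, and then counts how many runs completed. The class and method names are invented. Note that it uses cancel(false); cancelling with true would additionally interrupt the running thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class CancelSketch {

    // Cancels a periodic task during its first run; returns the number of completed runs.
    static int completedRunsAfterCancel() {
        ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger completedRuns = new AtomicInteger();
        CountDownLatch started = new CountDownLatch(1);
        try {
            ScheduledFuture<?> future = pool.scheduleWithFixedDelay(() -> {
                started.countDown();
                try {
                    Thread.sleep(50); // simulate work that is in progress when cancel arrives
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                completedRuns.incrementAndGet();
            }, 0, 10, TimeUnit.MILLISECONDS);
            started.await(5, TimeUnit.SECONDS);
            future.cancel(false); // do not interrupt the run that is already executing
            Thread.sleep(200);    // leave time for any (unexpected) further runs
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return completedRuns.get();
    }
}
```

The in-flight run finishes normally and no further run is started, so the method returns 1.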