A Look at PowerJob's TimingStrategyHandler

Author: go4it | Published 2024-01-08 09:19

    This article examines PowerJob's TimingStrategyHandler.

    TimingStrategyHandler

    tech/powerjob/server/core/scheduler/auxiliary/TimingStrategyHandler.java

    public interface TimingStrategyHandler {
    
        /**
         * Validate the expression
         *
         * @param timeExpression time expression
         */
        void validate(String timeExpression);
    
        /**
         * Calculate the next trigger time
         *
         * @param preTriggerTime previous trigger time (not null)
         * @param timeExpression time expression
         * @param startTime      start time (inclusive)
         * @param endTime        end time (inclusive)
         * @return next trigger time
         */
        Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime);
    
        /**
         * The timing strategy this handler supports
         *
         * @return TimeExpressionType
         */
        TimeExpressionType supportType();
    
    
    }
    

    The TimingStrategyHandler interface defines three methods: validate, calculateNextTriggerTime, and supportType.

    TimeExpressionType

    tech/powerjob/common/enums/TimeExpressionType.java

    @Getter
    @AllArgsConstructor
    @ToString
    public enum TimeExpressionType {
    
        API(1),
        CRON(2),
        FIXED_RATE(3),
        FIXED_DELAY(4),
        WORKFLOW(5),
    
        DAILY_TIME_INTERVAL(11);
    
        private final int v;
    
        public static final List<Integer> FREQUENT_TYPES = Collections.unmodifiableList(Lists.newArrayList(FIXED_RATE.v, FIXED_DELAY.v));
        /**
         * Types for which the first trigger-time calculation must produce a valid value
         */
        public static final List<Integer> INSPECT_TYPES =  Collections.unmodifiableList(Lists.newArrayList(CRON.v, DAILY_TIME_INTERVAL.v));
    
        public static TimeExpressionType of(int v) {
            for (TimeExpressionType type : values()) {
                if (type.v == v) {
                    return type;
                }
            }
            throw new IllegalArgumentException("unknown TimeExpressionType of " + v);
        }
    }
    

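    The TimeExpressionType enum defines the types API, CRON, FIXED_RATE, FIXED_DELAY, WORKFLOW, and DAILY_TIME_INTERVAL. It also exposes two constant lists, FREQUENT_TYPES (FIXED_RATE and FIXED_DELAY) and INSPECT_TYPES (CRON and DAILY_TIME_INTERVAL), and an of(int) lookup that throws IllegalArgumentException for an unknown value.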
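
    To make the lookup and the two constant lists concrete, here is a minimal sketch that re-creates the enum in plain Java, without Lombok or Guava. The name TimeExpressionTypeSketch is made up for this illustration; the values and the of(int) behaviour follow the source above.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java re-creation of the enum shown above (hypothetical name, for illustration only).
enum TimeExpressionTypeSketch {
    API(1), CRON(2), FIXED_RATE(3), FIXED_DELAY(4), WORKFLOW(5), DAILY_TIME_INTERVAL(11);

    private final int v;

    TimeExpressionTypeSketch(int v) {
        this.v = v;
    }

    int getV() {
        return v;
    }

    // Static fields are initialised after the constants, so reading FIXED_RATE.v here is safe.
    static final List<Integer> FREQUENT_TYPES =
            Collections.unmodifiableList(Arrays.asList(FIXED_RATE.v, FIXED_DELAY.v));

    static final List<Integer> INSPECT_TYPES =
            Collections.unmodifiableList(Arrays.asList(CRON.v, DAILY_TIME_INTERVAL.v));

    // Linear scan over the six constants; unknown values are rejected rather than mapped to a default.
    static TimeExpressionTypeSketch of(int v) {
        for (TimeExpressionTypeSketch type : values()) {
            if (type.v == v) {
                return type;
            }
        }
        throw new IllegalArgumentException("unknown TimeExpressionType of " + v);
    }
}
```

    Note the gap in the numbering: DAILY_TIME_INTERVAL is 11, so values such as 6 are rejected by of(int).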

    AbstractTimingStrategyHandler

    tech/powerjob/server/core/scheduler/auxiliary/AbstractTimingStrategyHandler.java

    public abstract class AbstractTimingStrategyHandler implements TimingStrategyHandler {
        @Override
        public void validate(String timeExpression) {
            // do nothing
        }
    
        @Override
        public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
            // do nothing
            return null;
        }
    }
    

    AbstractTimingStrategyHandler provides default implementations of validate (a no-op) and calculateNextTriggerTime (which returns null); subclasses still have to implement supportType.

    ApiTimingStrategyHandler

    tech/powerjob/server/core/scheduler/auxiliary/impl/ApiTimingStrategyHandler.java

    @Component
    public class ApiTimingStrategyHandler extends AbstractTimingStrategyHandler {
        @Override
        public TimeExpressionType supportType() {
            return TimeExpressionType.API;
        }
    }
    

    ApiTimingStrategyHandler extends AbstractTimingStrategyHandler; its supportType returns TimeExpressionType.API.

    FixedRateTimingStrategyHandler

    tech/powerjob/server/core/scheduler/auxiliary/impl/FixedRateTimingStrategyHandler.java

    @Component
    public class FixedRateTimingStrategyHandler extends AbstractTimingStrategyHandler {
    
        @Override
        public void validate(String timeExpression) {
            long delay;
            try {
                delay = Long.parseLong(timeExpression);
            } catch (Exception e) {
                throw new PowerJobException("invalid timeExpression!");
            }
            // Default is 120s; beyond this limit, consider another type to reduce resource usage
            int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000"));
            if (delay > maxInterval) {
                throw new PowerJobException("the rate must be less than " + maxInterval + "ms");
            }
            if (delay <= 0) {
                throw new PowerJobException("the rate must be greater than 0 ms");
            }
        }
    
        @Override
        public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
            long r = startTime != null && startTime > preTriggerTime
                    ? startTime : preTriggerTime + Long.parseLong(timeExpression);
            return endTime != null && endTime < r ? null : r;
        }
    
        @Override
        public TimeExpressionType supportType() {
            return TimeExpressionType.FIXED_RATE;
        }
    }
    

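    FixedRateTimingStrategyHandler extends AbstractTimingStrategyHandler. Its validate method parses timeExpression as an interval in milliseconds and requires it to be greater than 0 and no greater than the maximum interval (120000 ms, i.e. 120s, by default; the limit is read from the system property named by PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL). Its calculateNextTriggerTime returns startTime when startTime is later than preTriggerTime, otherwise preTriggerTime plus the interval; if that result is later than endTime, it returns null. Its supportType returns TimeExpressionType.FIXED_RATE.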
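
    The arithmetic is easier to follow with numbers. The following is a small standalone sketch of the same rule; the class name FixedRateSketch and the already-parsed intervalMs parameter are choices made for this illustration, not part of PowerJob.

```java
// Standalone sketch of the fixed-rate rule described above (hypothetical class name).
final class FixedRateSketch {

    // Returns the next trigger time in epoch milliseconds, or null once endTime has been passed.
    static Long nextTriggerTime(long preTriggerTime, long intervalMs, Long startTime, Long endTime) {
        // A start time that is still ahead of the previous trigger wins over "previous + interval".
        long next = (startTime != null && startTime > preTriggerTime)
                ? startTime
                : preTriggerTime + intervalMs;
        // endTime is inclusive: only a strictly later candidate is discarded.
        return (endTime != null && endTime < next) ? null : next;
    }

    public static void main(String[] args) {
        System.out.println(nextTriggerTime(10_000L, 5_000L, null, null));    // 15000
        System.out.println(nextTriggerTime(10_000L, 5_000L, 12_000L, null)); // 12000
        System.out.println(nextTriggerTime(10_000L, 5_000L, null, 14_000L)); // null
    }
}
```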

    FixedDelayTimingStrategyHandler

    tech/powerjob/server/core/scheduler/auxiliary/impl/FixedDelayTimingStrategyHandler.java

    @Component
    public class FixedDelayTimingStrategyHandler extends AbstractTimingStrategyHandler {
    
        @Override
        public void validate(String timeExpression) {
            long delay;
            try {
                delay = Long.parseLong(timeExpression);
            } catch (Exception e) {
                throw new PowerJobException("invalid timeExpression!");
            }
            // Default is 120s; beyond this limit, consider another type to reduce resource usage
            int maxInterval = Integer.parseInt(System.getProperty(PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL, "120000"));
            if (delay > maxInterval) {
                throw new PowerJobException("the delay must be less than " + maxInterval + "ms");
            }
            if (delay <= 0) {
                throw new PowerJobException("the delay must be greater than 0 ms");
            }
        }
    
        @Override
        public TimeExpressionType supportType() {
            return TimeExpressionType.FIXED_DELAY;
        }
    }
    

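    FixedDelayTimingStrategyHandler extends AbstractTimingStrategyHandler. Its validate requires the delay to be greater than 0 and no greater than the maximum interval (120s by default). It does not override calculateNextTriggerTime, so the inherited default, which returns null, applies. Its supportType returns TimeExpressionType.FIXED_DELAY.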
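
    Both frequent handlers share the same validation shape, which can be sketched on its own as below. This is an illustration under assumptions: the property key "sketch.frequency-job.max-interval" is invented here (the article does not show the value behind PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL), and IllegalArgumentException stands in for PowerJobException.

```java
// Sketch of the interval validation used by the fixed-rate and fixed-delay handlers.
final class FrequentIntervalValidatorSketch {

    // Hypothetical key, used only in this sketch; the real key is PowerJobDKey.FREQUENCY_JOB_MAX_INTERVAL.
    static final String MAX_INTERVAL_KEY = "sketch.frequency-job.max-interval";

    // Parses and checks the expression, returning the interval in milliseconds.
    static long validate(String timeExpression) {
        long interval;
        try {
            interval = Long.parseLong(timeExpression);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("invalid timeExpression: " + timeExpression, e);
        }
        // The upper bound defaults to 120000 ms and can be overridden with -D<key>=<millis>.
        long maxInterval = Long.parseLong(System.getProperty(MAX_INTERVAL_KEY, "120000"));
        if (interval <= 0) {
            throw new IllegalArgumentException("the interval must be greater than 0 ms");
        }
        if (interval > maxInterval) {
            throw new IllegalArgumentException("the interval must not exceed " + maxInterval + " ms");
        }
        return interval;
    }
}
```

    With the default limit, "120000" is still accepted and "120001" is rejected, which matches the `delay > maxInterval` comparison in the source.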

    WorkflowTimingStrategyHandler

    tech/powerjob/server/core/scheduler/auxiliary/impl/WorkflowTimingStrategyHandler.java

    @Component
    public class WorkflowTimingStrategyHandler extends AbstractTimingStrategyHandler {
        @Override
        public TimeExpressionType supportType() {
            return TimeExpressionType.WORKFLOW;
        }
    }
    

    WorkflowTimingStrategyHandler extends AbstractTimingStrategyHandler; its supportType returns TimeExpressionType.WORKFLOW.

    CronTimingStrategyHandler

    tech/powerjob/server/core/scheduler/auxiliary/impl/CronTimingStrategyHandler.java

    @Component
    public class CronTimingStrategyHandler implements TimingStrategyHandler {
    
        private final CronParser cronParser;
    
        /**
         * @see CronDefinitionBuilder#instanceDefinitionFor
         * <p>
         * Enhanced quartz cron,Support for specifying both a day-of-week and a day-of-month parameter.
         * https://github.com/PowerJob/PowerJob/issues/382
         */
        public CronTimingStrategyHandler() {
            CronDefinition cronDefinition = CronDefinitionBuilder.defineCron()
                    .withSeconds().withValidRange(0, 59).and()
                    .withMinutes().withValidRange(0, 59).and()
                    .withHours().withValidRange(0, 23).and()
                    .withDayOfMonth().withValidRange(1, 31).supportsL().supportsW().supportsLW().supportsQuestionMark().and()
                    .withMonth().withValidRange(1, 12).and()
                    .withDayOfWeek().withValidRange(1, 7).withMondayDoWValue(2).supportsHash().supportsL().supportsQuestionMark().and()
                    .withYear().withValidRange(1970, 2099).withStrictRange().optional().and()
                    .instance();
            this.cronParser = new CronParser(cronDefinition);
        }
    
    
        @Override
        public void validate(String timeExpression) {
            cronParser.parse(timeExpression);
        }
    
        @Override
        public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
            Cron cron = cronParser.parse(timeExpression);
            ExecutionTime executionTime = ExecutionTime.forCron(cron);
            if (startTime != null && startTime > System.currentTimeMillis() && preTriggerTime < startTime) {
                // Find the actual trigger time closest to startTime
                Optional<ZonedDateTime> zonedDateTime = executionTime.lastExecution(ZonedDateTime.ofInstant(Instant.ofEpochMilli(startTime), ZoneId.systemDefault()));
                preTriggerTime = zonedDateTime.map(dateTime -> dateTime.toEpochSecond() * 1000).orElse(startTime);
            }
            Instant instant = Instant.ofEpochMilli(preTriggerTime);
            ZonedDateTime preZonedDateTime = ZonedDateTime.ofInstant(instant, ZoneId.systemDefault());
            Optional<ZonedDateTime> opt = executionTime.nextExecution(preZonedDateTime);
            if (opt.isPresent()) {
                long nextTriggerTime = opt.get().toEpochSecond() * 1000;
                if (endTime != null && endTime < nextTriggerTime) {
                    return null;
                }
                return nextTriggerTime;
            }
            return null;
        }
    
        @Override
        public TimeExpressionType supportType() {
            return TimeExpressionType.CRON;
        }
    }
    

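    CronTimingStrategyHandler implements the TimingStrategyHandler interface directly. Its constructor builds a CronDefinition and creates a CronParser from it. Its validate method calls cronParser.parse(timeExpression). Its calculateNextTriggerTime parses timeExpression into a Cron and builds an ExecutionTime from it; if startTime is still in the future and preTriggerTime is earlier than startTime, it replaces preTriggerTime with the last execution before startTime (or with startTime itself if there is none). It then calls executionTime.nextExecution to obtain nextTriggerTime, returning null if there is no next execution or if it is later than endTime. Its supportType returns TimeExpressionType.CRON.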
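
    The control flow around startTime is the subtle part, so here is a sketch of it using only java.time. It is not the cron-utils API: the two static methods nextExecution and lastExecution are stand-ins written for this illustration that behave like the schedule "second 0 of every minute", and the sketch assumes (as the stand-in does) that lastExecution returns a time strictly before its argument. The now and zone parameters are also additions, so the example is deterministic; the source uses System.currentTimeMillis() and ZoneId.systemDefault().

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.Optional;

// Control-flow sketch of the cron handler with a fixed "every minute" stand-in schedule.
final class CronFlowSketch {

    // Stand-in: the next minute boundary strictly after t.
    static Optional<ZonedDateTime> nextExecution(ZonedDateTime t) {
        return Optional.of(t.truncatedTo(ChronoUnit.MINUTES).plusMinutes(1));
    }

    // Stand-in: the latest minute boundary strictly before t.
    static Optional<ZonedDateTime> lastExecution(ZonedDateTime t) {
        ZonedDateTime floor = t.truncatedTo(ChronoUnit.MINUTES);
        return Optional.of(floor.isBefore(t) ? floor : floor.minusMinutes(1));
    }

    static Long nextTriggerTime(long preTriggerTime, Long startTime, Long endTime, long now, ZoneId zone) {
        long base = preTriggerTime;
        if (startTime != null && startTime > now && preTriggerTime < startTime) {
            // Step back to the execution just before startTime, so that the following
            // "next execution" can land exactly on startTime when startTime is itself a match.
            base = lastExecution(at(startTime, zone))
                    .map(t -> t.toEpochSecond() * 1000)
                    .orElse(startTime);
        }
        Optional<ZonedDateTime> next = nextExecution(at(base, zone));
        if (!next.isPresent()) {
            return null;
        }
        // toEpochSecond() * 1000 drops any sub-second part, as in the source.
        long nextMs = next.get().toEpochSecond() * 1000;
        return (endTime != null && endTime < nextMs) ? null : nextMs;
    }

    private static ZonedDateTime at(long epochMs, ZoneId zone) {
        return ZonedDateTime.ofInstant(Instant.ofEpochMilli(epochMs), zone);
    }
}
```

    Under these assumptions, a startTime of 09:19:00 yields 09:19:00 as the first trigger time rather than 09:20:00, which is consistent with the "start time (inclusive)" wording in the interface Javadoc.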

    DailyTimeIntervalStrategyHandler

    tech/powerjob/server/core/scheduler/auxiliary/impl/DailyTimeIntervalStrategyHandler.java

    @Component
    public class DailyTimeIntervalStrategyHandler implements TimingStrategyHandler {
    
        /**
         * Uses the Chinese weekday numbering!
         */
        private static final Set<Integer> ALL_DAY = Sets.newHashSet(1, 2, 3, 4, 5, 6, 7);
    
        @Override
        public TimeExpressionType supportType() {
            return TimeExpressionType.DAILY_TIME_INTERVAL;
        }
    
        @Override
        @SneakyThrows
        public void validate(String timeExpression) {
            DailyTimeIntervalExpress ep = JsonUtils.parseObject(timeExpression, DailyTimeIntervalExpress.class);
            CommonUtils.requireNonNull(ep.interval, "interval can't be null or empty in DailyTimeIntervalExpress");
            CommonUtils.requireNonNull(ep.startTimeOfDay, "startTimeOfDay can't be null or empty in DailyTimeIntervalExpress");
            CommonUtils.requireNonNull(ep.endTimeOfDay, "endTimeOfDay can't be null or empty in DailyTimeIntervalExpress");
    
            TimeOfDay startTime = TimeOfDay.from(ep.startTimeOfDay);
            TimeOfDay endTime = TimeOfDay.from(ep.endTimeOfDay);
    
            if (endTime.before(startTime)) {
                throw new IllegalArgumentException("endTime should after startTime!");
            }
    
            if (StringUtils.isNotEmpty(ep.intervalUnit)) {
                TimeUnit.valueOf(ep.intervalUnit);
            }
        }
    
        @Override
        @SneakyThrows
        public Long calculateNextTriggerTime(Long preTriggerTime, String timeExpression, Long startTime, Long endTime) {
            DailyTimeIntervalExpress ep = JsonUtils.parseObject(timeExpression, DailyTimeIntervalExpress.class);
    
            // Not started yet: compute the schedule time from the start point
            if (startTime != null && startTime > System.currentTimeMillis() && preTriggerTime < startTime) {
                return calculateInRangeTime(startTime, ep);
            }
    
            // Interval
            TimeUnit timeUnit = Optional.ofNullable(ep.intervalUnit).map(TimeUnit::valueOf).orElse(TimeUnit.SECONDS);
            long interval = timeUnit.toMillis(ep.interval);
    
            Long ret = calculateInRangeTime(preTriggerTime + interval, ep);
            if (ret == null || ret <= Optional.ofNullable(endTime).orElse(Long.MAX_VALUE)) {
                return ret;
            }
            return null;
        }
    
        /**
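         * Calculate the nearest time that falls within the range
         * @param time the current base time; it may be returned directly as the result
         * @param ep the expression
         * @return the nearest time that falls within the range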
         */
        static Long calculateInRangeTime(Long time, DailyTimeIntervalExpress ep) {
    
            Calendar calendar = Calendar.getInstance();
            calendar.setTime(new Date(time));
    
            int year = calendar.get(Calendar.YEAR);
            // Month + 1, to get the familiar 1-12 range
            int month = calendar.get(Calendar.MONTH) + 1;
            int day = calendar.get(Calendar.DAY_OF_MONTH);
    
            // Check whether the "day" condition is met
            int week = TimeUtils.calculateWeek(year, month, day);
            Set<Integer> targetDays = CollectionUtils.isEmpty(ep.daysOfWeek) ? ALL_DAY : ep.daysOfWeek;
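            // If the day is not included, move the time to 00:00 of the next day and recurse until a matching day is found (there is probably a faster way, but this scheduling mode is unlikely to become a bottleneck, so the simple implementation is used for now)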
            if (!targetDays.contains(week)) {
                simpleSetCalendar(calendar, 0, 0, 0);
                Date tomorrowZero = DateUtils.addDays(calendar.getTime(), 1);
                return calculateInRangeTime(tomorrowZero.getTime(), ep);
            }
    
            // Start time of the range
            TimeOfDay rangeStartTime = TimeOfDay.from(ep.startTimeOfDay);
            simpleSetCalendar(calendar, rangeStartTime.getHour(), rangeStartTime.getMinute(), rangeStartTime.getSecond());
            long todayStartTs = calendar.getTimeInMillis();
    
            // Not started yet
            if (time < todayStartTs) {
                return todayStartTs;
            }
    
            TimeOfDay rangeEndTime = TimeOfDay.from(ep.endTimeOfDay);
            simpleSetCalendar(calendar, rangeEndTime.getHour(), rangeEndTime.getMinute(), rangeEndTime.getSecond());
            long todayEndTs = calendar.getTimeInMillis();
    
            // Within the range
            if (time <= todayEndTs) {
                return time;
            }
    
            // Already ended: recalculate for the next day
            simpleSetCalendar(calendar, 0, 0, 0);
            return calculateInRangeTime(DateUtils.addDays(calendar.getTime(), 1).getTime(), ep);
        }
    
        //......
    }    
    

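    DailyTimeIntervalStrategyHandler implements the TimingStrategyHandler interface directly; its supportType returns TimeExpressionType.DAILY_TIME_INTERVAL. Its validate method parses the expression into a DailyTimeIntervalExpress, requires interval, startTimeOfDay, and endTimeOfDay to be non-null, rejects an endTimeOfDay that is earlier than startTimeOfDay, and checks that intervalUnit, if present, is a valid TimeUnit name. Its calculateNextTriggerTime works as follows: if startTime is still in the future and preTriggerTime is earlier than it, the method returns calculateInRangeTime(startTime, ep); otherwise it adds the interval (in intervalUnit, seconds by default) to preTriggerTime, passes the result to calculateInRangeTime, and returns null if the value found is later than endTime. calculateInRangeTime itself moves to 00:00 of the next day when the weekday is not in daysOfWeek, returns the start of the daily window if the given time is before it, returns the time unchanged if it falls inside the window, and otherwise recurses from 00:00 of the next day.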
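
    The source comment notes that the recursion could be written more efficiently. As one possible alternative, and not what PowerJob does, the same search can be sketched as a bounded loop with java.time. Assumptions in this sketch: times are LocalDateTime values instead of epoch milliseconds, weekday numbers follow 1 = Monday through 7 = Sunday (which is how the "Chinese weekday numbering" is read here), and the 8-iteration bound plus the exception for an unsatisfiable weekday set are additions that the original does not have.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Iterative sketch of "nearest time inside the daily window" (hypothetical class name).
final class DailyRangeSketch {

    private static final Set<Integer> ALL_DAYS =
            Collections.unmodifiableSet(new HashSet<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7)));

    // Returns the first moment at or after 'time' on an allowed weekday within [rangeStart, rangeEnd].
    static LocalDateTime inRangeTime(LocalDateTime time, Set<Integer> daysOfWeek,
                                     LocalTime rangeStart, LocalTime rangeEnd) {
        // As in the source, an empty weekday set means "every day".
        Set<Integer> days = (daysOfWeek == null || daysOfWeek.isEmpty()) ? ALL_DAYS : daysOfWeek;
        LocalDateTime candidate = time;
        // Today plus the following seven days covers every weekday at least once.
        for (int i = 0; i <= 7; i++) {
            LocalDate day = candidate.toLocalDate();
            if (days.contains(day.getDayOfWeek().getValue())) {
                LocalDateTime windowStart = day.atTime(rangeStart);
                if (candidate.isBefore(windowStart)) {
                    return windowStart;            // window has not opened yet
                }
                if (!candidate.isAfter(day.atTime(rangeEnd))) {
                    return candidate;              // already inside the window (end is inclusive)
                }
            }
            candidate = day.plusDays(1).atStartOfDay(); // retry from 00:00 of the next day
        }
        throw new IllegalArgumentException("no reachable weekday in " + daysOfWeek);
    }
}
```

    For example, with weekdays 1 to 5 and a 09:00 to 18:00 window, a time of Friday 18:00:01 resolves to the following Monday at 09:00.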

    TimingStrategyService

    tech/powerjob/server/core/scheduler/TimingStrategyService.java

    @Slf4j
    @Service
    public class TimingStrategyService {
    
        private static final int NEXT_N_TIMES = 5;
    
        private static final List<String> TIPS = Collections.singletonList("It is valid, but has not trigger time list!");
    
    
        private final Map<TimeExpressionType, TimingStrategyHandler> strategyContainer;
    
        public TimingStrategyService(List<TimingStrategyHandler> timingStrategyHandlers) {
            // init
            strategyContainer = new EnumMap<>(TimeExpressionType.class);
            for (TimingStrategyHandler timingStrategyHandler : timingStrategyHandlers) {
                strategyContainer.put(timingStrategyHandler.supportType(), timingStrategyHandler);
            }
        }
    
        /**
         * Calculate the next several trigger times
         *
         * @param timeExpressionType type of the time expression
         * @param timeExpression     the expression
         * @param startTime          start time (inclusive)
         * @param endTime            end time (inclusive)
         * @return list of trigger times
         */
        public List<String> calculateNextTriggerTimes(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
    
            TimingStrategyHandler timingStrategyHandler = getHandler(timeExpressionType);
            List<Long> triggerTimeList = new ArrayList<>(NEXT_N_TIMES);
            Long nextTriggerTime = System.currentTimeMillis();
            do {
                nextTriggerTime = timingStrategyHandler.calculateNextTriggerTime(nextTriggerTime, timeExpression, startTime, endTime);
                if (nextTriggerTime == null) {
                    break;
                }
                triggerTimeList.add(nextTriggerTime);
            } while (triggerTimeList.size() < NEXT_N_TIMES);
    
            if (triggerTimeList.isEmpty()) {
                return TIPS;
            }
            return triggerTimeList.stream().map(t -> DateFormatUtils.format(t, OmsConstant.TIME_PATTERN)).collect(Collectors.toList());
        }
    
        /**
         * Calculate the next trigger time
         *
         * @param preTriggerTime     previous trigger time (nullable)
         * @param timeExpressionType type of the time expression
         * @param timeExpression     the expression
         * @param startTime          start time (inclusive)
         * @param endTime            end time (inclusive)
         * @return the next trigger time
         */
        public Long calculateNextTriggerTime(Long preTriggerTime, TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
            if (preTriggerTime == null || preTriggerTime < System.currentTimeMillis()) {
                preTriggerTime = System.currentTimeMillis();
            }
            return getHandler(timeExpressionType).calculateNextTriggerTime(preTriggerTime, timeExpression, startTime, endTime);
        }
    
    
        /**
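         * Calculate the next trigger time and apply the inspection rule
         *
         * @param timeExpressionType type of the time expression
         * @param timeExpression     the expression
         * @param startTime          start time (inclusive)
         * @param endTime            end time (inclusive)
         * @return the next trigger time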
         */
        public Long calculateNextTriggerTimeWithInspection( TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
            Long nextTriggerTime = calculateNextTriggerTime(null, timeExpressionType, timeExpression, startTime, endTime);
            if (TimeExpressionType.INSPECT_TYPES.contains(timeExpressionType.getV()) && nextTriggerTime == null) {
                throw new PowerJobException("time expression is out of date: " + timeExpression);
            }
            return nextTriggerTime;
        }
    
    
        public void validate(TimeExpressionType timeExpressionType, String timeExpression, Long startTime, Long endTime) {
            if (endTime != null) {
                if (endTime <= System.currentTimeMillis()) {
                    throw new PowerJobException("lifecycle is out of date!");
                }
                if (startTime != null && startTime > endTime) {
                    throw new PowerJobException("lifecycle is invalid! start time must earlier then end time.");
                }
            }
            getHandler(timeExpressionType).validate(timeExpression);
        }
    
    
        private TimingStrategyHandler getHandler(TimeExpressionType timeExpressionType) {
            TimingStrategyHandler timingStrategyHandler = strategyContainer.get(timeExpressionType);
            if (timingStrategyHandler == null) {
                throw new PowerJobException("No matching TimingStrategyHandler for this TimeExpressionType:" + timeExpressionType);
            }
            return timingStrategyHandler;
        }
    
    }
    

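    The TimingStrategyService constructor iterates over timingStrategyHandlers and builds a Map<TimeExpressionType, TimingStrategyHandler> keyed by each handler's supportType. calculateNextTriggerTimes looks up the handler for timeExpressionType, then calls TimingStrategyHandler.calculateNextTriggerTime in a loop, feeding each result back in as the next preTriggerTime, and returns up to 5 upcoming trigger times as formatted strings (or a single tip message if there are none). calculateNextTriggerTime replaces a preTriggerTime that is null or in the past with the current time before delegating to the handler. calculateNextTriggerTimeWithInspection computes nextTriggerTime and, for the CRON and DAILY_TIME_INTERVAL types, throws a PowerJobException if it is null. validate first checks the lifecycle (endTime must be in the future, and startTime must not be later than endTime) and then calls the validate method of the matching handler.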
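
    The registry-plus-loop structure can be sketched without Spring as follows. Everything named here (StrategyRegistrySketch, the two-value Type enum, the simplified Handler interface, and the fixedRate factory) is invented for the illustration; the sketch also returns raw millisecond values instead of formatted strings and takes now as a parameter so that the result is deterministic.

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Sketch of a handler registry keyed by type, with the "next N trigger times" loop.
final class StrategyRegistrySketch {

    enum Type { FIXED_RATE, API }

    // Simplified stand-in for TimingStrategyHandler.
    interface Handler {
        Type supportType();

        Long next(long preTriggerTime, String expression, Long endTime);
    }

    private final Map<Type, Handler> handlers = new EnumMap<>(Type.class);

    StrategyRegistrySketch(List<Handler> all) {
        // Each handler registers itself under the type it reports.
        for (Handler handler : all) {
            handlers.put(handler.supportType(), handler);
        }
    }

    List<Long> nextTimes(Type type, String expression, long now, Long endTime, int n) {
        Handler handler = handlers.get(type);
        if (handler == null) {
            throw new IllegalStateException("no handler for " + type);
        }
        List<Long> times = new ArrayList<>(n);
        Long next = now;
        while (times.size() < n) {
            // Each result becomes the "previous trigger time" of the following call.
            next = handler.next(next, expression, endTime);
            if (next == null) {
                break; // the schedule is exhausted, e.g. endTime was passed
            }
            times.add(next);
        }
        return times;
    }

    // A minimal fixed-rate handler to exercise the registry.
    static Handler fixedRate() {
        return new Handler() {
            @Override
            public Type supportType() {
                return Type.FIXED_RATE;
            }

            @Override
            public Long next(long preTriggerTime, String expression, Long endTime) {
                long next = preTriggerTime + Long.parseLong(expression);
                return (endTime != null && endTime < next) ? null : next;
            }
        };
    }
}
```

    Keying the map by enum is what lets Spring inject every TimingStrategyHandler bean as a list in the real service, so a new strategy only needs a new component and no change to the service itself.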

    Summary

    The TimingStrategyHandler interface defines the validate, calculateNextTriggerTime, and supportType methods. The TimeExpressionType enum it works with defines the types API, CRON, FIXED_RATE, FIXED_DELAY, WORKFLOW, and DAILY_TIME_INTERVAL, which are handled by ApiTimingStrategyHandler, CronTimingStrategyHandler, FixedRateTimingStrategyHandler, FixedDelayTimingStrategyHandler, WorkflowTimingStrategyHandler, and DailyTimeIntervalStrategyHandler respectively. TimingStrategyService aggregates these handlers and exposes the calculateNextTriggerTimes, calculateNextTriggerTime, calculateNextTriggerTimeWithInspection, and validate methods.
