A Look at Task Scheduling in PowerJob

Author: go4it | Published 2024-02-08 20:56

    This article looks at how PowerJob schedules tasks.

    CoreScheduleTaskManager

    tech/powerjob/server/core/scheduler/CoreScheduleTaskManager.java

    @Service
    @Slf4j
    @RequiredArgsConstructor
    public class CoreScheduleTaskManager implements InitializingBean, DisposableBean {
    
    
        private final PowerScheduleService powerScheduleService;
    
        private final InstanceStatusCheckService instanceStatusCheckService;
    
        private final List<Thread> coreThreadContainer = new ArrayList<>();
    
    
        @SuppressWarnings("AlibabaAvoidManuallyCreateThread")
        @Override
        public void afterPropertiesSet() {
            // Periodic scheduling
            coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.CRON)), "Thread-ScheduleCronJob"));
            coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleDailyTimeIntervalJob", PowerScheduleService.SCHEDULE_RATE, () -> powerScheduleService.scheduleNormalJob(TimeExpressionType.DAILY_TIME_INTERVAL)), "Thread-ScheduleDailyTimeIntervalJob"));
            coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleCronWorkflow", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleCronWorkflow), "Thread-ScheduleCronWorkflow"));
            coreThreadContainer.add(new Thread(new LoopRunnable("ScheduleFrequentJob", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::scheduleFrequentJob), "Thread-ScheduleFrequentJob"));
            // Data cleanup
            coreThreadContainer.add(new Thread(new LoopRunnable("CleanWorkerData", PowerScheduleService.SCHEDULE_RATE, powerScheduleService::cleanData), "Thread-CleanWorkerData"));
            // Status checks
            coreThreadContainer.add(new Thread(new LoopRunnable("CheckRunningInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkRunningInstance), "Thread-CheckRunningInstance"));
            coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingDispatchInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingDispatchInstance), "Thread-CheckWaitingDispatchInstance"));
            coreThreadContainer.add(new Thread(new LoopRunnable("CheckWaitingWorkerReceiveInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWaitingWorkerReceiveInstance), "Thread-CheckWaitingWorkerReceiveInstance"));
            coreThreadContainer.add(new Thread(new LoopRunnable("CheckWorkflowInstance", InstanceStatusCheckService.CHECK_INTERVAL, instanceStatusCheckService::checkWorkflowInstance), "Thread-CheckWorkflowInstance"));
    
            coreThreadContainer.forEach(Thread::start);
        }
    
        //......
    }    
    

    When afterPropertiesSet runs, CoreScheduleTaskManager starts nine threads, each wrapping a LoopRunnable. They periodically invoke powerScheduleService.scheduleNormalJob(TimeExpressionType.CRON), powerScheduleService.scheduleNormalJob(TimeExpressionType.DAILY_TIME_INTERVAL), powerScheduleService::scheduleCronWorkflow, powerScheduleService::scheduleFrequentJob, powerScheduleService::cleanData, instanceStatusCheckService::checkRunningInstance, instanceStatusCheckService::checkWaitingDispatchInstance, instanceStatusCheckService::checkWaitingWorkerReceiveInstance and instanceStatusCheckService::checkWorkflowInstance. The scheduling and cleanup loops use PowerScheduleService.SCHEDULE_RATE as their interval, and the status checks use InstanceStatusCheckService.CHECK_INTERVAL.

    LoopRunnable

        @RequiredArgsConstructor
        private static class LoopRunnable implements Runnable {
    
            private final String taskName;
    
            private final Long runningInterval;
    
            private final Runnable innerRunnable;
    
            @SuppressWarnings("BusyWait")
            @Override
            public void run() {
                log.info("start task : {}.", taskName);
                while (true) {
                    try {
                        innerRunnable.run();
                        Thread.sleep(runningInterval);
                    } catch (InterruptedException e) {
                        log.warn("[{}] task has been interrupted!", taskName, e);
                        break;
                    } catch (Exception e) {
                        log.error("[{}] task failed!", taskName, e);
                    }
                }
            }
        }
    

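    LoopRunnable's constructor takes three arguments: taskName, runningInterval and innerRunnable. Its run method executes innerRunnable.run() inside a while (true) loop and then sleeps for runningInterval. An InterruptedException breaks out of the loop; any other exception is logged at error level and the loop continues. Because the sleep sits inside the same try block, a pass that throws skips the sleep and the next pass starts immediately.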
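
    The loop pattern described above can be reproduced outside Spring. The following is a minimal sketch, not PowerJob's code: SimpleLoop and runBriefly are hypothetical names, logging is replaced by System.err, and the demo task deliberately fails on its first pass to show that an ordinary exception does not end the loop while an interrupt does.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone version of the loop pattern; names are illustrative, not PowerJob's.
final class SimpleLoop implements Runnable {

    private final String taskName;
    private final long intervalMs;
    private final Runnable inner;

    SimpleLoop(String taskName, long intervalMs, Runnable inner) {
        this.taskName = taskName;
        this.intervalMs = intervalMs;
        this.inner = inner;
    }

    @Override
    public void run() {
        while (true) {
            try {
                inner.run();
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                // Interruption is the only exit path, so it doubles as the stop signal.
                Thread.currentThread().interrupt();
                break;
            } catch (Exception e) {
                // A failing pass is reported; the loop moves on to the next pass without sleeping.
                System.err.println("[" + taskName + "] task failed: " + e);
            }
        }
    }

    // Starts the loop, lets it run for a while, stops it by interrupting the thread,
    // and returns how many passes were executed.
    static int runBriefly(long intervalMs, long runForMs) {
        AtomicInteger passes = new AtomicInteger();
        Thread t = new Thread(new SimpleLoop("demo", intervalMs, () -> {
            // Fail on the first pass only.
            if (passes.incrementAndGet() == 1) {
                throw new IllegalStateException("first pass fails");
            }
        }), "Thread-demo");
        t.start();
        try {
            Thread.sleep(runForMs);
            t.interrupt();
            t.join();
        } catch (InterruptedException e) {
            t.interrupt();
            Thread.currentThread().interrupt();
        }
        return passes.get();
    }
}
```

    Calling SimpleLoop.runBriefly(10, 200) returns a count of at least 2: the first pass throws, the second runs straight away, and the thread exits once it is interrupted.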

    PowerScheduleService

    PowerScheduleService mainly provides the scheduleNormalJob, scheduleCronWorkflow, scheduleFrequentJob and cleanData methods.

    scheduleNormalJob

    tech/powerjob/server/core/scheduler/PowerScheduleService.java

        public void scheduleNormalJob(TimeExpressionType timeExpressionType) {
            long start = System.currentTimeMillis();
            // Schedule CRON-expression jobs
            try {
                final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
                if (CollectionUtils.isEmpty(allAppIds)) {
                    log.info("[NormalScheduler] current server has no app's job to schedule.");
                    return;
                }
                scheduleNormalJob0(timeExpressionType, allAppIds);
            } catch (Exception e) {
                log.error("[NormalScheduler] schedule cron job failed.", e);
            }
            long cost = System.currentTimeMillis() - start;
            log.info("[NormalScheduler] {} job schedule use {} ms.", timeExpressionType, cost);
            if (cost > SCHEDULE_RATE) {
                log.warn("[NormalScheduler] The database query is using too much time({}ms), please check if the database load is too high!", cost);
            }
        }
    

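    scheduleNormalJob looks up the appIds that the current server is responsible for and then delegates to scheduleNormalJob0. It also measures how long the pass took and logs a warning when the cost exceeds SCHEDULE_RATE.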

    scheduleNormalJob0

        private void scheduleNormalJob0(TimeExpressionType timeExpressionType, List<Long> appIds) {
    
            long nowTime = System.currentTimeMillis();
            long timeThreshold = nowTime + 2 * SCHEDULE_RATE;
            Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {
    
                try {
    
                    // Query conditions: job enabled + time expression type matches + appId in this batch + due for scheduling soon
                    List<JobInfoDO> jobInfos = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeAndNextTriggerTimeLessThanEqual(partAppIds, SwitchableStatus.ENABLE.getV(), timeExpressionType.getV(), timeThreshold);
    
                    if (CollectionUtils.isEmpty(jobInfos)) {
                        return;
                    }
    
                    // 1. Batch-write to the instance log table
                    Map<Long, Long> jobId2InstanceId = Maps.newHashMap();
                    log.info("[NormalScheduler] These {} jobs will be scheduled: {}.", timeExpressionType.name(), jobInfos);
    
                    jobInfos.forEach(jobInfo -> {
                        Long instanceId = instanceService.create(jobInfo.getId(), jobInfo.getAppId(), jobInfo.getJobParams(), null, null, jobInfo.getNextTriggerTime()).getInstanceId();
                        jobId2InstanceId.put(jobInfo.getId(), instanceId);
                    });
                    instanceInfoRepository.flush();
    
                    // 2. Push into the timing wheel to await dispatch
                    jobInfos.forEach(jobInfoDO -> {
    
                        Long instanceId = jobId2InstanceId.get(jobInfoDO.getId());
    
                        long targetTriggerTime = jobInfoDO.getNextTriggerTime();
                        long delay = 0;
                        if (targetTriggerTime < nowTime) {
                            log.warn("[Job-{}] schedule delay, expect: {}, current: {}", jobInfoDO.getId(), targetTriggerTime, System.currentTimeMillis());
                        } else {
                            delay = targetTriggerTime - nowTime;
                        }
    
                        InstanceTimeWheelService.schedule(instanceId, delay, () -> dispatchService.dispatch(jobInfoDO, instanceId, Optional.empty(), Optional.empty()));
                    });
    
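                    // 3. Compute the next trigger time (repeat executions within 5s are ignored, i.e. the minimum interval between consecutive executions in CRON mode is SCHEDULE_RATE ms)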
                    jobInfos.forEach(jobInfoDO -> {
                        try {
                            refreshJob(timeExpressionType, jobInfoDO);
                        } catch (Exception e) {
                            log.error("[Job-{}] refresh job failed.", jobInfoDO.getId(), e);
                        }
                    });
                    jobInfoRepository.flush();
    
    
                } catch (Exception e) {
                    log.error("[NormalScheduler] schedule {} job failed.", timeExpressionType.name(), e);
                }
            });
        }
    

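    scheduleNormalJob0 schedules CRON and DAILY_TIME_INTERVAL jobs. Working through the appIds in batches of MAX_APP_NUM, it uses jobInfoRepository to find jobs that belong to the given appIds, are enabled, have the given TimeExpressionType, and whose nextTriggerTime is less than or equal to nowTime + 2 * SCHEDULE_RATE. For each job it calls instanceService.create to create a job instance, hands the instance to InstanceTimeWheelService.schedule with the remaining delay (0 if the trigger time has already passed), and finally recomputes and updates the job's nextTriggerTime via refreshJob.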
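
    The arithmetic in scheduleNormalJob0 is easy to isolate. The sketch below is an illustration under assumptions, not PowerJob's API: TriggerWindow, isDue, delayFor and partition are hypothetical names, the rate is passed in as a parameter rather than read from SCHEDULE_RATE, and partition is a plain-Java stand-in for Guava's Lists.partition that copies each batch instead of returning views.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the time-window logic of scheduleNormalJob0.
final class TriggerWindow {

    // A job is picked up when its next trigger time is no later than now + 2 * rate.
    static boolean isDue(long nextTriggerTime, long nowMs, long rateMs) {
        return nextTriggerTime <= nowMs + 2 * rateMs;
    }

    // Delay handed to the timing wheel: 0 for overdue jobs, otherwise the remaining time.
    static long delayFor(long nextTriggerTime, long nowMs) {
        return nextTriggerTime < nowMs ? 0L : nextTriggerTime - nowMs;
    }

    // Splits a list into consecutive batches of at most `size` elements.
    static <T> List<List<T>> partition(List<T> items, int size) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            int end = Math.min(i + size, items.size());
            parts.add(new ArrayList<>(items.subList(i, end)));
        }
        return parts;
    }
}
```

    Since the look-ahead window spans two intervals while the loop wakes up once per interval, a job that falls due before the next pass has already been picked up, and the timing wheel is left to handle the exact delay.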

    scheduleCronWorkflow

        public void scheduleCronWorkflow() {
            long start = System.currentTimeMillis();
            // Schedule CRON-expression workflows
            try {
                final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
                if (CollectionUtils.isEmpty(allAppIds)) {
                    log.info("[CronWorkflowSchedule] current server has no app's workflow to schedule.");
                    return;
                }
                scheduleWorkflowCore(allAppIds);
            } catch (Exception e) {
                log.error("[CronWorkflowSchedule] schedule cron workflow failed.", e);
            }
            long cost = System.currentTimeMillis() - start;
            log.info("[CronWorkflowSchedule] cron workflow schedule use {} ms.", cost);
            if (cost > SCHEDULE_RATE) {
                log.warn("[CronWorkflowSchedule] The database query is using too much time({}ms), please check if the database load is too high!", cost);
            }
        }
    

    scheduleCronWorkflow schedules workflows driven by CRON expressions and delegates to scheduleWorkflowCore.

    scheduleFrequentJob

        public void scheduleFrequentJob() {
            long start = System.currentTimeMillis();
            // Schedule FIX_RATE/FIX_DELAY jobs
            try {
                final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
                if (CollectionUtils.isEmpty(allAppIds)) {
                    log.info("[FrequentJobSchedule] current server has no app's job to schedule.");
                    return;
                }
                scheduleFrequentJobCore(allAppIds);
            } catch (Exception e) {
                log.error("[FrequentJobSchedule] schedule frequent job failed.", e);
            }
            long cost = System.currentTimeMillis() - start;
            log.info("[FrequentJobSchedule] frequent job schedule use {} ms.", cost);
            if (cost > SCHEDULE_RATE) {
                log.warn("[FrequentJobSchedule] The database query is using too much time({}ms), please check if the database load is too high!", cost);
            }
        }
    

    scheduleFrequentJob schedules FIX_RATE/FIX_DELAY jobs and delegates to scheduleFrequentJobCore.
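
    scheduleNormalJob, scheduleCronWorkflow and scheduleFrequentJob share one shape: run a pass, catch and log any exception, then compare the elapsed time with SCHEDULE_RATE. A minimal sketch of that shape follows; TimedPass and overran are hypothetical names, and PowerJob inlines this logic in each method rather than using a helper like this.

```java
// Hypothetical helper; the source repeats this logic inside each schedule* method.
final class TimedPass {

    // Runs one pass, swallows its failures, and reports whether it took longer than the interval.
    static boolean overran(Runnable pass, long rateMs) {
        long start = System.currentTimeMillis();
        try {
            pass.run();
        } catch (Exception e) {
            System.err.println("pass failed: " + e);
        }
        long cost = System.currentTimeMillis() - start;
        return cost > rateMs;
    }
}
```

    One difference from the source: its early return for an empty appId list skips the timing log entirely, whereas this sketch always measures.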

    scheduleFrequentJobCore

        private void scheduleFrequentJobCore(List<Long> appIds) {
    
            Lists.partition(appIds, MAX_APP_NUM).forEach(partAppIds -> {
                try {
                    // Query all second-level (frequent) jobs, IDs only
                    List<Long> jobIds = jobInfoRepository.findByAppIdInAndStatusAndTimeExpressionTypeIn(partAppIds, SwitchableStatus.ENABLE.getV(), TimeExpressionType.FREQUENT_TYPES);
                    if (CollectionUtils.isEmpty(jobIds)) {
                        return;
                    }
                    // Check the instance log table for existing instances of these jobs
                    List<Long> runningJobIdList = instanceInfoRepository.findByJobIdInAndStatusIn(jobIds, InstanceStatus.GENERALIZED_RUNNING_STATUS);
                    Set<Long> runningJobIdSet = Sets.newHashSet(runningJobIdList);
    
                    List<Long> notRunningJobIds = Lists.newLinkedList();
                    jobIds.forEach(jobId -> {
                        if (!runningJobIdSet.contains(jobId)) {
                            notRunningJobIds.add(jobId);
                        }
                    });
    
                    if (CollectionUtils.isEmpty(notRunningJobIds)) {
                        return;
                    }
    
                    notRunningJobIds.forEach(jobId -> {
                        Optional<JobInfoDO> jobInfoOpt = jobInfoRepository.findById(jobId);
                        jobInfoOpt.ifPresent(jobInfoDO -> {
                            LifeCycle lifeCycle = LifeCycle.parse(jobInfoDO.getLifecycle());
                            // The lifecycle has already ended
                            if (lifeCycle.getEnd() != null && lifeCycle.getEnd() < System.currentTimeMillis()) {
                                jobInfoDO.setStatus(SwitchableStatus.DISABLE.getV());
                                jobInfoDO.setGmtModified(new Date());
                                jobInfoRepository.saveAndFlush(jobInfoDO);
                                log.info("[FrequentScheduler] disable frequent job,id:{}.", jobInfoDO.getId());
                            } else if (lifeCycle.getStart() == null || lifeCycle.getStart() < System.currentTimeMillis() + SCHEDULE_RATE * 2) {
                                log.info("[FrequentScheduler] schedule frequent job,id:{}.", jobInfoDO.getId());
                                jobService.runJob(jobInfoDO.getAppId(), jobId, null, Optional.ofNullable(lifeCycle.getStart()).orElse(0L) - System.currentTimeMillis());
                            }
                        });
                    });
                } catch (Exception e) {
                    log.error("[FrequentScheduler] schedule frequent job failed.", e);
                }
            });
        }
    

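    scheduleFrequentJobCore schedules second-level jobs. It first fetches the ids of all enabled jobs whose type is in TimeExpressionType.FREQUENT_TYPES, then filters out those that already have an instance in a running status. Each remaining job is checked in turn: if its lifecycle has ended, the job is disabled; if the lifecycle has no start time or starts within 2 * SCHEDULE_RATE, jobService.runJob is called.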
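
    The filtering and the lifecycle decision can be expressed as pure functions. The sketch below is illustrative only: FrequentJobDecision, Action, notRunning, decide and delayFor are hypothetical names, and the lifecycle is reduced to two nullable timestamps instead of PowerJob's LifeCycle type.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical pure-function view of the decisions made in scheduleFrequentJobCore.
final class FrequentJobDecision {

    enum Action { DISABLE, RUN, SKIP }

    // Keeps only the job ids that have no instance in a running status.
    static List<Long> notRunning(List<Long> jobIds, List<Long> runningJobIds) {
        Set<Long> running = new HashSet<>(runningJobIds);
        List<Long> result = new ArrayList<>();
        for (Long id : jobIds) {
            if (!running.contains(id)) {
                result.add(id);
            }
        }
        return result;
    }

    // start and end are lifecycle bounds in epoch millis; null means "not set".
    static Action decide(Long start, Long end, long nowMs, long rateMs) {
        if (end != null && end < nowMs) {
            return Action.DISABLE;
        }
        if (start == null || start < nowMs + 2 * rateMs) {
            return Action.RUN;
        }
        return Action.SKIP;
    }

    // Same expression the source passes to runJob: start (or 0 when unset) minus now.
    static long delayFor(Long start, long nowMs) {
        return (start == null ? 0L : start) - nowMs;
    }
}
```

    When no start time is set, delayFor yields a large negative number. jobService.runJob is not shown in this article, so how it treats a non-positive delay is left open here; presumably it means "run immediately".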

    cleanData

        public void cleanData() {
            try {
                final List<Long> allAppIds = appInfoRepository.listAppIdByCurrentServer(transportService.defaultProtocol().getAddress());
                if (allAppIds.isEmpty()) {
                    return;
                }
                WorkerClusterManagerService.clean(allAppIds);
            } catch (Exception e) {
                log.error("[CleanData] clean data failed.", e);
            }
        }
    

    cleanData calls WorkerClusterManagerService.clean to maintain the cache for the appIds that the current server is responsible for.

    InstanceStatusCheckService

    InstanceStatusCheckService provides the checkRunningInstance, checkWaitingDispatchInstance, checkWaitingWorkerReceiveInstance and checkWorkflowInstance methods.

    Summary

    In afterPropertiesSet, PowerJob's CoreScheduleTaskManager starts a set of threads, each running a LoopRunnable. Among them, scheduleNormalJob schedules CRON and DAILY_TIME_INTERVAL jobs, scheduleCronWorkflow schedules CRON-expression workflows, and scheduleFrequentJob schedules FIX_RATE/FIX_DELAY jobs.
