美文网首页分布式定时任务框架
7. xxl-job 原理-- 调度中心任务管理

7. xxl-job 原理-- 调度中心任务管理

作者: 光小月 | 来源:发表于2019-05-30 15:04 被阅读85次

    xxl-job: v2.0.2 原理 目录学习

     在任务调度中心可以进行新建任务,新建任务之后可以在任务列表中查看相关任务,任务可以根据我们配置的cron表达式进行任务调度,或者也可以在任务列表中执行、暂停、删除和查看相关运行日志等操作。
    

    1. 任务新建

    在任务管理界面,新增任务


    1

    2. 任务操作

    2

    3. 任务操作的API

    • 任务管理界面
      JobInfoController的 index方法
    // @RequestMapping("/jobinfo") 默认接口
    @RequestMapping
        public String index(HttpServletRequest request, Model model, @RequestParam(required = false, defaultValue = "-1") int jobGroup) {
    
            // 枚举-字典
            model.addAttribute("ExecutorRouteStrategyEnum", ExecutorRouteStrategyEnum.values());    // 路由策略-列表
            model.addAttribute("GlueTypeEnum", GlueTypeEnum.values());                              // Glue类型-字典
            model.addAttribute("ExecutorBlockStrategyEnum", ExecutorBlockStrategyEnum.values());    // 阻塞处理策略-字典
    
            // 执行器列表
            List<XxlJobGroup> jobGroupList_all =  xxlJobGroupDao.findAll();
    
            // filter group
            List<XxlJobGroup> jobGroupList = filterJobGroupByRole(request, jobGroupList_all);
            if (jobGroupList==null || jobGroupList.size()==0) {
                throw new XxlJobException(I18nUtil.getString("jobgroup_empty"));
            }
    
            model.addAttribute("JobGroupList", jobGroupList);
            model.addAttribute("jobGroup", jobGroup);
    
            return "jobinfo/jobinfo.index";
        }
    
    • 任务新增
      JobInfoController.add
    @RequestMapping("/add")
    @ResponseBody
    public ReturnT<String> add(XxlJobInfo jobInfo) {
          return xxlJobService.add(jobInfo);
    }
    

    XxlJobServiceImpl
    在service 中,需要验证界面输入信息, valid, fix \r in shell , childJobId valid
    最后存储到xxl_job_info 表中

    XxlJobServiceImpl
    @Override
        public ReturnT<String> add(XxlJobInfo jobInfo) {
            // valid  验证基本的信息
            ......
            // fix "\r" in shell
            if (GlueTypeEnum.GLUE_SHELL==GlueTypeEnum.match(jobInfo.getGlueType()) && jobInfo.getGlueSource()!=null) {
                jobInfo.setGlueSource(jobInfo.getGlueSource().replaceAll("\r", ""));
            } 
            // ChildJobId valid
            if (jobInfo.getChildJobId()!=null && jobInfo.getChildJobId().trim().length()>0) {
                String[] childJobIds = jobInfo.getChildJobId().split(",");
                for (String childJobIdItem: childJobIds) {
                    if (childJobIdItem!=null && childJobIdItem.trim().length()>0 && isNumeric(childJobIdItem)) {
                        XxlJobInfo childJobInfo = xxlJobInfoDao.loadById(Integer.valueOf(childJobIdItem));
                        if (childJobInfo==null) {
                            return new ReturnT<String>(ReturnT.FAIL_CODE,
                                    MessageFormat.format((I18nUtil.getString("jobinfo_field_childJobId")+"({0})"+I18nUtil.getString("system_not_found")), childJobIdItem));
                        }
                    } ......
                }
        ......
                jobInfo.setChildJobId(temp);
            }
            // add in db
            xxlJobInfoDao.save(jobInfo);
            if (jobInfo.getId() < 1) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("jobinfo_field_add")+I18nUtil.getString("system_fail")) );
            }
            return new ReturnT<String>(String.valueOf(jobInfo.getId()));
        }
    
    • 任务更新
      JobInfoController.update
    @RequestMapping("/update")
        @ResponseBody
        public ReturnT<String> update(XxlJobInfo jobInfo) {
            return xxlJobService.update(jobInfo);
        }
    

    XxlJobServiceImpl
    service 中, valid , ChildJobId valid , group valid, stage job info, next trigger time (10s后生效,避开预读周期),

    @Override
        public ReturnT<String> update(XxlJobInfo jobInfo) {
            // valid 
    ......
    // ChildJobId valid
            ......
            // group valid
            XxlJobGroup jobGroup = xxlJobGroupDao.load(jobInfo.getJobGroup());
            if (jobGroup == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("jobinfo_field_jobgroup")+I18nUtil.getString("system_unvalid")) );
            }
            // stage job info
            XxlJobInfo exists_jobInfo = xxlJobInfoDao.loadById(jobInfo.getId());
            if (exists_jobInfo == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, (I18nUtil.getString("jobinfo_field_id")+I18nUtil.getString("system_not_found")) );
            }
            // next trigger time (10s后生效,避开预读周期)
            long nextTriggerTime = exists_jobInfo.getTriggerNextTime();
            if (exists_jobInfo.getTriggerStatus() == 1 && !jobInfo.getJobCron().equals(exists_jobInfo.getJobCron()) ) {
                try {
                    nextTriggerTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(System.currentTimeMillis() + 10000)).getTime();
                } catch (ParseException e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobinfo_field_cron_unvalid")+" | "+ e.getMessage());
                }
            }
            exists_jobInfo.setJobGroup(jobInfo.getJobGroup());
            exists_jobInfo.setJobCron(jobInfo.getJobCron());
            exists_jobInfo.setJobDesc(jobInfo.getJobDesc());
            exists_jobInfo.setAuthor(jobInfo.getAuthor());
            exists_jobInfo.setAlarmEmail(jobInfo.getAlarmEmail());
            exists_jobInfo.setExecutorRouteStrategy(jobInfo.getExecutorRouteStrategy());
            exists_jobInfo.setExecutorHandler(jobInfo.getExecutorHandler());
            exists_jobInfo.setExecutorParam(jobInfo.getExecutorParam());
            exists_jobInfo.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
            exists_jobInfo.setExecutorTimeout(jobInfo.getExecutorTimeout());
            exists_jobInfo.setExecutorFailRetryCount(jobInfo.getExecutorFailRetryCount());
            exists_jobInfo.setChildJobId(jobInfo.getChildJobId());
            exists_jobInfo.setTriggerNextTime(nextTriggerTime);
            xxlJobInfoDao.update(exists_jobInfo);
            return ReturnT.SUCCESS;
        }
    
    • 任务删除
      JobInfoController.remove
    @RequestMapping("/remove")
        @ResponseBody
        public ReturnT<String> remove(int id) {
            return xxlJobService.remove(id);
        }
    

    XxlJobServiceImpl
    service 中, 删除 xxl_job_info, xxl_job_log, xxl_job_log_glue 对应的信息

    @Override
        public ReturnT<String> remove(int id) {
            XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
            if (xxlJobInfo == null) {
                return ReturnT.SUCCESS;
            }
    
            xxlJobInfoDao.delete(id);
            xxlJobLogDao.delete(id);
            xxlJobLogGlueDao.deleteByJobId(id);
            return ReturnT.SUCCESS;
        }
    
    • 任务停止 其运行
      JobInfoController.pause
    @RequestMapping("/stop")
        @ResponseBody
        public ReturnT<String> pause(int id) {
            return xxlJobService.stop(id);
        }
    

    XxlJobServiceImpl
    service 中, 设置xxl_job_info的触犯时间为 0 , 更新

    @Override
        public ReturnT<String> stop(int id) {
            XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
    
            xxlJobInfo.setTriggerStatus(0);
            xxlJobInfo.setTriggerLastTime(0);
            xxlJobInfo.setTriggerNextTime(0);
    
            xxlJobInfoDao.update(xxlJobInfo);
            return ReturnT.SUCCESS;
        }
    
    • 任务 运行 ,进行周期性执行
      JobInfoController.start
    @RequestMapping("/start")
        @ResponseBody
        public ReturnT<String> start(int id) {
            return xxlJobService.start(id);
        }
    

    XxlJobServiceImpl
    service 中, 设置xxl_job_info的触犯时间为 0 , 更新

    @Override
        public ReturnT<String> start(int id) {
            XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
    
            // next trigger time (10s后生效,避开预读周期)
            long nextTriggerTime = 0;
            try {
                nextTriggerTime = new CronExpression(xxlJobInfo.getJobCron()).getNextValidTimeAfter(new Date(System.currentTimeMillis() + 10000)).getTime();
            } catch (ParseException e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobinfo_field_cron_unvalid")+" | "+ e.getMessage());
            }
    
            xxlJobInfo.setTriggerStatus(1);
            xxlJobInfo.setTriggerLastTime(0);
            xxlJobInfo.setTriggerNextTime(nextTriggerTime);
    
            xxlJobInfoDao.update(xxlJobInfo);
            return ReturnT.SUCCESS;
        }
    
    
    • 任务触发, 执行一次
      JobInfoController.triggerJob
    @RequestMapping("/trigger")
        @ResponseBody
        //@PermissionLimit(limit = false)
        public ReturnT<String> triggerJob(int id, String executorParam) {
            // force cover job param
            if (executorParam == null) {
                executorParam = "";
            }
    //  利用jobtriggerPoolHelper类进行触发
            JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam);
            return ReturnT.SUCCESS;
        }
    

    PS: 若你觉得可以、还行、过得去、甚至不太差的话,可以“关注”一下,就此谢过!

    相关文章

      网友评论

        本文标题:7. xxl-job 原理-- 调度中心任务管理

        本文链接:https://www.haomeiwen.com/subject/ocdytctx.html