美文网首页
xxl-job源码2-调度端

xxl-job源码2-调度端

作者: modou1618 | 来源:发表于2019-03-21 22:31 被阅读0次

    一 服务端主体流程

    服务端流程
    • 任务触发时,需要进行执行器路由处理,并组装任务相关配置信息,如阻塞策略,分片参数,超时时间等。

    二 表

    2.1 XxlJobRegistry

    • 执行器注册信息
      类型,应用,执行器地址,心跳时间
    • 任务信息
    public class XxlJobInfo {
        
        private int id;             // 主键ID     (JobKey.name)
        
        private int jobGroup;       // 执行器主键ID  (JobKey.group)
        private String jobCron;     // 任务执行CRON表达式 【base on quartz】
        private String jobDesc;
        
        private Date addTime;
        private Date updateTime;
        
        private String author;      // 负责人
        private String alarmEmail;  // 报警邮件
    
        private String executorRouteStrategy;   // 执行器路由策略
        private String executorHandler;         // 执行器,任务Handler名称
        private String executorParam;           // 执行器,任务参数
        private String executorBlockStrategy;   // 阻塞处理策略
        private int executorTimeout;            // 任务执行超时时间,单位秒
        private int executorFailRetryCount;     // 失败重试次数
        
        private String glueType;        // GLUE类型   #com.xxl.job.core.glue.GlueTypeEnum
        private String glueSource;      // GLUE源代码
        private String glueRemark;      // GLUE备注
        private Date glueUpdatetime;    // GLUE更新时间
    
        private String childJobId;      // 子任务ID,多个逗号分隔
        
        // copy from quartz
        private String jobStatus;       // 任务状态 【base on quartz】
    }
    
    • 任务执行记录
    public class XxlJobLog {
        
        private int id;
        
        // job info
        private int jobGroup;//执行器主键id
        private int jobId;
    
        // execute info
        private String executorAddress;//执行器地址
        private String executorHandler;//执行器任务执行函数
        private String executorParam;//参数
        private String executorShardingParam;//分片参数
        private int executorFailRetryCount;//失败重试次数
        
        // trigger info
        private Date triggerTime;//触发时间
        private int triggerCode;//触发结果
        private String triggerMsg;
        
        // handle info
        private Date handleTime;//处理完成时间
        private int handleCode;//处理结果
        private String handleMsg;
    }
    
    • 应用执行器信息
    public class XxlJobGroup {
    
        private int id;
        private String appName;
        private String title;
        private int order;
        private int addressType;    // 执行器地址类型:0=自动注册、1=手动录入
        private String addressList;    // 执行器地址列表,多地址逗号分隔(手动录入)
    }
    

    三 任务触发

    • quartz调度触发执行RemoteHttpBean.executeInternal
    protected void executeInternal(JobExecutionContext context)
            throws JobExecutionException {
    
        // load jobId
        JobKey jobKey = context.getTrigger().getJobKey();
        Integer jobId = Integer.valueOf(jobKey.getName());
    
        // trigger
        JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
    }
    
    • JobTriggerPoolHelper使用线程池,每个任务触发一个线程执行
    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
        helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
    }
    
    public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {
        triggerPool.execute(new Runnable() {
            @Override
            public void run() {
                XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
            }
        });
    }
    
    • 初始化任务调度的信息
    TriggerParam triggerParam = new TriggerParam();
    //任务id
    triggerParam.setJobId(jobInfo.getId());
    //任务处理函数,参数
    triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
    triggerParam.setExecutorParams(jobInfo.getExecutorParam());
    //阻塞策略
    triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
    //任务执行超时时间配置
    triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
    //任务触发记录id
    triggerParam.setLogId(jobLog.getId());
    triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
    //任务执行函数源码信息
    triggerParam.setGlueType(jobInfo.getGlueType());
    triggerParam.setGlueSource(jobInfo.getGlueSource());
    triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
    //分片信息
    triggerParam.setBroadcastIndex(index);
    triggerParam.setBroadcastTotal(total);
    
    • 简单任务按照执行器路由策略选择执行器
      executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());

    • xxlrpc发送任务触发消息

    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        ReturnT<String> runResult = null;
        try {
            ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);
            runResult = executorBiz.run(triggerParam);
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }
    
        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());
    
        runResult.setMsg(runResultSB.toString());
        return runResult;
    }
    
    • 通知开始监控任务触发记录
      JobFailMonitorHelper.monitor(jobLog.getId());

    四 定时任务

    • JobRegistryMonitorHelper执行器心跳扫描,定时扫描执行器心跳时间,删除过期的执行器
    • JobFailMonitorHelper任务状态监控告警及失败重试
      blockqueu存储所有本调度器待监控的任务,定时进行检查任务。
      按照告警策略,进行失败重试或者发送告警。
      任务执行中,则继续监控

    相关文章

      网友评论

          本文标题:xxl-job源码2-调度端

          本文链接:https://www.haomeiwen.com/subject/hrjhvqtx.html