XXL-JOB Reading Notes (2): How the Admin Schedules Executors


Author: 黎明_dba5 | Published: 2019-11-21 16:08

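    Call Chain

    Before walking through the code, here are the main classes and methods that one scheduling pass goes through:
    xxl-job-admin (the scheduler project) -> XxlJobScheduler.afterPropertiesSet() -> JobScheduleHelper.getInstance().start() -> JobTriggerPoolHelper.trigger() -> JobTriggerPoolHelper.addTrigger() -> XxlJobTrigger.trigger() -> processTrigger() -> runExecutor(). Inside runExecutor, the call goes through a proxy and a netty request to the executor, and the execution result is returned.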

    Source Code

    We start from XxlJobScheduler.java, the configuration class in the admin module. Its source is:

    @Override
        public void afterPropertiesSet() throws Exception {
            // init i18n
            initI18n();
            //Maintain executor address information, e.g. remove addresses that are no longer alive
            // admin registry monitor run
            JobRegistryMonitorHelper.getInstance().start();
            //Look up failed executions and raise alarms according to the configuration
            // admin monitor run
            JobFailMonitorHelper.getInstance().start();
            //Initialise the admin's RPC provider factory and bind the /api implementation class, so that executors can call the admin
            // admin-server
            initRpcProvider();
            //Start the scheduler inside the admin
            //start-schedule
            JobScheduleHelper.getInstance().start();
            logger.info(">>>>>>>>> init xxl-job admin success.");
        }
    

    JobScheduleHelper.getInstance().start() is the admin's scheduling entry point. Next, look at what the scheduler does:

    // schedule thread
            scheduleThread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    try {
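                        //With several admin nodes, the same job must not be scheduled concurrently. This start-up sleep throttles the scheduler: at most 5 s, at least just over 4 s, i.e. (5000-999) ms
                        //E.g. node A's scheduler has just started and picked up a job under the lock while node B starts and sees the same job. The millisecond-dependent sleep offsets the nodes; the schedule_lock row lock taken below is what actually prevents a duplicate trigger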
                        TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                    } catch (InterruptedException e) {
                        if (!scheduleThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
    
                    while (!scheduleThreadToStop) {
    
                        // Scan Job
                        long start = System.currentTimeMillis();
    
                        Connection conn = null;
                        Boolean connAutoCommit = null;
                        PreparedStatement preparedStatement = null;
    
                        boolean preReadSuc = true;
                        try {
    
                            conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                            connAutoCommit = conn.getAutoCommit();
                            conn.setAutoCommit(false);
                            //Lock the shared resource (row lock on schedule_lock, held until commit)
                            preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                            preparedStatement.execute();
    
                            // tx start
    
                            // 1、pre read
                            long nowTime = System.currentTimeMillis();
                            //Fetch the enabled jobs whose next trigger time is no later than 5 seconds from now
                            List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS);
                            if (scheduleList!=null && scheduleList.size()>0) {
                                // 2、push time-ring
                                for (XxlJobInfo jobInfo: scheduleList) {
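                                    //The trigger-time handling below is easiest to follow on a time axis
                                    //The current time is already more than 5 s past the trigger time: do not fire, just compute the next trigger time (segment A)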
                                    // time-ring jump
                                    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                        // 2.1、trigger-expire > 5s:pass && make next-trigger-time
    
                                        // fresh next
                                        Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date());
                                        if (nextValidTime != null) {
                                            jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                            jobInfo.setTriggerNextTime(nextValidTime.getTime());
                                        } else {
                                            jobInfo.setTriggerStatus(0);
                                            jobInfo.setTriggerLastTime(0);
                                            jobInfo.setTriggerNextTime(0);
                                        }
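                                    //The current time is past the trigger time by no more than 5 s: fire right away and update the next trigger time,
                                    //i.e. the trigger time lies within the 5 s before now (segment B)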
                                    } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                        // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
    
                                        CronExpression cronExpression = new CronExpression(jobInfo.getJobCron());
                                        long nextTime = cronExpression.getNextValidTimeAfter(new Date()).getTime();
    
                                        // 1、trigger
                                        //Hand the job to the trigger thread pool
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, shecule push trigger : jobId = " + jobInfo.getId() );
    
                                        // 2、fresh next
                                        jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                        jobInfo.setTriggerNextTime(nextTime);
    
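                                        //If the new next-trigger time falls within the 5 s after now, hand the job to ringThread for that second
                                        //The 5 s after now get special treatment because this loop runs at most once every 5 s, so such a job could otherwise be missed (segment D)
                                        //Note: scheduleThread and ringThread run at different frequencies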
                                        // next-trigger-time in 5s, pre-read again
                                        if (jobInfo.getTriggerNextTime() - nowTime < PRE_READ_MS) {
    
                                            // 1、make ring second
                                            //ringSecond computed here is in the range 0-59
                                            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    
                                            //Put the job into the time ring consumed by ringThread
                                            // 2、push time ring
                                            pushTimeRing(ringSecond, jobInfo.getId());
    
                                            // 3、fresh next
                                            Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()));
                                            if (nextValidTime != null) {
                                                jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                                jobInfo.setTriggerNextTime(nextValidTime.getTime());
                                            } else {
                                                jobInfo.setTriggerStatus(0);
                                                jobInfo.setTriggerLastTime(0);
                                                jobInfo.setTriggerNextTime(0);
                                            }
    
                                        }
    
                                    } else {//The trigger time is at or after the current time (segment C)
                                        // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
                                        //ringSecond computed here is in the range 0-59
                                        // 1、make ring second
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    
                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());
    
                                        // 3、fresh next
                                        Date nextValidTime = new CronExpression(jobInfo.getJobCron()).getNextValidTimeAfter(new Date(jobInfo.getTriggerNextTime()));
                                        if (nextValidTime != null) {
                                            jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
                                            jobInfo.setTriggerNextTime(nextValidTime.getTime());
                                        } else {
                                            jobInfo.setTriggerStatus(0);
                                            jobInfo.setTriggerLastTime(0);
                                            jobInfo.setTriggerNextTime(0);
                                        }
    
                                    }
    
                                }
    
                                // 3、update trigger info
                                for (XxlJobInfo jobInfo: scheduleList) {
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                                }
    
                            } else {
                                preReadSuc = false;
                            }
    
                            // tx stop
    
    
                        } catch (Exception e) {
                            if (!scheduleThreadToStop) {
                                logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);
                            }
                        } finally {
                            // commit
                            if (conn != null) {
                                try {
                                    conn.commit();
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                                try {
                                    conn.setAutoCommit(connAutoCommit);
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                                try {
                                    conn.close();
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                            }
    
                            // close PreparedStatement
                            if (null != preparedStatement) {
                                try {
                                    preparedStatement.close();
                                } catch (SQLException ignore) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(ignore.getMessage(), ignore);
                                    }
                                }
                            }
                        }
                        long cost = System.currentTimeMillis()-start;
                        // Wait seconds, align second
                        if (cost < 1000) {  // scan-overtime, not wait
                            try {
                                //preReadSuc: if the pre-read found due jobs, sleep for less than one second; otherwise sleep 5000 ms minus (0 to 999) ms
                                // pre-read period: success > scan each second; fail > skip this period;
                                TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                            } catch (InterruptedException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
    
                    }
    
                    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
                }
            });
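The sleep expressions in the thread above all have the form `base - System.currentTimeMillis() % 1000`. A minimal sketch of that arithmetic follows; the class and method names (`AlignedSleep`, `alignedSleepMillis`) are made up for illustration and do not exist in xxl-job. It shows that the result is never random: it always lies between `base - 999` and `base`, and the wake-up lands on a whole-second boundary.

```java
// Hypothetical helper, not part of xxl-job: isolates the sleep arithmetic.
final class AlignedSleep {

    /**
     * Milliseconds to sleep so that (nowMillis + result) is a multiple of 1000.
     * baseMillis is expected to be a multiple of 1000 (1000 or 5000 in the article).
     */
    static long alignedSleepMillis(long baseMillis, long nowMillis) {
        return baseMillis - nowMillis % 1000;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long sleep = alignedSleepMillis(5000, now);
        // The wake-up time always ends in 000 milliseconds.
        System.out.println("sleep " + sleep + " ms, wake at " + (now + sleep));
    }
}
```

With `base = 5000` the sleep is between 4001 and 5000 ms, which matches the "(5000-999)" remark in the comment.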
    

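    The time axis, relative to the current time (nowTime), looks like this:

    [Figure: image.png, a time axis marking segments A, B, C and D around nowTime]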
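The three branches of the scan loop can be condensed into a small classifier. This is only a sketch of the comparison logic: `TriggerWindow`, `Segment` and `classify` are illustrative names that do not appear in xxl-job, and `PRE_READ_MS` is assumed to be 5000, the "5 seconds" used throughout the comments.

```java
// Hypothetical classifier mirroring the if / else-if / else chain of scheduleThread.
final class TriggerWindow {

    static final long PRE_READ_MS = 5000; // assumed value: the 5 s pre-read window

    enum Segment {
        MISFIRE_SKIP, // segment A: more than 5 s late, skip and only refresh the next trigger time
        FIRE_NOW,     // segment B: late by at most 5 s, trigger immediately
        PUSH_TO_RING  // segment C: due now or within the pre-read window, hand over to the time ring
    }

    static Segment classify(long nowTime, long triggerNextTime) {
        if (nowTime > triggerNextTime + PRE_READ_MS) {
            return Segment.MISFIRE_SKIP;
        } else if (nowTime > triggerNextTime) {
            return Segment.FIRE_NOW;
        } else {
            return Segment.PUSH_TO_RING;
        }
    }

    public static void main(String[] args) {
        long now = 100_000L;
        System.out.println(classify(now, 94_000L));
        System.out.println(classify(now, 97_000L));
        System.out.println(classify(now, 103_000L));
    }
}
```

Segment D is not a separate branch: it is the case where a job fired in segment B has a new next-trigger time that again falls inside the pre-read window, so it is also pushed to the ring.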

    Next, the ring thread, which scans once per second:

    //On every second, this thread checks whether any job is due at that second and, if so, triggers it right away
            // ring thread
            ringThread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    // align second
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
    
                    while (!ringThreadToStop) {
    
                        try {
                            // second data
                            List<Integer> ringItemData = new ArrayList<>();
                            // Guard against slow processing that skips a tick: also check the previous tick and pick up anything left there
                            int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
                            for (int i = 0; i < 2; i++) {
                                List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                                if (tmpData != null) {
                                    ringItemData.addAll(tmpData);
                                }
                            }
    
                            // ring trigger
                            logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                            if (ringItemData!=null && ringItemData.size()>0) {
                                // do trigger
                                for (int jobId: ringItemData) {
                                    // do trigger
                                    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
                                }
                                // clear
                                ringItemData.clear();
                            }
                        } catch (Exception e) {
                            if (!ringThreadToStop) {
                                logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                            }
                        }
    
                        // next second, align second
                        try {
                            //Sleep for at most 1 second, until the next whole second
                            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
                        } catch (InterruptedException e) {
                            if (!ringThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
                }
            });
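The time ring shared by the two threads can be pictured as a map from a second-of-minute slot (0-59) to the job ids due in that slot. The sketch below is an illustration under assumptions: `TimeRing`, `push` and `drain` are made-up names, and the body of the real `pushTimeRing` is not shown in this article. It is meant for single-threaded experimentation and is not hardened for concurrent use.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical 60-slot time ring: slot index -> ids of the jobs due in that second.
final class TimeRing {

    private final Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    /** Same formula as in scheduleThread: second-of-minute of the trigger time. */
    static int ringSecond(long triggerTimeMillis) {
        return (int) ((triggerTimeMillis / 1000) % 60);
    }

    void push(int ringSecond, int jobId) {
        ringData.computeIfAbsent(ringSecond, k -> new ArrayList<>()).add(jobId);
    }

    /** Removes and returns the jobs of the current slot and of the slot before it. */
    List<Integer> drain(int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            // +60 keeps the index non-negative when nowSecond is 0
            List<Integer> slot = ringData.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }

    public static void main(String[] args) {
        TimeRing ring = new TimeRing();
        ring.push(ringSecond(125_000L), 42); // 125 s into the epoch falls into slot 5
        System.out.println(ring.drain(5));
    }
}
```

Draining two slots per beat is what lets the ring thread recover a job whose slot was skipped because the previous beat ran long.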
    

    Both scheduling threads above fire jobs through the trigger method:

    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null);
    

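    Next comes the trigger helper, JobTriggerPoolHelper.java. It defines a fast thread pool and a slow thread pool, which differ only in pool size and task queue capacity. slowTriggerPool runs jobs whose trigger call was slow (took longer than 500 ms) more than 10 times within one minute; all other jobs run on fastTriggerPool. The pools are defined as follows: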

    // fast/slow thread pool
        private ThreadPoolExecutor fastTriggerPool = new ThreadPoolExecutor(
                50,
                200,
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });
    
        private ThreadPoolExecutor slowTriggerPool = new ThreadPoolExecutor(
                10,
                100,
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
    

    Now the strategy for submitting jobs to the pools:

    /**
         * Submits a job to a thread pool and decides when a job is moved to the slow pool
         * add trigger
         */
        public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) {
    
            // choose thread pool
            ThreadPoolExecutor triggerPool_ = fastTriggerPool;
            AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
            //Use slowTriggerPool when the job was slow more than 10 times within one minute
            if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
                triggerPool_ = slowTriggerPool;
            }
    
            // trigger
            triggerPool_.execute(new Runnable() {
                @Override
                public void run() {
                    long start = System.currentTimeMillis();
                    try {
                        //The actual trigger
                        // do trigger
                        XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam);
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    } finally {
                        // check timeout-count-map
                        long minTim_now = System.currentTimeMillis()/60000;
                        //Clear jobTimeoutCountMap once a minute: a simple, practical way to count within a one-minute window
                        if (minTim != minTim_now) {
                            minTim = minTim_now;
                            jobTimeoutCountMap.clear();
                        }
    
                        //A trigger that takes longer than 500 ms counts as slow; enough of them within one minute move the job to the slow pool
                        // incr timeout-count-map
                        long cost = System.currentTimeMillis()-start;
                        if (cost > 500) {
                            //Took longer than 500 ms: increment this job's slow count in jobTimeoutCountMap
                            // job-timeout threshold 500ms
                            AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
                            if (timeoutCount != null) {
                                timeoutCount.incrementAndGet();
                            }
                        }
    
                    }
    
                }
            });
        }
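The slow-job bookkeeping in addTrigger can be isolated into a small class to make the thresholds easier to see. The sketch below is not the xxl-job code: `SlowJobTracker`, `record` and `isSlow` are illustrative names, the current time is passed in so the behaviour can be checked deterministically, and `computeIfAbsent` is used in place of the `putIfAbsent` idiom from the original.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical tracker: counts slow triggers per job within the current minute.
final class SlowJobTracker {

    private final ConcurrentMap<Integer, AtomicInteger> timeoutCount = new ConcurrentHashMap<>();
    private volatile long currentMinute = -1;

    /** True when the job was slow more than 10 times in the current minute. */
    boolean isSlow(int jobId) {
        AtomicInteger count = timeoutCount.get(jobId);
        return count != null && count.get() > 10;
    }

    /** Called after each trigger with its duration and the current wall-clock time. */
    void record(int jobId, long costMillis, long nowMillis) {
        long minute = nowMillis / 60000;
        if (minute != currentMinute) { // a new minute started: reset all counters
            currentMinute = minute;
            timeoutCount.clear();
        }
        if (costMillis > 500) {        // slow threshold: 500 ms
            timeoutCount.computeIfAbsent(jobId, k -> new AtomicInteger()).incrementAndGet();
        }
    }

    public static void main(String[] args) {
        SlowJobTracker tracker = new SlowJobTracker();
        for (int i = 0; i < 11; i++) {
            tracker.record(1, 600, 60_000L);
        }
        System.out.println(tracker.isSlow(1)); // 11 slow triggers in one minute
    }
}
```

Because the counters are wiped on the first trigger of each new minute, a job returns to the fast pool as soon as it stops being slow.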
    

    Next, the implementation of XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam):

    public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam) {
            // load data
            XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
            if (jobInfo == null) {
                logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
                return;
            }
            if (executorParam != null) {
                jobInfo.setExecutorParam(executorParam);
            }
            int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
            XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
    
            // sharding param
            int[] shardingParam = null;
            if (executorShardingParam!=null){
                String[] shardingArr = executorShardingParam.split("/");
                if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
                    shardingParam = new int[2];
                    shardingParam[0] = Integer.valueOf(shardingArr[0]);
                    shardingParam[1] = Integer.valueOf(shardingArr[1]);
                }
            }
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                    && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
                    && shardingParam==null) {
                for (int i = 0; i < group.getRegistryList().size(); i++) {
                    //With the sharding-broadcast route strategy, trigger every registered address once
                    processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
                }
            } else {
                if (shardingParam == null) {
                    shardingParam = new int[]{0, 1};
                }
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
            }
    
        }
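The sharding parameter handled above is a string of the form "index/total". A minimal sketch of that parsing follows, with one stated assumption: the article does not show `isNumeric`, so the version here (1 to 9 ASCII digits, which also keeps `Integer.parseInt` from overflowing) is a stand-in, and `ShardingParam` is an illustrative class name.

```java
// Hypothetical parser for the "index/total" sharding parameter.
final class ShardingParam {

    /** Returns {index, total}, or null when the input is missing or malformed. */
    static int[] parse(String executorShardingParam) {
        if (executorShardingParam == null) {
            return null;
        }
        String[] parts = executorShardingParam.split("/");
        if (parts.length == 2 && isNumeric(parts[0]) && isNumeric(parts[1])) {
            return new int[]{Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
        }
        return null;
    }

    /** Assumed behaviour: digits only, short enough to fit into an int. */
    static boolean isNumeric(String s) {
        return s != null && s.matches("\\d{1,9}");
    }

    /** Same fallback as in trigger(): a single shard, index 0 of 1. */
    static int[] orDefault(int[] shardingParam) {
        return shardingParam != null ? shardingParam : new int[]{0, 1};
    }

    public static void main(String[] args) {
        int[] p = orDefault(parse("2/5"));
        System.out.println(p[0] + " of " + p[1]);
    }
}
```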
    

    Then the processTrigger method:

    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
    
            // param
            //Block strategy (e.g. serial execution on one machine, discard later triggers, cover the earlier trigger)
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
            //Route strategy
            ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
            String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
    
            // 1、save log-id
            XxlJobLog jobLog = new XxlJobLog();
            jobLog.setJobGroup(jobInfo.getJobGroup());
            jobLog.setJobId(jobInfo.getId());
            jobLog.setTriggerTime(new Date());
            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
            logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
    
            // 2、init trigger-param
            TriggerParam triggerParam = new TriggerParam();
            triggerParam.setJobId(jobInfo.getId());
            triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
            triggerParam.setExecutorParams(jobInfo.getExecutorParam());
            triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
            triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
            triggerParam.setLogId(jobLog.getId());
            triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
            triggerParam.setGlueType(jobInfo.getGlueType());
            triggerParam.setGlueSource(jobInfo.getGlueSource());
            triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
            triggerParam.setBroadcastIndex(index);
            triggerParam.setBroadcastTotal(total);
    
            // 3、init address
            String address = null;
            ReturnT<String> routeAddressResult = null;
            if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
                if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                    if (index < group.getRegistryList().size()) {
                        address = group.getRegistryList().get(index);
                    } else {
                        address = group.getRegistryList().get(0);
                    }
                } else {
                    //Resolve the executor address through the strategy pattern (one router implementation per route strategy)
                    routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                    if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                        address = routeAddressResult.getContent();
                    }
                }
            } else {
                routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
            }
    
            // 4、trigger remote executor
            ReturnT<String> triggerResult = null;
            if (address != null) {
                //Run the job
                triggerResult = runExecutor(triggerParam, address);
            } else {
                triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
            }
            //Collect the log information and save the result
            // 5、collection trigger info
            StringBuffer triggerMsgSb = new StringBuffer();
            triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
                    .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
            if (shardingParam != null) {
                triggerMsgSb.append("("+shardingParam+")");
            }
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
            triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);
    
            triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
                    .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");
    
            // 6、save log trigger-info
            jobLog.setExecutorAddress(address);
            jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
            jobLog.setExecutorParam(jobInfo.getExecutorParam());
            jobLog.setExecutorShardingParam(shardingParam);
            jobLog.setExecutorFailRetryCount(finalFailRetryCount);
            //jobLog.setTriggerTime();
            jobLog.setTriggerCode(triggerResult.getCode());
            jobLog.setTriggerMsg(triggerMsgSb.toString());
            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);
    
            logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
        }
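The call `executorRouteStrategyEnum.getRouter().route(...)` in step 3 is a strategy pattern: each enum constant carries its own router object. The sketch below shows the shape of that pattern only. `Router`, `RouteStrategy` and the three strategies are simplified stand-ins written for this note; the real ExecutorRouteStrategyEnum has different signatures (it returns a ReturnT<String>) and its router classes are not shown in the article.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical router interface: picks one executor address from the registry list.
interface Router {
    String route(int jobId, List<String> addressList);
}

// Hypothetical strategy enum: every constant owns a Router implementation.
enum RouteStrategy {
    FIRST((jobId, list) -> list.get(0)),
    LAST((jobId, list) -> list.get(list.size() - 1)),
    ROUND(new Router() {
        private final AtomicInteger counter = new AtomicInteger();

        @Override
        public String route(int jobId, List<String> list) {
            // floorMod keeps the index valid even after the counter overflows
            return list.get(Math.floorMod(counter.getAndIncrement(), list.size()));
        }
    });

    private final Router router;

    RouteStrategy(Router router) {
        this.router = router;
    }

    Router getRouter() {
        return router;
    }

    /** Looks a strategy up by name and falls back to defaultItem, like match(...) above. */
    static RouteStrategy match(String name, RouteStrategy defaultItem) {
        if (name != null) {
            for (RouteStrategy item : values()) {
                if (item.name().equals(name)) {
                    return item;
                }
            }
        }
        return defaultItem;
    }
}
```

The caller only depends on the `Router` interface, so adding a strategy means adding one enum constant without touching processTrigger.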
    

    Next, the runExecutor(triggerParam, address) method:

    /**
         * run executor
         * @param triggerParam
         * @param address
         * @return
         */
        public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
            ReturnT<String> runResult = null;
            try {
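                //Get an ExecutorBiz proxy (created through XxlRpcReferenceBean.getObject()) that reaches the executor's service over netty
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                //Invoke the executor's executorBiz.run() through netty and return the result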
                runResult = executorBiz.run(triggerParam);
            } catch (Exception e) {
                logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
                runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
            }
    
            StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
            runResultSB.append("<br>address:").append(address);
            runResultSB.append("<br>code:").append(runResult.getCode());
            runResultSB.append("<br>msg:").append(runResult.getMsg());
    
            runResult.setMsg(runResultSB.toString());
            return runResult;
        }
    
    public static ExecutorBiz getExecutorBiz(String address) throws Exception {
            // valid
            if (address==null || address.trim().length()==0) {
                return null;
            }
    
            // load-cache
            address = address.trim();
            ExecutorBiz executorBiz = executorBizRepository.get(address);
            if (executorBiz != null) {
                return executorBiz;
            }
    
            // set-cache
            executorBiz = (ExecutorBiz) new XxlRpcReferenceBean(
                    NetEnum.NETTY_HTTP,
                    Serializer.SerializeEnum.HESSIAN.getSerializer(),
                    CallType.SYNC,
                    LoadBalance.ROUND,
                    ExecutorBiz.class,
                    null,
                    3000,
                    address,
                    XxlJobAdminConfig.getAdminConfig().getAccessToken(),
                    null,
                    null).getObject();
    
            executorBizRepository.put(address, executorBiz);
            return executorBiz;
        }
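getExecutorBiz combines two ideas: build a client-side proxy for an interface, and cache one proxy per executor address. The sketch below illustrates both with the JDK's `java.lang.reflect.Proxy`. It is a stand-in, not a description of XxlRpcReferenceBean, whose internals are not shown in this article: `EchoBiz` and `BizProxyCache` are invented names, and the handler returns a string locally where a real RPC client would serialize the call and send it to the address.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical service interface standing in for ExecutorBiz.
interface EchoBiz {
    String run(String param);
}

// Hypothetical per-address proxy cache, analogous to executorBizRepository.
final class BizProxyCache {

    private static final ConcurrentMap<String, EchoBiz> CACHE = new ConcurrentHashMap<>();

    static EchoBiz get(String address) {
        if (address == null || address.trim().isEmpty()) {
            return null;
        }
        // computeIfAbsent creates at most one proxy per trimmed address
        return CACHE.computeIfAbsent(address.trim(), BizProxyCache::createProxy);
    }

    private static EchoBiz createProxy(String address) {
        return (EchoBiz) Proxy.newProxyInstance(
                EchoBiz.class.getClassLoader(),
                new Class<?>[]{EchoBiz.class},
                (proxy, method, args) -> {
                    if (method.getDeclaringClass() == Object.class) {
                        // toString/hashCode/equals also arrive here and are answered locally
                        switch (method.getName()) {
                            case "toString": return "EchoBiz proxy for " + address;
                            case "hashCode": return System.identityHashCode(proxy);
                            default:         return proxy == args[0];
                        }
                    }
                    // A real client would perform the remote call at this point.
                    return method.getName() + "@" + address + ":" + args[0];
                });
    }

    public static void main(String[] args) {
        System.out.println(get("127.0.0.1:9999").run("hello")); // prints run@127.0.0.1:9999:hello
    }
}
```

Compared with the get-then-put sequence in getExecutorBiz, `computeIfAbsent` avoids creating two proxies when two threads ask for the same address at once; whether that matters for the original code is a design judgement, not something the article states.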
    

    Now look at part of the start-up configuration in the sample executor project xxl-job-executor-sample-springboot: XxlJobConfig -> XxlJobSpringExecutor.start() -> XxlJobExecutor -> initRpcProvider().
    The start() method in XxlJobExecutor is:

    public void start() throws Exception {
            //Set the log path
            // init logpath
            XxlJobFileAppender.initLogPath(logPath);
            //Set the admin addresses and the executor access token
            // init invoker, admin-client
            initAdminBizList(adminAddresses, accessToken);
            //Start the log file clean-up thread with the retention setting
            // init JobLogFileCleanThread
            JobLogFileCleanThread.getInstance().start(logRetentionDays);
            //Callback thread for job execution results (includes retry after a failed callback)
            // init TriggerCallbackThread
            TriggerCallbackThread.getInstance().start();
            // init executor-server
            //Resolve the executor ip and port
            port = port>0?port: NetUtil.findAvailablePort(9999);
            ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
            //Register the executor and initialise the netty server on the executor side
            initRpcProvider(ip, port, appName, accessToken);
        }
    

    Step into the initRpcProvider method:

    private void initRpcProvider(String ip, int port, String appName, String accessToken) throws Exception {
    
            // init, provider factory
            String address = IpUtil.getIpPort(ip, port);
            Map<String, String> serviceRegistryParam = new HashMap<String, String>();
            serviceRegistryParam.put("appName", appName);
            serviceRegistryParam.put("address", address);
    
            xxlRpcProviderFactory = new XxlRpcProviderFactory();
            //Use ExecutorServiceRegistry as the executor's registry class
            xxlRpcProviderFactory.initConfig(NetEnum.NETTY_HTTP, Serializer.SerializeEnum.HESSIAN.getSerializer(), ip, port, accessToken, ExecutorServiceRegistry.class, serviceRegistryParam);
    
            //Register the service on the executor: this is the entry point for running jobs
            // add services
            xxlRpcProviderFactory.addService(ExecutorBiz.class.getName(), null, new ExecutorBizImpl());
    
            //Start the provider factory
            // start
            xxlRpcProviderFactory.start();
    
        }
    

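    initRpcProvider does just two things. First, it registers the executor with the scheduler through ExecutorServiceRegistry.java (executor registration is covered in another article: https://www.jianshu.com/p/247c6cf53dca). Second, it starts a netty server on the executor and registers the ExecutorBiz service instance with it, so that the scheduler can call it when dispatching jobs.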

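    Summary

    I won't dwell on how easy to pick up and how impressive this framework is. Reading through the whole flow taught me quite a lot; the main takeaways are:
    1. The small sleep adjustment at node start-up, aimed at avoiding concurrent scheduling;
    2. Dropping the quartz framework and implementing the timer with just 6 tables;
    3. Scheduling with two threads that sleep at different intervals to achieve second-level triggering;
    4. The definition of slow jobs and moving them to a separate thread pool;
    5. Resolving the executor's route address through the strategy pattern;
    6. The in-house RPC framework built on netty.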


        Article link: https://www.haomeiwen.com/subject/itizictx.html