XXL-JOB Distributed Task Scheduling Platform

Author: Ravitn | Published: 2020-11-29 22:42

    Introduction

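    When scheduled tasks come up, the JDK's Timer and the scheduling framework Quartz come to mind; Quartz is usually used together with the Spring family of frameworks. Quartz can store jobs in two ways. The first is in memory, in which case scheduled jobs are lost when the application restarts. The second is in a database, which solves the job loss caused by restarts. Whether Quartz is used on its own or integrated with Spring, scheduled jobs cannot be modified once the application has started, and Quartz has limitations when it comes to distributed scheduling. Taobao's distributed scheduling framework TBSchedule addressed the distributed scheduling problem, but it suffers from a problem common to Alibaba's open-source projects: after being open-sourced it received no maintenance and no updates, and its documentation is rough. Today we look at a lightweight distributed task scheduling framework, XXL-JOB.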

    Original article: https://donaldhan.github.io/xxl_job/2020/09/07/XXL-Job-framework-design.html

    Usage

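    1. First add an executor; its machine list is registered automatically. If a domain name is configured, that address is registered; otherwise the machine's IP and port are registered. Executors support cluster deployment; in a cluster, all instances must use the same executor name (app_name);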

    2. Add a job (each job belongs to an executor).

    xxl-job architecture design

    [Figure: xxl-job architecture diagram, xxl-job-framwork.png]

    As the diagram shows, the job management center mainly consists of the following components:

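    • Executor registry monitor (JobRegistryMonitorHelper): the registry monitoring service; it writes the live servers of every application into the corresponding job group
    • Failed-job retry monitor (JobFailMonitorHelper): retries failed jobs and raises an alarm if required
    • Lost-job monitor (JobLosedMonitorHelper)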
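
    As a rough illustration of what the registry monitor does, the sketch below groups registry rows by app name and drops entries whose last heartbeat is too old. The `RegistryEntry` class, the `aliveAddresses` helper, and the 90-second timeout are assumptions made for this example; this is not the actual xxl-job implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of grouping live executor registrations by app name. */
final class RegistryMonitorSketch {

    /** One registry row: app name, executor address, last heartbeat (epoch ms). */
    static final class RegistryEntry {
        final String appName;
        final String address;
        final long updateTimeMs;

        RegistryEntry(String appName, String address, long updateTimeMs) {
            this.appName = appName;
            this.address = address;
            this.updateTimeMs = updateTimeMs;
        }
    }

    /** Assumed timeout: entries without a heartbeat for longer than this count as dead. */
    static final long DEAD_TIMEOUT_MS = 90_000L;

    /** Returns appName -> sorted list of addresses whose heartbeat is still fresh. */
    static Map<String, List<String>> aliveAddresses(List<RegistryEntry> entries, long nowMs) {
        Map<String, List<String>> result = new TreeMap<>();
        for (RegistryEntry e : entries) {
            if (nowMs - e.updateTimeMs > DEAD_TIMEOUT_MS) {
                continue; // stale registration, skip it
            }
            List<String> addresses = result.computeIfAbsent(e.appName, k -> new ArrayList<String>());
            if (!addresses.contains(e.address)) {
                addresses.add(e.address);
            }
        }
        for (List<String> addresses : result.values()) {
            Collections.sort(addresses);
        }
        return result;
    }

    public static void main(String[] args) {
        List<RegistryEntry> entries = new ArrayList<>();
        entries.add(new RegistryEntry("xxl-job-executor-sample", "10.0.0.1:9999", 100_000L));
        entries.add(new RegistryEntry("xxl-job-executor-sample", "10.0.0.2:9999", 99_000L));
        System.out.println(aliveAddresses(entries, 100_000L));
    }
}
```

    Instances that share the same app name end up in one address list, which is why clustered executors have to register under an identical app_name.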

    The executor client mainly consists of the following components:

    • Log cleanup
    • Job callback notification
    • Embedded HTTP server: provides REST operations such as heartbeat, idle heartbeat, run job, and kill job.

    All communication is over HTTP.

    Let's look at the data model behind the console.

    Console model

    #
    # XXL-JOB v2.2.0
    # Copyright (c) 2015-present, xuxueli.
    
    CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci;
    use `xxl_job`;
    
    SET NAMES utf8mb4;
    -- job
    CREATE TABLE `xxl_job_info` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `job_group` int(11) NOT NULL COMMENT 'executor primary key ID',
      `job_cron` varchar(128) NOT NULL COMMENT 'job CRON expression',
      `job_desc` varchar(255) NOT NULL,
      `add_time` datetime DEFAULT NULL,
      `update_time` datetime DEFAULT NULL,
      `author` varchar(64) DEFAULT NULL COMMENT 'author',
      `alarm_email` varchar(255) DEFAULT NULL COMMENT 'alarm email',
      `executor_route_strategy` varchar(50) DEFAULT NULL COMMENT 'executor route strategy',
      `executor_handler` varchar(255) DEFAULT NULL COMMENT 'executor job handler',
      `executor_param` varchar(512) DEFAULT NULL COMMENT 'executor job parameters',
      `executor_block_strategy` varchar(50) DEFAULT NULL COMMENT 'block handling strategy',
      `executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT 'job execution timeout, in seconds',
      `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT 'fail retry count',
      `glue_type` varchar(50) NOT NULL COMMENT 'GLUE type',
      `glue_source` mediumtext COMMENT 'GLUE source code',
      `glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE remark',
      `glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE update time',
      `child_jobid` varchar(255) DEFAULT NULL COMMENT 'child job IDs, comma-separated',
      `trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'trigger status: 0-stopped, 1-running',
      `trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT 'last trigger time',
      `trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT 'next trigger time',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    -- job execution log
    CREATE TABLE `xxl_job_log` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `job_group` int(11) NOT NULL COMMENT 'executor primary key ID',
      `job_id` int(11) NOT NULL COMMENT 'job primary key ID',
      `executor_address` varchar(255) DEFAULT NULL COMMENT 'executor address used for this execution',
      `executor_handler` varchar(255) DEFAULT NULL COMMENT 'executor job handler',
      `executor_param` varchar(512) DEFAULT NULL COMMENT 'executor job parameters',
      `executor_sharding_param` varchar(20) DEFAULT NULL COMMENT 'executor sharding parameter, in the form 1/2',
      `executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT 'fail retry count',
      `trigger_time` datetime DEFAULT NULL COMMENT 'trigger - time',
      `trigger_code` int(11) NOT NULL COMMENT 'trigger - result',
      `trigger_msg` text COMMENT 'trigger - log',
      `handle_time` datetime DEFAULT NULL COMMENT 'execution - time',
      `handle_code` int(11) NOT NULL COMMENT 'execution - status',
      `handle_msg` text COMMENT 'execution - log',
      `alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'alarm status: 0-default, -1=locked, 1-no alarm needed, 2-alarm succeeded, 3-alarm failed',
      PRIMARY KEY (`id`),
      KEY `I_trigger_time` (`trigger_time`),
      KEY `I_handle_code` (`handle_code`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    -- job execution report
    CREATE TABLE `xxl_job_log_report` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `trigger_day` datetime DEFAULT NULL COMMENT 'trigger - time',
      `running_count` int(11) NOT NULL DEFAULT '0' COMMENT 'running - log count',
      `suc_count` int(11) NOT NULL DEFAULT '0' COMMENT 'succeeded - log count',
      `fail_count` int(11) NOT NULL DEFAULT '0' COMMENT 'failed - log count',
      PRIMARY KEY (`id`),
      UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    
    CREATE TABLE `xxl_job_logglue` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `job_id` int(11) NOT NULL COMMENT 'job primary key ID',
      `glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE type',
      `glue_source` mediumtext COMMENT 'GLUE source code',
      `glue_remark` varchar(128) NOT NULL COMMENT 'GLUE remark',
      `add_time` datetime DEFAULT NULL,
      `update_time` datetime DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
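    -- job executor registry: when a client starts, it registers under the corresponding executor group; stores the group, the app name, and the executor server address (ip:port or domain name)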
    CREATE TABLE `xxl_job_registry` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `registry_group` varchar(50) NOT NULL,
      `registry_key` varchar(255) NOT NULL,
      `registry_value` varchar(255) NOT NULL,
      `update_time` datetime DEFAULT NULL,
      PRIMARY KEY (`id`),
      KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    -- job executor
    CREATE TABLE `xxl_job_group` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `app_name` varchar(64) NOT NULL COMMENT 'executor AppName',
      `title` varchar(12) NOT NULL COMMENT 'executor name',
      `address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT 'executor address type: 0=auto-registered, 1=entered manually',
      `address_list` varchar(512) DEFAULT NULL COMMENT 'executor address list, comma-separated',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    -- user table
    CREATE TABLE `xxl_job_user` (
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `username` varchar(50) NOT NULL COMMENT 'account',
      `password` varchar(50) NOT NULL COMMENT 'password',
      `role` tinyint(4) NOT NULL COMMENT 'role: 0-regular user, 1-administrator',
      `permission` varchar(255) DEFAULT NULL COMMENT 'permission: list of executor IDs, comma-separated',
      PRIMARY KEY (`id`),
      UNIQUE KEY `i_username` (`username`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    -- job lock table
    CREATE TABLE `xxl_job_lock` (
      `lock_name` varchar(50) NOT NULL COMMENT 'lock name',
      PRIMARY KEY (`lock_name`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
    
    
    INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL);
    INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_cron`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '0 0 0 * * ? *', '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', '');
    INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
    INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock');
    
    commit;
    
    
    

    As shown above, the main models are the job group (executor), the job registry, the job, the job log, the job execution report, and the job scheduling lock.

    Next, let's look at how a job is actually scheduled and executed.

    Job scheduling (admin console)

    Starting the job scheduler (JobScheduleHelper)
    //JobScheduleHelper

     /**
         *
         */
        public void start(){
    
            // schedule thread
            scheduleThread = new Thread(new Runnable() {
            ...
            });
            scheduleThread.setDaemon(true);
            scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
            scheduleThread.start();
    
            // ring thread: polls for jobs that must be triggered within the next 5s
            ringThread = new Thread(new Runnable() {
            ...
            });
            ringThread.setDaemon(true);
            ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
            ringThread.start();
        }
    

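    The job scheduler runs two threads. The schedule thread, scheduleThread, selects from the database the jobs that are due within the next 5 seconds; a job whose trigger time has already arrived is triggered right away by notifying the job executor to run it.
    A job whose trigger time has not yet arrived is put into the ringData map; the ringThread thread reads the matching jobs from ringData every second and notifies the job executor to run them.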

        /**
         * Jobs to be triggered in the current polling round; the key is the second (0-59), the value is the list of job IDs
         */
        private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
    

    //JobScheduleHelper

     /**
         *
         */
        public void start(){
    
            // schedule thread
            scheduleThread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    try {
                        // the schedule thread first sleeps about 5s, aligned to the whole second
                        TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                    } catch (InterruptedException e) {
                        
                        ...
                    }
                    // pre-read count, derived from the trigger thread pool sizes
                    // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                    int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
    
                    while (!scheduleThreadToStop) {
    
                        // Scan Job
                        long start = System.currentTimeMillis();
    
                        Connection conn = null;
                        Boolean connAutoCommit = null;
                        PreparedStatement preparedStatement = null;
    
                        boolean preReadSuc = true;
                        try {
    
                            conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                            connAutoCommit = conn.getAutoCommit();
                            conn.setAutoCommit(false);
                            // acquire the schedule lock
                            preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                            preparedStatement.execute();
    
                            // tx start
    
                            // 1、pre read
                            long nowTime = System.currentTimeMillis();
                            // query the jobs whose next trigger time falls within the given window, i.e. up to 5s ahead
                            List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                            if (scheduleList!=null && scheduleList.size()>0) {
                                // 2、push time-ring
                                for (XxlJobInfo jobInfo: scheduleList) {
    
                                    // time-ring jump
                                    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                        // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                        logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
    
                                        // fresh next: the job missed its trigger time, so refresh its next trigger time
                                        refreshNextValidTime(jobInfo, new Date());
    
                                    } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                        // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
    
                                        // 1、trigger the job
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
    
                                        // 2、fresh next: refresh the job's next trigger time
                                        refreshNextValidTime(jobInfo, new Date());
    
                                        // next-trigger-time in 5s, pre-read again: the next trigger is also due within 5s
                                        if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
    
                                            // 1、make ring second: the second-of-minute slot (0-59) of the next trigger time
                                            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    
                                            // 2、push time ring: put the job into the time ring
                                            pushTimeRing(ringSecond, jobInfo.getId());
    
                                            // 3、fresh next
                                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    
                                        }
    
                                    } else {
                                        // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
    
                                        // 1、make ring second
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
    
                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());
    
                                        // 3、fresh next
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
    
                                    }
    
                                }
    
                                // 3、update trigger info: persist each job's refreshed trigger info
                                for (XxlJobInfo jobInfo: scheduleList) {
    
                                    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                                }
    
                            } else {
                                preReadSuc = false;
                            }
    
                            // tx stop
    
    
                        } catch (Exception e) {
                          ...
                        } finally {
    
                            // commit
                            if (conn != null) {
                                try {
                                    conn.commit();
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                                try {
                                    conn.setAutoCommit(connAutoCommit);
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                                try {
                                    conn.close();
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                            }
    
                            // close PreparedStatement
                            if (null != preparedStatement) {
                                try {
                                    preparedStatement.close();
                                } catch (SQLException e) {
                                    if (!scheduleThreadToStop) {
                                        logger.error(e.getMessage(), e);
                                    }
                                }
                            }
                        }
                        long cost = System.currentTimeMillis()-start;
    
    
                        // Wait seconds, align second
                        if (cost < 1000) {  // scan-overtime, not wait
                            try {
                                // pre-read period: success > scan each second; fail > skip this period;
                                TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                            } catch (InterruptedException e) {
                                if (!scheduleThreadToStop) {
                                    logger.error(e.getMessage(), e);
                                }
                            }
                        }
    
                    }
    
                    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
                }
            });
            scheduleThread.setDaemon(true);
            scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
            scheduleThread.start();
            ...
    

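    As shown above, the schedule thread scheduleThread first acquires the schedule lock and then selects from the database the jobs due within the next 5 seconds; jobs whose trigger time has arrived are triggered by notifying the job executor to run them.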
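
    The three branches of the scan loop can be condensed into one small decision function. The sketch below is only an illustration: the `Decision` enum and the `decide` method are hypothetical names, and the comparisons mirror the `nowTime` checks in the listing above with a 5-second pre-read window.

```java
/** Hypothetical sketch of the three-way decision made for each pre-read job. */
final class ScheduleDecisionSketch {

    /** Pre-read window of 5 seconds, as described in the text. */
    static final long PRE_READ_MS = 5000L;

    enum Decision { MISFIRE, TRIGGER_NOW, PUSH_TO_RING }

    /** Classifies a job by comparing the current time with its next trigger time. */
    static Decision decide(long nowMs, long triggerNextTimeMs) {
        if (nowMs > triggerNextTimeMs + PRE_READ_MS) {
            return Decision.MISFIRE;       // more than 5s late: skip, only refresh the next trigger time
        } else if (nowMs > triggerNextTimeMs) {
            return Decision.TRIGGER_NOW;   // late by at most 5s: trigger immediately
        } else {
            return Decision.PUSH_TO_RING;  // not yet due: hand over to the time ring
        }
    }

    public static void main(String[] args) {
        long now = 10_000L;
        System.out.println(decide(now, 4_000L));
        System.out.println(decide(now, 6_000L));
        System.out.println(decide(now, 12_000L));
    }
}
```

    In all three cases the real code also refreshes the job's next trigger time; the sketch leaves that part out.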

    Next, let's look at how jobs that have not yet reached their trigger time are handled.

      /**
         * Push a job into the set that the ring thread will poll
         * @param ringSecond
         * @param jobId
         */
        private void pushTimeRing(int ringSecond, int jobId){
            // push async ring
            List<Integer> ringItemData = ringData.get(ringSecond);
            if (ringItemData == null) {
                ringItemData = new ArrayList<Integer>();
                ringData.put(ringSecond, ringItemData);
            }
            ringItemData.add(jobId);
    
            logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
        }
    

    The job is first put into the pending set ringData, where it waits for the ring thread to trigger it.
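
    To make the slot arithmetic concrete, here is a minimal, self-contained sketch of the push side of the time ring. The class and method names are hypothetical, and `computeIfAbsent` is used in place of the get/put pair in the listing above; the slot formula `(triggerTime / 1000) % 60` is the one shown in the schedule thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of pushing job IDs into a 60-slot time ring. */
final class TimeRingPushSketch {

    // key: second of the minute (0-59), value: IDs of the jobs due at that second
    private final Map<Integer, List<Integer>> ring = new ConcurrentHashMap<>();

    /** Maps an epoch-millisecond trigger time to its slot on the ring. */
    static int ringSecond(long triggerTimeMs) {
        return (int) ((triggerTimeMs / 1000) % 60);
    }

    /** Adds the job to the slot of its trigger time, creating the slot on first use. */
    void push(long triggerTimeMs, int jobId) {
        ring.computeIfAbsent(ringSecond(triggerTimeMs), k -> new ArrayList<Integer>()).add(jobId);
    }

    /** Returns a copy of the job IDs currently stored in the given slot. */
    List<Integer> slot(int second) {
        List<Integer> ids = ring.get(second);
        return ids == null ? new ArrayList<Integer>() : new ArrayList<Integer>(ids);
    }

    public static void main(String[] args) {
        TimeRingPushSketch sketch = new TimeRingPushSketch();
        sketch.push(125_000L, 7);
        System.out.println(sketch.slot(ringSecond(125_000L)));
    }
}
```

    Because only jobs due within the next 5 seconds are pushed, a 60-slot ring never has to hold two different minutes in the same slot at once.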

    Now for the ring thread.

     /**
         *
         */
        public void start(){
    
            ...
            // ring thread: polls for jobs that must be triggered within the next 5s
            ringThread = new Thread(new Runnable() {
                @Override
                public void run() {
    
                    // align second
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
    
                    while (!ringThreadToStop) {
    
                        try {
                            // second data
                            List<Integer> ringItemData = new ArrayList<>();
                            int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // guard against slow processing that skips a tick: also check one tick back
                            for (int i = 0; i < 2; i++) {
                                // take out the jobs due at this tick
                                List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                                if (tmpData != null) {
                                    ringItemData.addAll(tmpData);
                                }
                            }
    
                            // ring trigger
                            logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
                            if (ringItemData.size() > 0) {
                                // do trigger
                                for (int jobId: ringItemData) {
                                    // do trigger
                                    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                                }
                                // clear
                                ringItemData.clear();
                            }
                        } catch (Exception e) {
                            if (!ringThreadToStop) {
                                logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                            }
                        }
    
                        // next second, align second
                        try {
                            // sleep until the next whole second
                            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
                        } catch (InterruptedException e) {
                            if (!ringThreadToStop) {
                                logger.error(e.getMessage(), e);
                            }
                        }
                    }
                    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
                }
            });
            ringThread.setDaemon(true);
            ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
            ringThread.start();
        }
    
    

    As shown above, the ring thread reads the matching jobs from the ringData map every second and notifies the job executor to run them.
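
    The per-second drain can be isolated into a small function. This is a sketch under the assumption that the ring is a plain map from second to job IDs; `drain` is a hypothetical name, while the `(nowSecond + 60 - i) % 60` lookup of the current and the previous slot is taken from the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the ring thread's per-second drain step. */
final class TimeRingDrainSketch {

    /**
     * Removes and returns the jobs stored in the current slot and in the previous one.
     * Checking one slot back covers the case where the previous beat ran late and skipped a tick.
     */
    static List<Integer> drain(Map<Integer, List<Integer>> ring, int nowSecond) {
        List<Integer> due = new ArrayList<>();
        for (int i = 0; i < 2; i++) {
            List<Integer> slot = ring.remove((nowSecond + 60 - i) % 60);
            if (slot != null) {
                due.addAll(slot);
            }
        }
        return due;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> ring = new ConcurrentHashMap<>();
        ring.put(59, new ArrayList<>(List.of(1)));
        ring.put(0, new ArrayList<>(List.of(2)));
        System.out.println(drain(ring, 0));
    }
}
```

    Adding 60 before taking the remainder keeps the index non-negative when the current second is 0 and the previous slot is 59.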

    Now the key step: notifying the job executor to run the job.

    // do trigger
    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
    

    //JobTriggerPoolHelper

    /**
         * Trigger a job
         * @param jobId
         * @param triggerType
         * @param failRetryCount
         *          >=0: use this param
         *          <0: use param from job info config
         * @param executorShardingParam sharding parameter, in the form "index/total"
         * @param executorParam
         *          null: use job param
         *          not null: cover job param
         */
        public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
            helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
        }
          /**
         * add trigger
         * submits the job trigger to a trigger thread pool
         */
        public void addTrigger(final int jobId,
                               final TriggerTypeEnum triggerType,
                               final int failRetryCount,
                               final String executorShardingParam,
                               final String executorParam,
                               final String addressList) {
    
            // choose thread pool
            ThreadPoolExecutor triggerPool_ = fastTriggerPool;
            AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
            if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
                triggerPool_ = slowTriggerPool;
            }
    
            // trigger
            triggerPool_.execute(new Runnable() {
                @Override
                public void run() {
    
                    long start = System.currentTimeMillis();
    
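                    try {
                        // do trigger: call the remote executor server to run the job
                        XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
                    } catch (Exception e) {
                        // error handling is not shown in the original excerpt; logging here is an assumption
                        logger.error(e.getMessage(), e);
                    }
                    ...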
                }
            });
        }
    

    //XxlJobTrigger

     /* trigger job
         *
         * @param jobId
         * @param triggerType
         * @param failRetryCount
         *          >=0: use this param
         *          <0: use param from job info config
         * @param executorShardingParam
         * @param executorParam
         *          null: use job param
         *          not null: cover job param
         * @param addressList
         *          null: use executor addressList
         *          not null: cover
         */
        public static void trigger(int jobId,
                                   TriggerTypeEnum triggerType,
                                   int failRetryCount,
                                   String executorShardingParam,
                                   String executorParam,
                                   String addressList) {
    
            // load data: load the job info
            XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
            if (jobInfo == null) {
                logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
                return;
            }
            if (executorParam != null) {
                jobInfo.setExecutorParam(executorParam);
            }
            ...
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                    && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
                    && shardingParam==null) {
                // sharding-broadcast mode and no sharding parameter was passed in
                for (int i = 0; i < group.getRegistryList().size(); i++) {
                    processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
                }
            } else {
                if (shardingParam == null) {
                    shardingParam = new int[]{0, 1};
                }
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
            }
    
        }
    
          /**
         * Process a job trigger
         * @param group                     job group, registry list may be empty
         * @param jobInfo
         * @param finalFailRetryCount
         * @param triggerType
         * @param index                     sharding index
         * @param total                     sharding total
         */
        private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
            ...
            // 4、trigger remote executor
            ReturnT<String> triggerResult = null;
            if (address != null) {
                triggerResult = runExecutor(triggerParam, address);
            } else {
                triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
            }
            ...
        }
    
         /**
         * run executor
         * @param triggerParam
         * @param address
         * @return
         */
        public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
            ReturnT<String> runResult = null;
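            try {
                // get the executor client for this address
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                // ask the executor to run the job
                runResult = executorBiz.run(triggerParam);
            } catch (Exception e) {
                // failure handling is not shown in the original excerpt; returning a FAIL result is an assumption
                runResult = new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
            ...
            return runResult;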
        }
    

    //XxlJobScheduler

    /**
         * Get the executor client for the given address: return it from the cache if present, otherwise create one and cache it
         * @param address
         * @return
         * @throws Exception
         */
        public static ExecutorBiz getExecutorBiz(String address) throws Exception {
            // valid
            if (address==null || address.trim().length()==0) {
                return null;
            }
            // load-cache
            address = address.trim();
            ExecutorBiz executorBiz = executorBizRepository.get(address);
            if (executorBiz != null) {
                return executorBiz;
            }
            // set-cache
            executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());
            executorBizRepository.put(address, executorBiz);
            return executorBiz;
        }
    
    

    //ExecutorBizClient

     @Override
        public ReturnT<String> run(TriggerParam triggerParam) {
            return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
        }
    

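    As shown above, running a job amounts to choosing an executor server from the job group according to the route strategy and notifying that server to run the job. For sharding-broadcast jobs, every server in the group is notified.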
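
    The difference between routed and sharding-broadcast dispatch can be sketched without any xxl-job classes. In the example below, `plan` is a hypothetical helper that returns one line per trigger that would be sent, and picking the first address stands in for whichever route strategy is configured; neither is part of the real API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of routed versus sharding-broadcast dispatch. */
final class ShardingDispatchSketch {

    /** Returns one "address index/total" entry per trigger that would be sent. */
    static List<String> plan(boolean shardingBroadcast, List<String> registry) {
        List<String> triggers = new ArrayList<>();
        if (registry == null || registry.isEmpty()) {
            return triggers; // no executor registered, nothing to notify
        }
        if (shardingBroadcast) {
            // broadcast: every registered executor gets its own shard index
            for (int i = 0; i < registry.size(); i++) {
                triggers.add(registry.get(i) + " " + i + "/" + registry.size());
            }
        } else {
            // routed: a single executor is chosen; first address used as a stand-in strategy
            triggers.add(registry.get(0) + " 0/1");
        }
        return triggers;
    }

    public static void main(String[] args) {
        List<String> registry = Arrays.asList("10.0.0.1:9999", "10.0.0.2:9999", "10.0.0.3:9999");
        System.out.println(plan(true, registry));
        System.out.println(plan(false, registry));
    }
}
```

    Each executor can then use its index and the total to decide which slice of the data it should process.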

    Now let's look at the job executor.

    Executor

    On the client side, we usually enable a configuration like the following:

    @Configuration
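    public class XxlJobConfig {
        // logger used in xxlJobExecutor() below; this declaration is assumed, it is not shown in the original excerpt
        private static final Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);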
    
        @Value("${xxl.job.admin.addresses}")
        private String adminAddresses;
    
        @Value("${xxl.job.accessToken}")
        private String accessToken;
    
        @Value("${xxl.job.executor.appname}")
        private String appname;
    
        @Value("${xxl.job.executor.address}")
        private String address;
    
        @Value("${xxl.job.executor.ip}")
        private String ip;
    
        @Value("${xxl.job.executor.port}")
        private int port;
    
        @Value("${xxl.job.executor.logpath}")
        private String logPath;
    
        @Value("${xxl.job.executor.logretentiondays}")
        private int logRetentionDays;
    
    
        @Bean
        public XxlJobSpringExecutor xxlJobExecutor() {
            logger.info(">>>>>>>>>>> xxl-job config init.");
            XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
            xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
            xxlJobSpringExecutor.setAppname(appname);
            xxlJobSpringExecutor.setAddress(address);
            xxlJobSpringExecutor.setIp(ip);
            xxlJobSpringExecutor.setPort(port);
            xxlJobSpringExecutor.setAccessToken(accessToken);
            xxlJobSpringExecutor.setLogPath(logPath);
            xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
    
            return xxlJobSpringExecutor;
        }
        ...
    }
    

    Next, the initialization of XxlJobSpringExecutor.

    public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
        private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
    
    
        /**
         *
         */
        // start
        @Override
        public void afterSingletonsInstantiated() {
    
            // init JobHandler Repository (used by older versions)
            /*initJobHandlerRepository(applicationContext);*/
    
            // init JobHandler Repository (for method): initialize method-level jobs
            initJobHandlerMethodRepository(applicationContext);
    
            // refresh GlueFactory: refresh the GLUE factory type
            GlueFactory.refreshInstance(1);
    
            // super start
            try {
                super.start();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        ...
    }
    
    

    Initializing XxlJobSpringExecutor mainly registers the method-level jobs, that is, the methods annotated with @XxlJob, in the executor.
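
    The idea of scanning a bean for annotated handler methods can be shown with plain reflection. This is a simplified sketch, not the xxl-job code: `@DemoJob` is a hypothetical stand-in for `@XxlJob`, the handler returns a `String` rather than `ReturnT<String>`, and Spring's `MethodIntrospector` is replaced by `getDeclaredMethods`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of registering annotated methods as job handlers. */
final class HandlerScanSketch {

    /** Stand-in for the @XxlJob annotation; the value is the handler name. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface DemoJob {
        String value();
    }

    /** Example bean with one annotated handler and one ordinary method. */
    static final class SampleJobs {
        @DemoJob("demoJobHandler")
        public String demo(String param) {
            return "handled:" + param;
        }

        public String notAJob(String param) {
            return param;
        }
    }

    /** Scans the bean and returns handlerName -> method, validating each handler. */
    static Map<String, Method> scan(Object bean) {
        Map<String, Method> handlers = new TreeMap<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            DemoJob job = method.getAnnotation(DemoJob.class);
            if (job == null) {
                continue; // not a job handler
            }
            String name = job.value().trim();
            if (name.isEmpty()) {
                throw new IllegalStateException("handler name invalid for " + method.getName());
            }
            Class<?>[] params = method.getParameterTypes();
            if (params.length != 1 || params[0] != String.class) {
                throw new IllegalStateException("handler must take exactly one String parameter");
            }
            if (handlers.containsKey(name)) {
                throw new IllegalStateException("handler [" + name + "] naming conflict");
            }
            method.setAccessible(true);
            handlers.put(name, method);
        }
        return handlers;
    }

    /** Looks up a handler by name and invokes it with the given parameter. */
    static String run(Object bean, String name, String param) {
        try {
            return (String) scan(bean).get(name).invoke(bean, param);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(new SampleJobs(), "demoJobHandler", "hello"));
    }
}
```

    Validating names and signatures at startup means a misconfigured handler fails fast instead of failing at the first trigger.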

    Let's take a brief look.
    //XxlJobSpringExecutor

     /**
         *  Initialize method-level jobs
         * @param applicationContext
         */
        private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
            if (applicationContext == null) {
                return;
            }
            // init job handler from method
            String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
            for (String beanDefinitionName : beanDefinitionNames) {
                Object bean = applicationContext.getBean(beanDefinitionName);
                // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
                // scan the beans in the application context for methods annotated with @XxlJob
                Map<Method, XxlJob> annotatedMethods = null;
                try {
                    annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                            new MethodIntrospector.MetadataLookup<XxlJob>() {
                                @Override
                                public XxlJob inspect(Method method) {
                                    return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                                }
                            });
                } catch (Throwable ex) {
                    logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
                }
                if (annotatedMethods==null || annotatedMethods.isEmpty()) {
                    continue;
                }
    
                for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
                    Method method = methodXxlJobEntry.getKey();
                    XxlJob xxlJob = methodXxlJobEntry.getValue();
                    if (xxlJob == null) {
                        continue;
                    }
                    // job name
                    String name = xxlJob.value();
                    if (name.trim().length() == 0) {
                        throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                    }
                    // reject a job whose name is already registered
                    if (loadJobHandler(name) != null) {
                        throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
                    }
    
                    // execute method: it must take exactly one parameter, of type String
                    if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
                        throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                                "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                    }
                    // check the return type
                    if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
                        throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
                                "The correct method format like \" public ReturnT<String> execute(String param) \" .");
                    }
                    method.setAccessible(true);
    
                    // init and destroy: resolve the job's optional init and destroy methods
                    Method initMethod = null;
                    Method destroyMethod = null;
                    if (xxlJob.init().trim().length() > 0) {
                        try {
                            initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
                            initMethod.setAccessible(true);
                        } catch (NoSuchMethodException e) {
                            throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                        }
                    }
                    if (xxlJob.destroy().trim().length() > 0) {
                        try {
                            destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
                            destroyMethod.setAccessible(true);
                        } catch (NoSuchMethodException e) {
                            throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
                        }
                    }
    
                    // register the job handler
                    registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
                }
            }
    
        }
    // ---------------------- job handler repository ----------------------
        private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
    
        /**
         * Registers a job handler.
         * @param name
         * @param jobHandler
         * @return
         */
        public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
            logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
            return jobHandlerRepository.put(name, jobHandler);
        }
    
    

    As the code above shows, the executor's job registry is simply a ConcurrentMap<String, IJobHandler> keyed by job name.
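
    To make the registry idea concrete, here is a minimal sketch rather than the XXL-JOB source. `Handler` and `HandlerRegistry` are hypothetical names standing in for IJobHandler and the static repository. One deliberate difference: the sketch uses `putIfAbsent` so that the duplicate-name check and the insert happen atomically, whereas the listing above checks with loadJobHandler and then calls put.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for IJobHandler: takes a String parameter, returns a String result.
interface Handler {
    String execute(String param);
}

// Hypothetical registry illustrating the name -> handler map; not the real XxlJobExecutor.
class HandlerRegistry {
    private final ConcurrentMap<String, Handler> repository = new ConcurrentHashMap<>();

    // Registers a handler under a unique, non-blank name.
    void register(String name, Handler handler) {
        if (name == null || name.trim().isEmpty()) {
            throw new IllegalArgumentException("handler name must not be blank");
        }
        // putIfAbsent returns the previous value, or null when the name was free.
        if (repository.putIfAbsent(name, handler) != null) {
            throw new IllegalStateException("handler [" + name + "] naming conflicts");
        }
    }

    // Returns the handler registered under the name, or null when there is none.
    Handler load(String name) {
        return repository.get(name);
    }
}
```

    Registering the same name twice fails fast here, which matches the "naming conflicts" check in the scanning loop above.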

    Now a quick look at the method-level job handler:

    public class MethodJobHandler extends IJobHandler {
    
        /**
         * the bean that owns the job method
         */
        private final Object target;
        /**
         * the job method
         */
        private final Method method;
        /**
         * the job's init method
         */
        private Method initMethod;
        /**
         * the job's destroy method
         */
        private Method destroyMethod;
        ...
    }
    

    As shown above, MethodJobHandler wraps the target bean, the job method itself, and the job's optional init and destroy methods.
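
    The listing elides how those four fields are used. A plausible reading is that each call is forwarded to the bean through reflection. The sketch below illustrates that under stated assumptions: `ReflectiveHandler` and `DemoJob` are hypothetical names, and the real MethodJobHandler extends IJobHandler and returns ReturnT<String> rather than Object.

```java
import java.lang.reflect.Method;

// Hypothetical simplified wrapper around a bean and its job, init, and destroy methods.
class ReflectiveHandler {
    private final Object target;
    private final Method method;
    private final Method initMethod;    // null when the job declares no init method
    private final Method destroyMethod; // null when the job declares no destroy method

    private ReflectiveHandler(Object target, Method method, Method initMethod, Method destroyMethod) {
        this.target = target;
        this.method = method;
        this.initMethod = initMethod;
        this.destroyMethod = destroyMethod;
    }

    // Resolves the methods by name, as the scanning loop does with the annotation values.
    static ReflectiveHandler of(Object target, String methodName, String initName, String destroyName) {
        try {
            Class<?> type = target.getClass();
            Method method = type.getDeclaredMethod(methodName, String.class);
            method.setAccessible(true);
            return new ReflectiveHandler(target, method, lookup(type, initName), lookup(type, destroyName));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("method not found: " + e.getMessage(), e);
        }
    }

    // Returns null for a blank name, otherwise the accessible no-argument method.
    private static Method lookup(Class<?> type, String name) throws NoSuchMethodException {
        if (name == null || name.trim().isEmpty()) {
            return null;
        }
        Method m = type.getDeclaredMethod(name);
        m.setAccessible(true);
        return m;
    }

    Object execute(String param) { return invoke(method, param); }

    void init() { if (initMethod != null) invoke(initMethod); }

    void destroy() { if (destroyMethod != null) invoke(destroyMethod); }

    private Object invoke(Method m, Object... args) {
        try {
            return m.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
    }
}

// Hypothetical job bean used only to exercise the wrapper.
class DemoJob {
    int initCalls;

    String run(String param) { return "done:" + param; }

    void setUp() { initCalls++; }
}
```

    For example, `ReflectiveHandler.of(new DemoJob(), "run", "setUp", "")` yields a handler whose init() calls setUp() once and whose destroy() is a no-op.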

    Next, let's see how the executor actually starts:

    //XxlJobExecutor

    public class XxlJobExecutor  {
        /**
         * Starts the job executor.
         * @throws Exception
         */
        public void start() throws Exception {
    
            // init logpath
            XxlJobFileAppender.initLogPath(logPath);
    
            // init invoker, admin-client: create the clients used to call the admin console
            initAdminBizList(adminAddresses, accessToken);
    
    
            // init JobLogFileCleanThread: start the log cleanup thread
            JobLogFileCleanThread.getInstance().start(logRetentionDays);
    
            // init TriggerCallbackThread: start the trigger callback thread
            TriggerCallbackThread.getInstance().start();
    
            // init executor-server: start the embedded executor server
            initEmbedServer(address, ip, port, appname, accessToken);
        }
        ...
    }
    
    

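    As shown above, starting XxlJobExecutor sets up the log path and then does four things: it initializes the admin console clients, starts the log cleanup thread, starts the trigger callback thread (when a job finishes, its result is pushed onto a callback queue; the callback thread polls that queue and reports the result to the admin console), and starts an embedded Netty-based HTTP server that receives scheduling requests from the admin console.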
    //XxlJobExecutor

     private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
    
            // fill ip port
            port = port>0?port: NetUtil.findAvailablePort(9999);
            ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
    
            // generate address
            if (address==null || address.trim().length()==0) {
                String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
                address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
            }
    
            // start
            embedServer = new EmbedServer();
            embedServer.start(address, port, appname, accessToken);
        }
    

    //EmbedServer

    /**
     * Copy from : https://github.com/xuxueli/xxl-rpc
     * Netty-based embedded HTTP server
     * @author xuxueli 2020-04-11 21:25
     */
    public class EmbedServer {
        private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);
    
        private ExecutorBiz executorBiz;
        private Thread thread;
    
        /**
         * Starts the executor server.
         * @param address
         * @param port
         * @param appname
         * @param accessToken
         */
        public void start(final String address, final int port, final String appname, final String accessToken) {
            executorBiz = new ExecutorBizImpl();
            thread = new Thread(new Runnable() {
    
                @Override
                public void run() {
    
                    // param
                    EventLoopGroup bossGroup = new NioEventLoopGroup();
                    EventLoopGroup workerGroup = new NioEventLoopGroup();
                    ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                            0,
                            200,
                            60L,
                            TimeUnit.SECONDS,
                            new LinkedBlockingQueue<Runnable>(2000),
                            new ThreadFactory() {
                                @Override
                                public Thread newThread(Runnable r) {
                                    return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                                }
                            },
                            new RejectedExecutionHandler() {
                                @Override
                                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                                    throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                                }
                            });
    
    
                    try {
                        // start server: bootstrap the Netty HTTP server
                        ServerBootstrap bootstrap = new ServerBootstrap();
                        bootstrap.group(bossGroup, workerGroup)
                                .channel(NioServerSocketChannel.class)
                                .childHandler(new ChannelInitializer<SocketChannel>() {
                                    @Override
                                    public void initChannel(SocketChannel channel) throws Exception {
                                        channel.pipeline()
                                                .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                                                .addLast(new HttpServerCodec())
                                                .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL
                                                .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                                    }
                                })
                                .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                        // bind
                        ChannelFuture future = bootstrap.bind(port).sync();
    
                        logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
    
                        // start registry: launch the thread that registers this executor with the admin console
                        startRegistry(appname, address);
    
                        // wait util stop
                        future.channel().closeFuture().sync();
    
                    } catch (InterruptedException e) {
                        if (e instanceof InterruptedException) {
                            logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
                        } else {
                            logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
                        }
                    } finally {
                        // stop
                        try {
                            workerGroup.shutdownGracefully();
                            bossGroup.shutdownGracefully();
                        } catch (Exception e) {
                            logger.error(e.getMessage(), e);
                        }
                    }
    
                }
    
            });
            thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
            thread.start();
        }
    
    

    The key piece is EmbedHttpServerHandler:

    //EmbedHttpServerHandler

    /**
         * netty_http
         *
         * Copy from : https://github.com/xuxueli/xxl-rpc
         *
         * @author xuxueli 2015-11-24 22:25:15
         */
        public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
            private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);
    
            private ExecutorBiz executorBiz;
            private String accessToken;
            private ThreadPoolExecutor bizThreadPool;
            public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
                this.executorBiz = executorBiz;
                this.accessToken = accessToken;
                this.bizThreadPool = bizThreadPool;
            }
    
            @Override
            protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
    
                // request parse
                //final byte[] requestBytes = ByteBufUtil.getBytes(msg.content());    // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
                String requestData = msg.content().toString(CharsetUtil.UTF_8);
                String uri = msg.uri();
                HttpMethod httpMethod = msg.method();
                boolean keepAlive = HttpUtil.isKeepAlive(msg);
                String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
    
                // invoke
                bizThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        // do invoke: handle the HTTP request
                        Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
    
                        // to json
                        String responseJson = GsonTool.toJson(responseObj);
    
                        // write response
                        writeResponse(ctx, keepAlive, responseJson);
                    }
                });
            }
    
            /**
             * Handles an HTTP request from the admin console.
             * @param httpMethod
             * @param uri
             * @param requestData
             * @param accessTokenReq
             * @return
             */
            private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    
                // valid
                if (HttpMethod.POST != httpMethod) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
                }
                if (uri==null || uri.trim().length()==0) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
                }
                if (accessToken!=null
                        && accessToken.trim().length()>0
                        && !accessToken.equals(accessTokenReq)) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
                }
    
                // services mapping
                try {
                    if ("/beat".equals(uri)) {
                        // heartbeat
                        return executorBiz.beat();
                    } else if ("/idleBeat".equals(uri)) {
                        // idle heartbeat
                        IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                        return executorBiz.idleBeat(idleBeatParam);
                    } else if ("/run".equals(uri)) {
                        // run job
                        TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
                        return executorBiz.run(triggerParam);
                    } else if ("/kill".equals(uri)) {
                        // kill job
                        KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
                        return executorBiz.kill(killParam);
                    } else if ("/log".equals(uri)) {
                        // query job log
                        LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
                        return executorBiz.log(logParam);
                    } else {
                        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
                    }
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
                }
            }
            ...
        }
    

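    As shown above, the embedded HTTP server exposes heartbeat, idle heartbeat, run job, kill job, and log query operations. Each one is a POST endpoint dispatched by URI and guarded by an optional access token.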
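
    From the caller's side, every operation is therefore a POST to a fixed path with a JSON body and an optional token header. The sketch below only builds such a request with the JDK 11+ `java.net.http` API; it is not how the admin console is implemented. `ExecutorRequests` is a hypothetical helper, and the header name "XXL-JOB-ACCESS-TOKEN" is an assumption about the value of XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN, which the handler reads but whose value is not shown in this article.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that builds requests for the embedded executor server.
class ExecutorRequests {
    // Assumed header name; the handler reads it via XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN.
    static final String ACCESS_TOKEN_HEADER = "XXL-JOB-ACCESS-TOKEN";

    // Builds a POST request such as post("http://127.0.0.1:9999", "/beat", token, "").
    static HttpRequest post(String baseUrl, String path, String accessToken, String jsonBody) {
        HttpRequest.Builder builder = HttpRequest.newBuilder(URI.create(baseUrl + path))
                .header("Content-Type", "application/json;charset=UTF-8")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody));
        // The handler only compares tokens when the executor has one configured.
        if (accessToken != null && !accessToken.trim().isEmpty()) {
            builder.header(ACCESS_TOKEN_HEADER, accessToken);
        }
        return builder.build();
    }
}
```

    The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. Any method other than POST is rejected by the first check in process().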

    Now let's look at ExecutorBizImpl, which actually runs the job:
    //ExecutorBizImpl

     public ReturnT<String> run(TriggerParam triggerParam) {
            // load old: jobHandler + jobThread; look up the job's existing thread in the executor
            JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
            IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
            String removeOldReason = null;
    
            // valid: jobHandler + jobThread; validate the job thread and the job handler
            GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
            if (GlueTypeEnum.BEAN == glueTypeEnum) {
    
                // new jobhandler: look up the registered handler by name
                IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
    
                // valid old jobThread
                if (jobThread!=null && jobHandler != newJobHandler) {
                    // change handler, need kill old thread
                    removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
    
                    jobThread = null;
                    jobHandler = null;
                }
    
                // valid handler
                if (jobHandler == null) {
                    jobHandler = newJobHandler;
                    if (jobHandler == null) {
                        return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                    }
                }
    
            } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
    
                // valid old jobThread
                if (jobThread != null &&
                        !(jobThread.getHandler() instanceof GlueJobHandler
                            && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                    // change handler or gluesource updated, need kill old thread
                    removeOldReason = "change job source or glue type, and terminate the old job thread.";
    
                    jobThread = null;
                    jobHandler = null;
                }
    
                // valid handler
                if (jobHandler == null) {
                    try {
                        IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                        jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                        return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                    }
                }
            } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
    
                // valid old jobThread
                if (jobThread != null &&
                        !(jobThread.getHandler() instanceof ScriptJobHandler
                                && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                    // change script or gluesource updated, need kill old thread
                    removeOldReason = "change job source or glue type, and terminate the old job thread.";
    
                    jobThread = null;
                    jobHandler = null;
                }
    
                // valid handler
                if (jobHandler == null) {
                    jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
                }
            } else {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
            }
    
            // executor block strategy
            if (jobThread != null) {
                ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
                if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                    // discard when running
                    if (jobThread.isRunningOrHasQueue()) {
                        return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                    }
                } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                    // kill running jobThread
                    if (jobThread.isRunningOrHasQueue()) {
                        removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
    
                        jobThread = null;
                    }
                } else {
                    // just queue trigger
                }
            }
    
            // replace thread (new or exists invalid)
            if (jobThread == null) {
                // register a new job thread with the executor (this also starts it)
                jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
            }
    
            // push data to queue: enqueue the trigger parameters on the job thread's trigger queue
            ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
            return pushResult;
        }
    

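    As shown above, running a job first looks up the JobThread bound to that job ID (registering a new one if none exists or the old one must be replaced), applies the blocking strategy, and then pushes the trigger parameters onto that thread's trigger queue.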
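
    The blocking-strategy branch in run() boils down to a small decision table. The sketch below restates it in isolation; `BlockStrategy`, `Decision`, and `BlockStrategyDemo` are hypothetical names, with the enum constants mirroring ExecutorBlockStrategyEnum and "busy" standing for jobThread.isRunningOrHasQueue().

```java
// Hypothetical mirror of ExecutorBlockStrategyEnum.
enum BlockStrategy { SERIAL_EXECUTION, DISCARD_LATER, COVER_EARLY }

// Hypothetical outcome of applying a strategy to an incoming trigger.
enum Decision { ENQUEUE, REJECT, REPLACE_THREAD_THEN_ENQUEUE }

class BlockStrategyDemo {
    // busy == true means the job thread is running a job or still has queued triggers.
    static Decision decide(BlockStrategy strategy, boolean busy) {
        if (busy && strategy == BlockStrategy.DISCARD_LATER) {
            // the new trigger is dropped and reported as failed
            return Decision.REJECT;
        }
        if (busy && strategy == BlockStrategy.COVER_EARLY) {
            // the old thread is stopped and a fresh one takes the new trigger
            return Decision.REPLACE_THREAD_THEN_ENQUEUE;
        }
        // serial execution, or an idle thread under any strategy: just queue the trigger
        return Decision.ENQUEUE;
    }
}
```

    Note that an idle thread accepts the trigger under every strategy; the strategies only differ when the thread is busy.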

    Next, the trigger parameters, TriggerParam:

    public class TriggerParam implements Serializable{
        private static final long serialVersionUID = 42L;
    
        private int jobId;
    
        /**
         * executor handler name
         */
        private String executorHandler;
        /**
         * execution parameters
         */
        private String executorParams;
        private String executorBlockStrategy;
        /**
         * execution timeout, in seconds
         */
        private int executorTimeout;
    
        /**
         * job log id
         */
        private long logId;
        private long logDateTime;
    
        private String glueType;
        private String glueSource;
        private long glueUpdatetime;
    
        /**
         * shard index for sharded broadcast jobs
         */
        private int broadcastIndex;
        /**
         * total number of shards for sharded broadcast jobs
         */
        private int broadcastTotal;
    ...
    }
    

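    As shown above, TriggerParam mainly carries the executor handler name and parameters, the blocking strategy and timeout, the log ID, the glue type and source, and the broadcast index and total used by sharded broadcast jobs.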
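
    The broadcast index and total only become useful once a job uses them to split its work. One common convention, shown here as an illustration and not something XXL-JOB enforces, is for each executor to process the records whose ID modulo the total equals its own index. `ShardingDemo` and its method are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example of partitioning work by shard index and shard total.
class ShardingDemo {
    // Returns the ids this shard is responsible for: those with id mod total == index.
    static List<Integer> itemsForShard(List<Integer> ids, int index, int total) {
        if (total <= 0 || index < 0 || index >= total) {
            throw new IllegalArgumentException("invalid shard " + index + "/" + total);
        }
        List<Integer> mine = new ArrayList<>();
        for (int id : ids) {
            // floorMod keeps the result non-negative even for negative ids
            if (Math.floorMod(id, total) == index) {
                mine.add(id);
            }
        }
        return mine;
    }
}
```

    With three executors and IDs 1 through 6, shard 0 handles 3 and 6, shard 1 handles 1 and 4, and shard 2 handles 2 and 5, so every record is processed exactly once.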

    Now the job thread:

    public class JobThread extends Thread{
        private static Logger logger = LoggerFactory.getLogger(JobThread.class);
    
        private int jobId;
        /**
         * job handler
         */
        private IJobHandler handler;
        /**
         * trigger queue
         */
        private LinkedBlockingQueue<TriggerParam> triggerQueue;
        private Set<Long> triggerLogIdSet;      // avoid repeat trigger for the same TRIGGER_LOG_ID
    
        private volatile boolean toStop = false;
        private String stopReason;
    
        private boolean running = false;    // if running job
        private int idleTimes = 0;          // idel times
    
    
        public JobThread(int jobId, IJobHandler handler) {
            this.jobId = jobId;
            this.handler = handler;
            this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
            this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());
        }
    
        /**
         * new trigger to queue
         *
         * @param triggerParam
         * @return
         */
        public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
            // avoid repeat
            if (triggerLogIdSet.contains(triggerParam.getLogId())) {
                logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
                return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
            }
    
            triggerLogIdSet.add(triggerParam.getLogId());
            triggerQueue.add(triggerParam);
            return ReturnT.SUCCESS;
        }
        ...
    }
    

    Where does the job thread get started? Back to XxlJobExecutor:

    //XxlJobExecutor

        /**
         * Registers a job thread with the executor and starts it.
         * @param jobId
         * @param handler
         * @param removeOldReason
         * @return
         */
        public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
            JobThread newJobThread = new JobThread(jobId, handler);
            newJobThread.start();
            logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
    
            JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);  // putIfAbsent | oh my god, map's put method return the old value!!!
            if (oldJobThread != null) {
                oldJobThread.toStop(removeOldReason);
                oldJobThread.interrupt();
            }
    
            return newJobThread;
        }
    

    Now the job thread's run loop:

    //JobThread

     @Override
        public void run() {
    
            // init
            try {
                handler.init();
            } catch (Throwable e) {
                logger.error(e.getMessage(), e);
            }
    
            // execute
            while(!toStop){
                running = false;
                idleTimes++;
    
                TriggerParam triggerParam = null;
                ReturnT<String> executeResult = null;
                try {
                    // to check the toStop signal we need to loop, so we cannot use queue.take(); use poll(timeout) instead
                    // pull the next trigger parameters from this job thread's trigger queue
                    triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                    if (triggerParam!=null) {
                        running = true;
                        idleTimes = 0;
                        // remove the corresponding log id from the log id set
                        triggerLogIdSet.remove(triggerParam.getLogId());
    
                        // log filename, like "logPath/yyyy-MM-dd/9999.log"
                        String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
                        XxlJobFileAppender.contextHolder.set(logFileName);
                        ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
    
                        // execute
                        XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
    
                        if (triggerParam.getExecutorTimeout() > 0) {
                            // limit timeout: a timeout is configured, so run the handler on a separate thread and wait for it with a bound
                            Thread futureThread = null;
                            try {
                                final TriggerParam triggerParamTmp = triggerParam;
                                FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
                                    @Override
                                    public ReturnT<String> call() throws Exception {
                                        return handler.execute(triggerParamTmp.getExecutorParams());
                                    }
                                });
                                futureThread = new Thread(futureTask);
                                futureThread.start();
                                // wait for the result, up to the timeout
                                executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
                            } catch (TimeoutException e) {
    
                                XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
                                XxlJobLogger.log(e);
    
                                executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
                            } finally {
                                futureThread.interrupt();
                            }
                        } else {
                            // just execute: no timeout is configured, so run the job directly on this thread
                            executeResult = handler.execute(triggerParam.getExecutorParams());
                        }
    
                        if (executeResult == null) {
                            executeResult = IJobHandler.FAIL;
                        } else {
                            executeResult.setMsg(
                                    (executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
                                            ?executeResult.getMsg().substring(0, 50000).concat("...")
                                            :executeResult.getMsg());
                            executeResult.setContent(null); // limit obj size
                        }
                        XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
    
                    } else {
                        // the trigger queue has stayed empty for more than 30 consecutive polls (3s each): remove this job thread from the executor
                        if (idleTimes > 30) {
                            if(triggerQueue.size() == 0) {  // avoid concurrent trigger causes jobId-lost
                                XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                            }
                        }
                    }
                } catch (Throwable e) {
                    if (toStop) {
                        XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
                    }
    
                    StringWriter stringWriter = new StringWriter();
                    e.printStackTrace(new PrintWriter(stringWriter));
                    String errorMsg = stringWriter.toString();
                    executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
    
                    XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
                } finally {
                    if(triggerParam != null) {
                        // callback handler info
                        if (!toStop) {
                            // common case: push the execution result to the callback thread
                            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
                        } else {
                            // is killed
                            ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
                            TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
                        }
                    }
                }
            }
    
            // callback trigger request in queue: the thread is stopping, so report every still-queued trigger as killed
            while(triggerQueue !=null && triggerQueue.size()>0){
                TriggerParam triggerParam = triggerQueue.poll();
                if (triggerParam!=null) {
                    // is killed
                    ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
                    TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
                }
            }
    
            // destroy
            try {
                handler.destroy();
            } catch (Throwable e) {
                logger.error(e.getMessage(), e);
            }
    
            logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
        }
    

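    As shown above, JobThread keeps polling its trigger queue for trigger parameters. If no timeout is configured, it executes the handler directly; otherwise it wraps the call in a FutureTask, runs it on a separate thread, and waits for the result up to the timeout. A thread that stays idle for more than 30 consecutive polls removes itself from the executor. That completes the analysis of job scheduling on the admin console side and job execution on the executor side; let's summarize.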
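
    The timeout branch is worth isolating, since it is the only place where the job leaves the job thread. The following is a minimal sketch of the same FutureTask pattern, not the XXL-JOB code: `TimeoutRunner` is a hypothetical name, the result is a plain String instead of ReturnT<String>, and the timeout is in milliseconds here while the listing above uses seconds.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper showing the "run on a side thread, wait with a bound" pattern.
class TimeoutRunner {
    static String runWithTimeout(Callable<String> job, long timeoutMillis) {
        FutureTask<String> task = new FutureTask<>(job);
        Thread worker = new Thread(task, "job-worker");
        worker.setDaemon(true);
        worker.start();
        try {
            // blocks until the job finishes or the timeout elapses
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "TIMEOUT";
        } catch (InterruptedException e) {
            // restore the interrupt flag for the caller
            Thread.currentThread().interrupt();
            return "INTERRUPTED";
        } catch (ExecutionException e) {
            return "FAILED: " + e.getCause();
        } finally {
            // as in the listing above: interrupt the worker whether or not it finished
            worker.interrupt();
        }
    }
}
```

    Interrupting only helps if the job cooperates, for example by blocking in a call that throws InterruptedException; a job that spins without checking its interrupt flag keeps running after the timeout has been reported.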

    Summary

    Job model

    The main models are the job group, job registry, job, job log, job execution report, and job scheduling lock.

    Job scheduling process

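    The job scheduler has two main threads. The schedule thread, scheduleThread, selects from the database the jobs due within the next 5 seconds; a job whose trigger time has already arrived is triggered right away by notifying the job executor.
    A job that is not yet due is placed in the ringData map; ringThread wakes up every second, takes the jobs for the current second out of ringData, and notifies the job executor to run them.
    Triggering a job means picking an executor server from the job group according to the routing strategy and notifying it to run the job. For a sharded broadcast job, every executor server in the group is notified.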
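
    The ringData map can be pictured as a one-minute time wheel keyed by second. The sketch below is an illustration under stated assumptions, not the scheduler's code: `TimeRing` is a hypothetical name, the 60 slots are an assumption, and so is the choice to drain the previous second's slot as well, so that a late tick does not skip jobs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical time wheel: slot (0..59) -> ids of the jobs due in that second.
class TimeRing {
    private final Map<Integer, List<Integer>> ringData = new HashMap<>();

    // Called by the scheduling side for a job that is due soon but not yet.
    synchronized void push(long triggerTimeMillis, int jobId) {
        int ringSecond = (int) ((triggerTimeMillis / 1000) % 60);
        ringData.computeIfAbsent(ringSecond, k -> new ArrayList<>()).add(jobId);
    }

    // Called once per second by the ring side; removes and returns the due jobs.
    synchronized List<Integer> drain(int currentSecond) {
        List<Integer> due = new ArrayList<>();
        // check the current slot and the previous one, in case the last tick ran late
        for (int i = 0; i < 2; i++) {
            List<Integer> bucket = ringData.remove((currentSecond + 60 - i) % 60);
            if (bucket != null) {
                due.addAll(bucket);
            }
        }
        return due;
    }
}
```

    Because the scheduling side only looks a few seconds ahead, a 60-slot wheel never holds two different minutes in the same slot at once, which is what makes the modulo key safe in this sketch.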

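    Initializing XxlJobSpringExecutor mainly registers the methods annotated with @XxlJob as method-level jobs in the executor. The executor's job registry is a ConcurrentMap<String, IJobHandler>, and MethodJobHandler wraps the target bean, the job method, and the job's optional init and destroy methods.

    Starting XxlJobExecutor initializes the admin console clients, starts the log cleanup thread, starts the trigger callback thread (when a job finishes, its result is pushed onto a callback queue; the callback thread polls that queue and reports the result to the admin console), and starts an embedded Netty-based HTTP server that receives scheduling requests from the admin console.

    The embedded HTTP server exposes heartbeat, idle heartbeat, run job, kill job, and log query operations.

    Running a job first looks up the JobThread bound to that job ID (registering a new one if needed) and then pushes the trigger parameters onto that thread's trigger queue.

    TriggerParam mainly carries the execution parameters, the blocking strategy, the glue type, and the broadcast index and total for sharded broadcast jobs.

    JobThread keeps polling its trigger queue for trigger parameters. If no timeout is configured, it executes the handler directly; otherwise it creates a FutureTask and waits for the result up to the timeout.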

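    Strengths and weaknesses of XXL-JOB

    Let's look at where XXL-JOB does well and where it falls short.

    Strengths

    • It is built on HTTP, so it works across platforms.

    Weaknesses

    • All jobs are scheduled centrally by the console, and because triggering goes over HTTP, there can be a delay between the scheduled time and the actual start.
    • Each job's thread is a single thread polling its own queue.
    • Logs are split by hour, which makes troubleshooting harder.

    Because XXL-JOB is lightweight, we should not ask too much of it; for applications of modest scale it is entirely sufficient.

    Is there room for improvement? Yes. The console could be limited to managing jobs, tracking executor servers, collecting logs, and aggregating execution statistics. Each client would use Quartz to schedule the jobs it pulls from the console. When a job fires, the client would filter the executor machine list by the current execution strategy to decide which machine runs it. Where the strategy ultimately selects a single machine out of several, a distributed lock can enforce the choice.
    By granularity, the locks fall into a job mutual-exclusion lock and a job scheduling lock (generated from the timestamp of each scheduling round); whichever machine acquires the lock runs the job.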
