引言
当我们提起定时任务时,会想起JDK的[Timer][]和定时调度框架[Quartz][],Quartz一般与Spring家族框架结合使用。Quartz的任务存储有两种形式:一种是存储在内存中,但是当应用重启时,定时任务将会丢失;另一种是数据库存储,解决了应用重启引起的任务丢失问题。无论是Quartz单独使用,还是与Spring集成,应用一旦启动,我们将无法修改定时任务,同时Quartz对分布式的定时任务调度存在局限性。淘宝分布式定时任务调度框架[TBschedule][]的出现解决了分布式调度的问题,然而TBschedule染上了阿里开源的通病:开源后无维护、无更新、文档粗糙。今天我们来看一个轻量级的分布式任务调度框架XXL-JOB。
原文地址:https://donaldhan.github.io/xxl_job/2020/09/07/XXL-Job-framework-design.html
目录
usage
-
先添加执行器,机器列表会自动注册:如果配置了域名则直接使用域名,否则注册相应机器的IP+PORT。执行器支持集群部署,集群部署时要保证执行器的名称(app_name)一致;
-
添加任务(隶属于执行器)。
xxl-job架构设计
xxl-job-framwork.png
从上图可以看出,job管理中心主要包括以下组件:
- 执行器注册监控(JobRegistryMonitorHelper):注册监控服务,将所有应用的存活服务器,写到相应的任务分组下
- 任务失败重试监控(JobFailMonitorHelper):失败重试,如果需要则告警
- 丢失任务监控器(JobLosedMonitorHelper)
执行器客户端主要包括以下组件:
- 日志清理
- 任务回调通知
- 嵌入式HTTPserver:主要提供了心跳,空闲心跳,执行job,killjob,等REST操作。
所有的通信都是http。
来看一下控制台的相关模型
控制台模型
#
# XXL-JOB v2.2.0
# Copyright (c) 2015-present, xuxueli.
# Schema bootstrap script for the XXL-JOB admin console (MySQL syntax).
-- Create the schema only when it does not exist yet, defaulting to utf8mb4 / utf8mb4_unicode_ci.
CREATE database if NOT EXISTS `xxl_job` default character set utf8mb4 collate utf8mb4_unicode_ci;
-- Make `xxl_job` the default schema for every statement that follows.
use `xxl_job`;
-- Use utf8mb4 on this client connection; the column comments and seed rows below contain non-ASCII text.
SET NAMES utf8mb4;
-- Job definition table: holds each job's cron expression, executor routing/blocking settings,
-- handler or GLUE source, and the scheduler's trigger state.
-- job_group holds the primary-key id of the executor the job belongs to (see its column comment).
-- trigger_last_time / trigger_next_time are BIGINT: the scheduler code compares trigger_next_time
-- with System.currentTimeMillis(), i.e. they hold epoch milliseconds.
-- child_jobid is a comma-separated list of job ids (see its column comment).
CREATE TABLE `xxl_job_info` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '执行器主键ID',
`job_cron` varchar(128) NOT NULL COMMENT '任务执行CRON',
`job_desc` varchar(255) NOT NULL,
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`author` varchar(64) DEFAULT NULL COMMENT '作者',
`alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件',
`executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
`executor_block_strategy` varchar(50) DEFAULT NULL COMMENT '阻塞处理策略',
`executor_timeout` int(11) NOT NULL DEFAULT '0' COMMENT '任务执行超时时间,单位秒',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
`glue_type` varchar(50) NOT NULL COMMENT 'GLUE类型',
`glue_source` mediumtext COMMENT 'GLUE源代码',
`glue_remark` varchar(128) DEFAULT NULL COMMENT 'GLUE备注',
`glue_updatetime` datetime DEFAULT NULL COMMENT 'GLUE更新时间',
`child_jobid` varchar(255) DEFAULT NULL COMMENT '子任务ID,多个逗号分隔',
`trigger_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '调度状态:0-停止,1-运行',
`trigger_last_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '上次调度时间',
`trigger_next_time` bigint(13) NOT NULL DEFAULT '0' COMMENT '下次调度时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Job execution log: records, for a job run, the trigger (scheduling) time/result/message,
-- the handle (execution) time/status/message, and the alarm status.
-- Secondary indexes exist on trigger_time and on handle_code.
CREATE TABLE `xxl_job_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`job_group` int(11) NOT NULL COMMENT '执行器主键ID',
`job_id` int(11) NOT NULL COMMENT '任务,主键ID',
`executor_address` varchar(255) DEFAULT NULL COMMENT '执行器地址,本次执行的地址',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(512) DEFAULT NULL COMMENT '执行器任务参数',
`executor_sharding_param` varchar(20) DEFAULT NULL COMMENT '执行器任务分片参数,格式如 1/2',
`executor_fail_retry_count` int(11) NOT NULL DEFAULT '0' COMMENT '失败重试次数',
`trigger_time` datetime DEFAULT NULL COMMENT '调度-时间',
`trigger_code` int(11) NOT NULL COMMENT '调度-结果',
`trigger_msg` text COMMENT '调度-日志',
`handle_time` datetime DEFAULT NULL COMMENT '执行-时间',
`handle_code` int(11) NOT NULL COMMENT '执行-状态',
`handle_msg` text COMMENT '执行-日志',
`alarm_status` tinyint(4) NOT NULL DEFAULT '0' COMMENT '告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败',
PRIMARY KEY (`id`),
KEY `I_trigger_time` (`trigger_time`),
KEY `I_handle_code` (`handle_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Job execution report: counts of running / successful / failed logs per trigger_day.
-- The unique index on trigger_day allows at most one row per non-NULL trigger_day value.
CREATE TABLE `xxl_job_log_report` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`trigger_day` datetime DEFAULT NULL COMMENT '调度-时间',
`running_count` int(11) NOT NULL DEFAULT '0' COMMENT '运行中-日志数量',
`suc_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行成功-日志数量',
`fail_count` int(11) NOT NULL DEFAULT '0' COMMENT '执行失败-日志数量',
PRIMARY KEY (`id`),
UNIQUE KEY `i_trigger_day` (`trigger_day`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- GLUE source records: a glue_source text with its glue_type and remark, linked to a job by job_id.
-- NOTE(review): presumably the revision history of a job's GLUE source — confirm against the admin code.
CREATE TABLE `xxl_job_logglue` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`job_id` int(11) NOT NULL COMMENT '任务,主键ID',
`glue_type` varchar(50) DEFAULT NULL COMMENT 'GLUE类型',
`glue_source` mediumtext COMMENT 'GLUE源代码',
`glue_remark` varchar(128) NOT NULL COMMENT 'GLUE备注',
`add_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Executor registry: when an executor client starts, it registers itself under the matching
-- executor group. A row holds the group, the application name and the executor server
-- address (ip:port or domain name).
-- The composite index covers (registry_group, registry_key, registry_value); it is not unique.
CREATE TABLE `xxl_job_registry` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`registry_group` varchar(50) NOT NULL,
`registry_key` varchar(255) NOT NULL,
`registry_value` varchar(255) NOT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`),
KEY `i_g_k_v` (`registry_group`,`registry_key`,`registry_value`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Executor (job group): one row per executor application, with its AppName, display title,
-- address type (0 = auto-registered, 1 = entered manually) and comma-separated address list.
-- Note that title is limited to 12 characters.
CREATE TABLE `xxl_job_group` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`app_name` varchar(64) NOT NULL COMMENT '执行器AppName',
`title` varchar(12) NOT NULL COMMENT '执行器名称',
`address_type` tinyint(4) NOT NULL DEFAULT '0' COMMENT '执行器地址类型:0=自动注册、1=手动录入',
`address_list` varchar(512) DEFAULT NULL COMMENT '执行器地址列表,多地址逗号分隔',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Console user table: account name (unique), password, role (0 = normal user, 1 = administrator)
-- and permission (comma-separated list of executor ids).
CREATE TABLE `xxl_job_user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`username` varchar(50) NOT NULL COMMENT '账号',
`password` varchar(50) NOT NULL COMMENT '密码',
`role` tinyint(4) NOT NULL COMMENT '角色:0-普通用户、1-管理员',
`permission` varchar(255) DEFAULT NULL COMMENT '权限:执行器ID列表,多个逗号分割',
PRIMARY KEY (`id`),
UNIQUE KEY `i_username` (`username`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Job lock table: one row per named lock.
-- The scheduler thread runs `select * from xxl_job_lock where lock_name = 'schedule_lock' for update`
-- inside a transaction before it scans for jobs, so the row acts as the scheduling lock.
CREATE TABLE `xxl_job_lock` (
`lock_name` varchar(50) NOT NULL COMMENT '锁名称',
PRIMARY KEY (`lock_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- Seed data.
-- Sample executor group (id 1, auto-registered addresses).
INSERT INTO `xxl_job_group`(`id`, `app_name`, `title`, `address_type`, `address_list`) VALUES (1, 'xxl-job-executor-sample', '示例执行器', 0, NULL);
-- Sample BEAN job bound to executor group 1 and handler 'demoJobHandler'; trigger_status is not listed, so it takes its default 0 (stopped).
INSERT INTO `xxl_job_info`(`id`, `job_group`, `job_cron`, `job_desc`, `add_time`, `update_time`, `author`, `alarm_email`, `executor_route_strategy`, `executor_handler`, `executor_param`, `executor_block_strategy`, `executor_timeout`, `executor_fail_retry_count`, `glue_type`, `glue_source`, `glue_remark`, `glue_updatetime`, `child_jobid`) VALUES (1, 1, '0 0 0 * * ? *', '测试任务1', '2018-11-03 22:21:31', '2018-11-03 22:21:31', 'XXL', '', 'FIRST', 'demoJobHandler', '', 'SERIAL_EXECUTION', 0, 0, 'BEAN', '', 'GLUE代码初始化', '2018-11-03 22:21:31', '');
-- Default administrator account (role 1).
-- NOTE(review): the password is a 32-character hex string, presumably an unsalted MD5 digest — confirm, and change the default credentials after installation.
INSERT INTO `xxl_job_user`(`id`, `username`, `password`, `role`, `permission`) VALUES (1, 'admin', 'e10adc3949ba59abbe56e057f20f883e', 1, NULL);
-- The lock row that the scheduler selects with FOR UPDATE.
INSERT INTO `xxl_job_lock` ( `lock_name`) VALUES ( 'schedule_lock');
commit;
从上面可以看出,控制台的主要模型包括job分组、job注册表、job、job日志、job执行报告以及job调度锁等。
我们来看具体任务的调度执行。
任务调度(管理控制台)
启动任务调度器(JobScheduleHelper)
//JobScheduleHelper
/**
* Starts the scheduler's two daemon threads: the schedule thread and the ring thread.
* Outline only: the Runnable bodies are elided in this excerpt.
*/
public void start(){
// schedule thread
scheduleThread = new Thread(new Runnable() {
...
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// ring thread: polls for the jobs that have to be triggered within the next 5s
ringThread = new Thread(new Runnable() {
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}
job调度器内部主要有两个线程:一个是调度线程scheduleThread,用于从数据库中筛选出未来5秒内需要调度的job,如果已到触发时间,则通知job执行器执行任务;
如果还没有到调度时间,则放到ringData Map中;另一个是ringThread线程,每秒从ringData Map中取出对应的任务,并通知job执行器执行任务。
/**
* 存储当前轮询批次内,需要调度的任务,key为时间秒,value为list<TaskId>
*/
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
//JobScheduleHelper
/**
*
*/
public void start(){
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
//调度线程睡眠5s
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
...
}
// 预读线程池数
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
//获取调度锁
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1、pre read
long nowTime = System.currentTimeMillis();
//查询下次调度时间在跟定调度时间范围内的调度任务,提前5s
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// fresh next, job错过触发事件,刷新job下次触发执行时间
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time
// 1、trigger, 触发job
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2、fresh next 刷新job下次触发执行时间
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again, 下次触发事件在5s之内需要触发
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second , 获取下次触发需要等的秒数
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring 放到
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3、update trigger info 更新任务触发信息
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// tx stop
} catch (Exception e) {
...
} finally {
// commit
if (conn != null) {
try {
conn.commit();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.setAutoCommit(connAutoCommit);
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
conn.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
// close PreparedStatement
if (null != preparedStatement) {
try {
preparedStatement.close();
} catch (SQLException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
long cost = System.currentTimeMillis()-start;
// Wait seconds, align second
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
...
从上面可以看出,调度线程scheduleThread, 首先获取调度锁,从数据库中筛选出未来5秒内需要调度的job,如果触发的话,通知job执行器,执行任务;
再来看一下没有到触发时间的任务,处理策略
/**
 * Queues a job id on the time ring so that the ring thread triggers it at the given second.
 *
 * <p>The bucket lookup, its creation and the append all happen inside a single
 * {@code compute} call. {@code ringData} is a ConcurrentHashMap, whose {@code compute}
 * is atomic per key, so the append can no longer land on a bucket that the ring thread
 * has already detached with {@code ringData.remove(...)} (the previous get / put / add
 * sequence left that window open).
 *
 * @param ringSecond ring slot the job is due at (second of the minute)
 * @param jobId      id of the job to trigger
 */
private void pushTimeRing(int ringSecond, int jobId){
    // push async ring: create-or-append atomically for this slot
    List<Integer> ringItemData = ringData.compute(ringSecond, (slot, bucket) -> {
        List<Integer> target = (bucket != null) ? bucket : new ArrayList<Integer>();
        target.add(jobId);
        return target;
    });
    logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
}
先放到待调度的任务集合中ringData; 待ring线程调度。
再来看ring线程
/**
* Starts the ring thread (the schedule-thread part of this method is elided in this excerpt).
* Once per second the ring thread removes the buckets for the current second and the previous
* second from ringData and triggers every job id found in them.
*/
public void start(){
...
// ring thread: fires the jobs that were queued on the time ring (jobs due within the next 5s)
ringThread = new Thread(new Runnable() {
@Override
public void run() {
// align second
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
while (!ringThreadToStop) {
try {
// second data
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND); // also check one slot back, in case a slow iteration skipped over a tick
for (int i = 0; i < 2; i++) {
// take (and remove) the jobs queued for this slot
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// ring trigger
logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
if (ringItemData.size() > 0) {
// do trigger
for (int jobId: ringItemData) {
// do trigger: fire the job
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {
if (!ringThreadToStop) {
logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
}
}
// next second, align second
try {
// sleep until the start of the next wall-clock second (at most 1s)
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
}
logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}
从上面可以看出,ring线程每秒从ringData Map中取出对应的任务,并通知job执行器执行任务。
我们来看一下关键的步骤, 通知job执行器,执行任务。
// do trigger, 触发任务
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
//JobTriggerPoolHelper
/**
* Triggers a job by delegating to addTrigger on the static helper instance.
* @param jobId
* @param triggerType
* @param failRetryCount
* >=0: use this param
* <0: use param from job info config
* @param executorShardingParam sharding parameter, in the form "index/total"
* @param executorParam
* null: use job param
* not null: cover job param
* @param addressList passed through unchanged to addTrigger
*/
public static void trigger(int jobId, TriggerTypeEnum triggerType, int failRetryCount, String executorShardingParam, String executorParam, String addressList) {
helper.addTrigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
/**
* add trigger
* Submits the trigger of a job to a trigger thread pool.
* The fast pool is used by default; a job whose timeout counter is above 10 is routed to the slow pool.
* NOTE(review): the catch/finally part of the try block is missing from this excerpt.
*/
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
// trigger
triggerPool_.execute(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
// do trigger: notify the remote executor server so that it runs the job
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
}
}
});
}
//XxlJobTrigger
/**
* trigger job
* Loads the job, then either fans the trigger out to every registered executor
* (sharding-broadcast route strategy with no explicit sharding parameter) or triggers once.
* Part of the method body is elided in this excerpt.
*
* @param jobId
* @param triggerType
* @param failRetryCount
* >=0: use this param
* <0: use param from job info config
* @param executorShardingParam
* @param executorParam
* null: use job param
* not null: cover job param
* @param addressList
* null: use executor addressList
* not null: cover
*/
public static void trigger(int jobId,
TriggerTypeEnum triggerType,
int failRetryCount,
String executorShardingParam,
String executorParam,
String addressList) {
// load data: load the job definition; an unknown job id is logged and ignored
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
if (jobInfo == null) {
logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
return;
}
if (executorParam != null) {
jobInfo.setExecutorParam(executorParam);
}
...
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
// sharding-broadcast mode with no sharding parameter: one trigger per registered executor, index i of registry-list size
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
} else {
// single trigger; without a sharding parameter it defaults to index 0 of total 1
if (shardingParam == null) {
shardingParam = new int[]{0, 1};
}
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
}
}
/**
* Processes one trigger of a job for a single shard (most of the body is elided in this excerpt).
* @param group job group, registry list may be empty
* @param jobInfo
* @param finalFailRetryCount
* @param triggerType
* @param index sharding index
* @param total sharding total
*/
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
...
// 4、trigger remote executor: call the executor only when an address was resolved;
// otherwise the result is a FAIL_CODE ReturnT and no executor is contacted
ReturnT<String> triggerResult = null;
if (address != null) {
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
}
...
}
/**
* run executor
* Obtains the ExecutorBiz client for the address and invokes run on it with the trigger parameter.
* NOTE(review): the catch block and the return statement are missing from this excerpt.
* @param triggerParam
* @param address
* @return
*/
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
// get the executor client for this address
ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
// ask the executor to run the job
runResult = executorBiz.run(triggerParam);
}
}
//XxlJobScheduler
/**
 * Returns the executor client for the given address.
 * A client already held in the repository is reused; otherwise a new ExecutorBizClient
 * is created with the admin access token, stored in the repository and returned.
 *
 * @param address executor address; surrounding whitespace is ignored
 * @return the client for the address, or null when the address is null or blank
 * @throws Exception
 */
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
    // A missing or blank address has no client.
    if (address == null) {
        return null;
    }
    final String key = address.trim();
    if (key.isEmpty()) {
        return null;
    }

    // Reuse the cached client, creating and caching one on a miss.
    ExecutorBiz client = executorBizRepository.get(key);
    if (client == null) {
        client = new ExecutorBizClient(key, XxlJobAdminConfig.getAdminConfig().getAccessToken());
        executorBizRepository.put(key, client);
    }
    return client;
}
//ExecutorBizClient
/**
* Sends the trigger parameter to the executor: posts it to addressUrl + "run" with the
* configured access token and timeout, and returns the executor's ReturnT<String> reply.
*/
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
从上面可以看出,任务触发实际上是根据路由策略,从执行器分组的注册列表中选择执行服务器,并通知该服务器执行任务;针对分片广播任务,则通知所有的执行服务器。
再来看一下任务执行器
执行器
在客户端的时候,一般我们会启用如下配置
/**
* Spring configuration that builds the XxlJobSpringExecutor bean from the xxl.job.* properties.
* NOTE(review): `logger` is not declared in the visible part of this class; presumably it is in the elided part.
*/
@Configuration
public class XxlJobConfig {
@Value("${xxl.job.admin.addresses}")
private String adminAddresses;
@Value("${xxl.job.accessToken}")
private String accessToken;
@Value("${xxl.job.executor.appname}")
private String appname;
@Value("${xxl.job.executor.address}")
private String address;
@Value("${xxl.job.executor.ip}")
private String ip;
@Value("${xxl.job.executor.port}")
private int port;
@Value("${xxl.job.executor.logpath}")
private String logPath;
@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;
/**
* Creates the executor bean and copies every injected property onto it.
*/
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
...
}
来看XxlJobSpringExecutor的初始化
/**
* Spring-aware executor: in afterSingletonsInstantiated it registers the method-level job
* handlers, refreshes the GlueFactory and then starts the underlying XxlJobExecutor.
*/
public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {
private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);
/**
* Start-up hook (SmartInitializingSingleton callback).
* Any exception thrown by super.start() is rethrown wrapped in a RuntimeException.
*/
// start
@Override
public void afterSingletonsInstantiated() {
// init JobHandler Repository (used by older versions)
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method): register the method-level jobs
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory: re-select the GLUE factory type
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
...
}
XxlJobSpringExecutor初始化,主要是将带有XxlJob注解的方法级job注册到执行器中;
我们来简单看一下
//XxlJobSpringExecutor
/**
* Registers the method-level jobs: scans every bean in the application context for methods
* annotated with XxlJob, validates each one and registers it as a MethodJobHandler.
* A RuntimeException is thrown for a blank job name, a name that is already registered, an
* unsupported parameter or return type, or an init/destroy method that cannot be found.
* @param applicationContext context to scan; nothing is done when it is null
*/
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// init job handler from method
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = applicationContext.getBean(beanDefinitionName);
// referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
// collect the XxlJob-annotated methods of this bean; a resolution failure is logged and the bean is skipped
Map<Method, XxlJob> annotatedMethods = null;
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
continue;
}
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method method = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
if (xxlJob == null) {
continue;
}
// job name: taken from the annotation value, must not be blank
String name = xxlJob.value();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
// reject a job name that is already registered
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
// execute method: parameter check, exactly one parameter that can accept a String
// NOTE(review): isAssignableFrom(String.class) also accepts supertypes of String (e.g. Object, CharSequence), not only String itself
if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}
// return type check
// NOTE(review): this accepts ReturnT and any supertype of ReturnT as the declared return type
if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}
method.setAccessible(true);
// init and destroy: resolve the optional no-argument init / destroy methods named by the annotation
Method initMethod = null;
Method destroyMethod = null;
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
}
// registry jobhandler: register the job handler under its name
registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
}
}
}
// ---------------------- job handler repository ----------------------
// Registered job handlers, keyed by job handler name.
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
/**
* Registers a job handler under the given name, replacing any handler already stored under it.
* The "success" message is logged before the handler is put into the repository.
* @param name job handler name
* @param jobHandler handler to register
* @return the handler previously registered under this name, or null if there was none
*/
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
从上面可以看出,执行器任务集实际为一个 ConcurrentMap<String, IJobHandler>;
简单看一下方法级任务
/**
* Job handler backed by a single method of a bean (the rest of the class is elided in this excerpt).
*/
public class MethodJobHandler extends IJobHandler {
/**
* Object that owns the job method (the bean instance).
*/
private final Object target;
/**
* The job method itself.
*/
private final Method method;
/**
* Optional job init method; may be null.
*/
private Method initMethod;
/**
* Optional job destroy method; may be null.
*/
private Method destroyMethod;
...
}
从上面可以看出,MethodJobHandler包装了任务对象、方法级job,以及job的初始化与销毁方法。
再来看执行器实际的启动情况
//XxlJobExecutor
public class XxlJobExecutor {
/**
* Starts the job executor: initializes the log path and the admin clients, starts the
* log-file cleanup thread and the trigger-callback thread, then starts the embedded server.
* @throws Exception
*/
public void start() throws Exception {
// init logpath
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client: set up the clients for the admin console
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread: start the log cleanup thread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread: start the trigger callback thread
TriggerCallbackThread.getInstance().start();
// init executor-server: start the embedded executor server
initEmbedServer(address, ip, port, appname, accessToken);
}
...
}
从上面可以看出,启动XxlJobExecutor,实际上是初始化job控制台客户端,开启日志清理线程,开启触发回调线程(任务执行完后,将结果放到回调队列,回调线程轮询回调队列,并通知客户端),
并启动一个基于netty的嵌入式http server,用于接收控制台的调度通知。
//XxlJobExecutor
/**
 * Resolves the port, ip and registry address of this executor and starts the embedded server.
 *
 * @param address     registry address; when null or blank it is built as
 *                    "http://" + IpUtil.getIpPort(ip, port) + "/"
 * @param ip          executor ip; when null or blank, IpUtil.getIp() is used
 * @param port        executor port; when not positive, NetUtil.findAvailablePort(9999) is used
 * @param appname     passed through to the embedded server
 * @param accessToken passed through to the embedded server
 * @throws Exception
 */
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
    // Fall back to a discovered port / ip when none is configured.
    if (port <= 0) {
        port = NetUtil.findAvailablePort(9999);
    }
    if (ip == null || ip.trim().length() <= 0) {
        ip = IpUtil.getIp();
    }

    // The configured address wins for registration; otherwise derive one from ip:port.
    boolean addressMissing = (address == null) || address.trim().length() == 0;
    if (addressMissing) {
        address = "http://{ip_port}/".replace("{ip_port}", IpUtil.getIpPort(ip, port));
    }

    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}
//EmbedServer
/**
* Copy from : https://github.com/xuxueli/xxl-rpc
* 基于netty的嵌入式客户端
* @author xuxueli 2020-04-11 21:25
*/
public class EmbedServer {
private static final Logger logger = LoggerFactory.getLogger(EmbedServer.class);
private ExecutorBiz executorBiz;
private Thread thread;
/**
 * Starts the embedded executor server on a daemon thread.
 *
 * <p>The thread binds a netty HTTP server to {@code port}, starts the registry for
 * {@code appname} / {@code address}, and then blocks until the server channel is closed.
 * An interruption of the thread is treated as a normal stop; any other exception is
 * logged as a server error. Both event-loop groups are shut down in every case.
 *
 * @param address     registry address handed to startRegistry
 * @param port        port the HTTP server binds to
 * @param appname     application name handed to startRegistry
 * @param accessToken token handed to the HTTP handler for request validation
 */
public void start(final String address, final int port, final String appname, final String accessToken) {
    executorBiz = new ExecutorBizImpl();
    thread = new Thread(new Runnable() {
        @Override
        public void run() {
            // param
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            // Business pool that runs the request handling off the netty I/O threads.
            // NOTE(review): with corePoolSize 0 and a bounded queue, ThreadPoolExecutor only grows
            // beyond one worker once the 2000-slot queue is full — confirm that is intended.
            ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
                    0,
                    200,
                    60L,
                    TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(2000),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable r) {
                            return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());
                        }
                    },
                    new RejectedExecutionHandler() {
                        @Override
                        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                            throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
                        }
                    });
            try {
                // start server: bootstrap the netty http server
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel channel) throws Exception {
                                channel.pipeline()
                                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS)) // beat 3N, close if idle
                                        .addLast(new HttpServerCodec())
                                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024)) // merge request & reponse to FULL
                                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));
                            }
                        })
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
                // bind
                ChannelFuture future = bootstrap.bind(port).sync();
                logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);
                // start registry: start the executor registry thread
                startRegistry(appname, address);
                // wait util stop
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                // interruption is the stop signal, not an error
                logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
            } catch (Exception e) {
                // previously unreachable: the error branch lived inside the InterruptedException handler
                logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
            } finally {
                // stop
                try {
                    workerGroup.shutdownGracefully();
                    bossGroup.shutdownGracefully();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
    thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
    thread.start();
}
关键看EmbedHttpServerHandler
//EmbedHttpServerHandler
/**
* netty_http
*
* Copy from : https://github.com/xuxueli/xxl-rpc
*
* @author xuxueli 2015-11-24 22:25:15
*/
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
private static final Logger logger = LoggerFactory.getLogger(EmbedHttpServerHandler.class);
private ExecutorBiz executorBiz;
private String accessToken;
private ThreadPoolExecutor bizThreadPool;
/**
* Creates the handler.
* @param executorBiz executor operations that requests are dispatched to
* @param accessToken token that incoming requests are checked against
* @param bizThreadPool pool on which the requests are processed
*/
public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
this.executorBiz = executorBiz;
this.accessToken = accessToken;
this.bizThreadPool = bizThreadPool;
}
/**
* Handles one aggregated HTTP request: reads the body (as UTF-8), uri, method, keep-alive flag
* and access-token header, then processes the request and writes the JSON response on bizThreadPool.
*/
@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
// request parse
//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
String requestData = msg.content().toString(CharsetUtil.UTF_8);
String uri = msg.uri();
HttpMethod httpMethod = msg.method();
boolean keepAlive = HttpUtil.isKeepAlive(msg);
String accessTokenReq = msg.headers().get(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN);
// invoke
bizThreadPool.execute(new Runnable() {
@Override
public void run() {
// do invoke: handle the http request
Object responseObj = process(httpMethod, uri, requestData, accessTokenReq);
// to json
String responseJson = GsonTool.toJson(responseObj);
// write response
writeResponse(ctx, keepAlive, responseJson);
}
});
}
/**
 * Validates a request from the admin console and dispatches it to the matching executor operation.
 * Requests are rejected with a FAIL_CODE result when the method is not POST, the uri is blank,
 * or an access token is configured and the request token differs from it.
 *
 * @param httpMethod     HTTP method of the request
 * @param uri            request uri, used as the operation mapping
 * @param requestData    request body (JSON)
 * @param accessTokenReq access token sent with the request
 * @return the operation result, or a FAIL_CODE ReturnT describing the problem
 */
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {
    // Only POST is supported.
    if (httpMethod != HttpMethod.POST) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    // A non-blank uri is required for the mapping.
    boolean uriMissing = (uri == null) || uri.trim().length() == 0;
    if (uriMissing) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    // The token is enforced only when one is configured.
    boolean tokenConfigured = (accessToken != null) && accessToken.trim().length() > 0;
    if (tokenConfigured && !accessToken.equals(accessTokenReq)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    }

    // Dispatch on the uri; any exception becomes a FAIL_CODE result.
    try {
        switch (uri) {
            case "/beat":
                // heartbeat
                return executorBiz.beat();
            case "/idleBeat": {
                // idle heartbeat
                IdleBeatParam idleParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
                return executorBiz.idleBeat(idleParam);
            }
            case "/run": {
                // run a job
                TriggerParam runParam = GsonTool.fromJson(requestData, TriggerParam.class);
                return executorBiz.run(runParam);
            }
            case "/kill": {
                // kill a job
                KillParam stopParam = GsonTool.fromJson(requestData, KillParam.class);
                return executorBiz.kill(stopParam);
            }
            case "/log": {
                // read a job log
                LogParam readParam = GsonTool.fromJson(requestData, LogParam.class);
                return executorBiz.log(readParam);
            }
            default:
                return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
        return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
    }
}
...
}
从上面可以看出,嵌入式HTTPserver 主要提供了心跳,空闲心跳,执行job,killjob,等REST操作。
再来看实际的执行任务ExecutorBizImpl
//ExecutorBizImpl
/**
 * Handles one trigger request for a job.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Load the job thread currently registered for the job id, and its handler.</li>
 *   <li>Depending on the glue type (BEAN, GLUE_GROOVY or a script type), decide whether the
 *       registered thread can be reused; if its handler no longer matches the request, the
 *       thread is marked for replacement and a fresh handler is resolved or created.</li>
 *   <li>Apply the executor block strategy to a reusable thread (discard the new trigger, or
 *       replace the busy thread, or simply queue).</li>
 *   <li>Register a new job thread when none is reusable, then push the trigger onto the
 *       thread's trigger queue.</li>
 * </ol>
 *
 * @param triggerParam the trigger request
 * @return the result of queueing the trigger, or a {@code FAIL_CODE} result when the handler or
 *         glue type is invalid, glue loading fails, or the trigger is discarded
 */
public ReturnT<String> run(TriggerParam triggerParam) {
    // Thread and handler currently registered for this job (both null when none is registered).
    JobThread activeThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler handler = (activeThread == null) ? null : activeThread.getHandler();
    String replaceReason = null;

    // Validate the registered thread/handler against the requested glue type.
    GlueTypeEnum glueType = GlueTypeEnum.match(triggerParam.getGlueType());
    if (glueType == GlueTypeEnum.BEAN) {

        // Handler registered under the requested name.
        IJobHandler beanHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // A different handler instance means the old thread has to be replaced.
        if (activeThread != null && handler != beanHandler) {
            replaceReason = "change jobhandler or glue type, and terminate the old job thread.";
            activeThread = null;
            handler = null;
        }

        if (handler == null) {
            if (beanHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
            handler = beanHandler;
        }

    } else if (glueType == GlueTypeEnum.GLUE_GROOVY) {

        // Reusable only if the current handler is a glue handler built from the same source version.
        boolean sameGlue = handler instanceof GlueJobHandler
                && ((GlueJobHandler) handler).getGlueUpdatetime() == triggerParam.getGlueUpdatetime();
        if (activeThread != null && !sameGlue) {
            replaceReason = "change job source or glue type, and terminate the old job thread.";
            activeThread = null;
            handler = null;
        }

        if (handler == null) {
            try {
                IJobHandler groovyHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                handler = new GlueJobHandler(groovyHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }

    } else if (glueType != null && glueType.isScript()) {

        // Reusable only if the current handler is a script handler built from the same source version.
        boolean sameScript = handler instanceof ScriptJobHandler
                && ((ScriptJobHandler) handler).getGlueUpdatetime() == triggerParam.getGlueUpdatetime();
        if (activeThread != null && !sameScript) {
            replaceReason = "change job source or glue type, and terminate the old job thread.";
            activeThread = null;
            handler = null;
        }

        if (handler == null) {
            handler = new ScriptJobHandler(
                    triggerParam.getJobId(),
                    triggerParam.getGlueUpdatetime(),
                    triggerParam.getGlueSource(),
                    GlueTypeEnum.match(triggerParam.getGlueType()));
        }

    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // Block strategy: only relevant when an existing thread is about to be reused.
    if (activeThread != null) {
        ExecutorBlockStrategyEnum strategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == strategy && activeThread.isRunningOrHasQueue()) {
            // Thread is busy: drop this trigger.
            return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == strategy && activeThread.isRunningOrHasQueue()) {
            // Thread is busy: replace it so the new trigger wins.
            replaceReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
            activeThread = null;
        }
        // Any other strategy: just queue the trigger behind the running one.
    }

    // No reusable thread: register (and thereby start) a new one, stopping the old one if present.
    if (activeThread == null) {
        activeThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), handler, replaceReason);
    }

    // Hand the trigger to the job thread's queue.
    return activeThread.pushTriggerQueue(triggerParam);
}
从上面可以看出,执行job时,实际上是先从任务集合中加载该任务绑定的job线程JobThread(不存在或已失效时重新注册),再将触发参数推送到job线程的触发队列中。
再来看执行参数TriggerParam
/**
 * Parameters of a single job trigger.
 *
 * Instances are passed to ExecutorBizImpl#run, queued on a JobThread and
 * consumed by the JobThread run loop.
 */
public class TriggerParam implements Serializable{
private static final long serialVersionUID = 42L;
// id of the job; used to look up / register the JobThread for this job
private int jobId;
/**
 * Executor handler (passed to XxlJobExecutor.loadJobHandler for BEAN jobs)
 */
private String executorHandler;
/**
 * Execution parameters (passed to IJobHandler#execute)
 */
private String executorParams;
// block strategy name; resolved with ExecutorBlockStrategyEnum.match
private String executorBlockStrategy;
/**
 * Executor timeout; a value > 0 makes JobThread wait for the result with
 * this limit in seconds, otherwise the job is executed without a limit
 */
private int executorTimeout;
/**
 * Job log id
 */
private long logId;
// log timestamp; JobThread converts it with new Date(...) to build the log file name
private long logDateTime;
// glue type name; resolved with GlueTypeEnum.match
private String glueType;
// glue source code (handed to GlueFactory or ScriptJobHandler)
private String glueSource;
// glue update time; compared with the running handler's value to detect a source change
private long glueUpdatetime;
/**
 * Sharding broadcast index
 */
private int broadcastIndex;
/**
 * Sharding broadcast total
 */
private int broadcastTotal;
...
}
从上面可以看出,执行参数TriggerParam主要包括:执行处理器、执行参数、阻塞处理策略、超时时间、日志id、glue模式相关信息,以及分片任务广播索引和分片任务广播数量。
再来看job线程
/**
 * Worker thread bound to one job id: it owns the job's handler and a queue of
 * pending triggers for that job.
 */
public class JobThread extends Thread{
private static Logger logger = LoggerFactory.getLogger(JobThread.class);
private int jobId;
/**
 * Job handler
 */
private IJobHandler handler;
/**
 * Queue of pending triggers
 */
private LinkedBlockingQueue<TriggerParam> triggerQueue;
private Set<Long> triggerLogIdSet; // log ids currently queued; used to reject a repeated trigger with the same log id
private volatile boolean toStop = false;
private String stopReason;
private boolean running = false; // true while a trigger is being processed
private int idleTimes = 0; // number of consecutive polls that returned no trigger
public JobThread(int jobId, IJobHandler handler) {
this.jobId = jobId;
this.handler = handler;
this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
this.triggerLogIdSet = Collections.synchronizedSet(new HashSet<Long>());
// NOTE(review): the rest of the constructor (including its closing brace) is not shown in this excerpt.
/**
 * Adds a new trigger to the queue.
 *
 * A trigger whose log id is already in triggerLogIdSet is rejected with a
 * FAIL_CODE result; otherwise the log id is recorded, the trigger is queued
 * and ReturnT.SUCCESS is returned.
 *
 * @param triggerParam the trigger to queue
 * @return ReturnT.SUCCESS when queued, a FAIL_CODE result for a repeated log id
 */
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
// avoid repeat
// NOTE(review): contains() and add() below are two separate calls on the synchronized set,
// so the check and the insert are not one atomic step; presumably two concurrent pushes with
// the same log id could both pass the check - confirm whether callers can race here.
if (triggerLogIdSet.contains(triggerParam.getLogId())) {
logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
}
triggerLogIdSet.add(triggerParam.getLogId());
triggerQueue.add(triggerParam);
return ReturnT.SUCCESS;
...
}
执行线程的触发在哪里呢?回到XxlJobExecutor
//XxlJobExecutor
* 注册job执行器, 并启动job线程
* @param jobId
* @param handler
* @param removeOldReason
* @return
*/
public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
JobThread newJobThread = new JobThread(jobId, handler);
newJobThread.start();
logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});
JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!!
if (oldJobThread != null) {
oldJobThread.toStop(removeOldReason);
oldJobThread.interrupt();
}
return newJobThread;
}
再来看job线程的启动
//JobThread
/**
 * Main loop of the job thread.
 *
 * Calls handler.init() once, then loops until toStop is set: each iteration polls the
 * trigger queue (3 second timeout) and, when a trigger arrives, executes the handler -
 * with a time limit when the trigger's executor timeout is > 0, directly otherwise - and
 * pushes the outcome to TriggerCallbackThread. When more than 30 consecutive polls return
 * nothing and the queue is empty, the thread asks XxlJobExecutor to remove it.
 *
 * After the loop ends, every trigger still in the queue is reported as killed and
 * handler.destroy() is called.
 */
@Override
public void run() {
// init
try {
handler.init();
} catch (Throwable e) {
// an init failure is only logged; the loop below still starts
logger.error(e.getMessage(), e);
}
// execute
while(!toStop){
running = false;
idleTimes++;
TriggerParam triggerParam = null;
ReturnT<String> executeResult = null;
try {
// the loop must re-check toStop regularly, so poll(timeout) is used instead of the blocking queue.take()
// pull the next trigger parameter from this job thread's trigger queue
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
if (triggerParam!=null) {
running = true;
idleTimes = 0;
// the trigger left the queue, so drop its log id from the repeat-check set
triggerLogIdSet.remove(triggerParam.getLogId());
// log filename, like "logPath/yyyy-MM-dd/9999.log"
String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());
XxlJobFileAppender.contextHolder.set(logFileName);
ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal()));
// execute
XxlJobLogger.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + triggerParam.getExecutorParams());
if (triggerParam.getExecutorTimeout() > 0) {
// a timeout is configured: run the handler on a separate thread and wait with a limit
Thread futureThread = null;
try {
final TriggerParam triggerParamTmp = triggerParam;
FutureTask<ReturnT<String>> futureTask = new FutureTask<ReturnT<String>>(new Callable<ReturnT<String>>() {
@Override
public ReturnT<String> call() throws Exception {
return handler.execute(triggerParamTmp.getExecutorParams());
}
});
futureThread = new Thread(futureTask);
futureThread.start();
// wait for the result, at most executorTimeout seconds
executeResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
XxlJobLogger.log("<br>----------- xxl-job job execute timeout");
XxlJobLogger.log(e);
executeResult = new ReturnT<String>(IJobHandler.FAIL_TIMEOUT.getCode(), "job execute timeout ");
} finally {
// the worker thread is interrupted on every exit path (normal completion, timeout or error)
futureThread.interrupt();
}
} else {
// no timeout configured: execute the handler directly on this thread
executeResult = handler.execute(triggerParam.getExecutorParams());
}
if (executeResult == null) {
// a handler returning null is reported as a failure
executeResult = IJobHandler.FAIL;
} else {
// cap the message at 50000 characters (plus "...") and drop the content to limit the result size
executeResult.setMsg(
(executeResult!=null&&executeResult.getMsg()!=null&&executeResult.getMsg().length()>50000)
?executeResult.getMsg().substring(0, 50000).concat("...")
:executeResult.getMsg());
executeResult.setContent(null); // limit obj size
}
XxlJobLogger.log("<br>----------- xxl-job job execute end(finish) -----------<br>----------- ReturnT:" + executeResult);
} else {
// nothing was polled: after more than 30 idle polls, remove this job thread from the executor
if (idleTimes > 30) {
if(triggerQueue.size() == 0) { // avoid concurrent trigger causes jobId-lost
XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
}
}
}
} catch (Throwable e) {
if (toStop) {
XxlJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
}
// turn the full stack trace into the failure message of the result
StringWriter stringWriter = new StringWriter();
e.printStackTrace(new PrintWriter(stringWriter));
String errorMsg = stringWriter.toString();
executeResult = new ReturnT<String>(ReturnT.FAIL_CODE, errorMsg);
XxlJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- xxl-job job execute end(error) -----------");
} finally {
// a callback is pushed only when a trigger was actually taken from the queue in this iteration
if(triggerParam != null) {
// callback handler info
if (!toStop) {
// normal case: push the execution result as the callback
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
} else {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
}
}
}
// the thread is stopping: report every trigger still waiting in the queue as killed (it is never executed)
while(triggerQueue !=null && triggerQueue.size()>0){
TriggerParam triggerParam = triggerQueue.poll();
if (triggerParam!=null) {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job not executed, in the job queue, killed.]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
}
// destroy
try {
handler.destroy();
} catch (Throwable e) {
logger.error(e.getMessage(), e);
}
// NOTE(review): the message says "hashCode" but the argument is the Thread object itself
logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread());
}
从上面可以看出,JobThread的执行过程为:不断从job线程的触发队列中拉取触发参数,如果没有配置超时时间(executorTimeout不大于0)则直接执行,否则创建一个FutureTask,在超时时间内等待执行结果。至此,我们将控制台的任务调度和客户端执行器的任务执行分析完了,下面我们总结一下。
总结
任务模型
主要模型,主要包括job分组,job注册器,job,job日志,job执行报告,及job调度锁等模型。
任务调度的过程
job调度器内部主要是有两个线程,一个是调度线程scheduleThread, 用于从数据库中筛选出未来5秒内需要调度的job,如果触发的话,通知job执行器,执行任务;
如果还没有到调度时间,则放到ringData Map中,由ringThread线程每秒从ringData Map中查出对应的任务,并通知job执行器执行任务。
任务执行,实际上,根据路由策略,从任务分组中选择,执行服务器,通知任务服务器执行任务。针对分片任务,则通知所有的任务服务器。
XxlJobSpringExecutor初始化,主要是将JobHandler注解的方法级job放到执行器中;执行器任务集实际为一个 ConcurrentMap<String, IJobHandler> ,其中的任务对象由MethodJobHandler包装,包含方法级job以及job的初始化与销毁方法。
启动XxlJobExecutor,实际上是初始化job控制台客户端,开启日志清理线程,开启触发回调线程(任务执行完成后,将执行结果放到回调队列,回调线程轮询回调队列,并通知客户端),
并启动一个基于netty的嵌入式http server,用于接收控制台的调度通知。
嵌入式HTTPserver 主要提供了心跳,空闲心跳,执行job,killjob,等REST操作。
执行job时,实际上是先从任务集合中加载该任务绑定的job线程JobThread(不存在或已失效时重新注册),再将触发参数推送到job线程的触发队列中。
执行参数TriggerParam,主要有,执行参数,执行策略,glue模式,分片任务广播索引和分片任务广播数量。
JobThread的执行过程为:不断从job线程的触发队列中拉取触发参数,如果没有配置超时时间(executorTimeout不大于0)则直接执行,否则创建一个FutureTask,在超时时间内等待执行结果。
xxljob优劣势
我们来分析xxljob优劣势。
优点
- 基于HTTP协议,具有跨平台的特性;
缺点
- 任务统一由控制台进行调度;基于HTTP协议通知执行器,调度时间与实际执行时间之间可能存在时间差;
- 所有的任务线程都是单线程轮询调度;
- 日志分小时切割,不好排查问题
由于xxljob是轻量级的,所以我们不能要求太多,对于体量不是很大的一般应用,完全可以满足需求。
有没有什么可以改进的呢?答案是有:控制台只负责job任务和执行服务器的管理、日志的收集以及执行数据的统计。客户端使用Quartz调度从控制台拉取的任务,任务执行时,根据执行机器列表和当前执行策略筛选出执行的机器,有时最终只选择一台;针对多机中只选择一台执行的情况,我们可以使用分布式锁来控制,
锁根据粗细粒度,分为任务同步互斥锁和任务调度锁(每次调度时,根据时间戳生成),获取到锁的机器执行任务。
网友评论