XXL-JOB分为B/S架构,调用中心是XXL-JOB服务端,执行器是客户端。
- 调度中心核心功能:执行器管理、任务管理、任务调度、监控告警和故障转移
- 执行器核心功能:负责业务任务处理,不关心任务调度
XXL-JOB将任务调度和任务执行隔离,将任务调度和执行进行解耦,让研发人员只关注业务部分,提高搞开发效率和系统扩展性。
源码工程结构
- /xxl-job-admin :调度中心,项目源码,服务端
- /xxl-job-executor-samples :执行器,Sample示例项目
- /xxl-job-core :公共Jar依赖
数据库表说明
- xxl_job_lock:任务调度锁表;
- xxl_job_group:执行器信息表,维护任务执行器信息;
- xxl_job_info:调度扩展信息表: 用于保存XXL-JOB调度任务的扩展信息,如任务分组、任务名、机器地址、执行器、执行入参和报警邮件等等;
- xxl_job_log:调度日志表: 用于保存XXL-JOB任务调度的历史信息,如调度结果、执行结果、调度入参、调度机器和执行器等等;
- xxl_job_log_report:调度日志报表:用户存储XXL-JOB任务调度日志的报表,调度中心报表功能页面会用到;
- xxl_job_logglue:任务GLUE日志:用于保存GLUE更新历史,用于支持GLUE的版本回溯功能;
- xxl_job_registry:执行器注册表,维护在线的执行器和调度中心机器地址信息;
- xxl_job_user:系统用户表;
调度中心配置入口
调度中心是基于Spring Boot开发,当XxlJobAdminConfig
配置类加载完成后,会通过afterPropertiesSet
方法进行XXL-Job的初始化。
@Component
public class XxlJobAdminConfig implements InitializingBean, DisposableBean {
...
private XxlJobScheduler xxlJobScheduler;
@Override
public void afterPropertiesSet() throws Exception {
adminConfig = this;
xxlJobScheduler = new XxlJobScheduler();
// 任务初始化入口
xxlJobScheduler.init();
}
...
}
public class XxlJobScheduler {
private static final Logger logger = LoggerFactory.getLogger(XxlJobScheduler.class);
public void init() throws Exception {
// init i18n 国际化
initI18n();
// admin trigger pool start 触发器线程池初始化 快速触发器最小200个线程,慢速触发器线程池最小100个线程
JobTriggerPoolHelper.toStart();
// admin registry monitor run 初始化注册中心线程池,处理执行器注册和销毁
JobRegistryHelper.getInstance().start();
// admin fail-monitor run 初始化任务故障监控的守护线程,负责任务重试和发送告警信息
JobFailMonitorHelper.getInstance().start();
// admin lose-monitor run ( depend on JobTriggerPoolHelper )
// 初始化callback线程池和监控任务执行失败的守护线程,【任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;】
JobCompleteHelper.getInstance().start();
// 日志清理和计算任务执行统计数据守护线程,用于报表计算
JobLogReportHelper.getInstance().start();
// 任务调度守护线程
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
public void destroy() throws Exception {
... 销毁上诉线程资源
}
}
XxlJobAdminConfig
会初始化一系列的守护线程或者线程池来完成一些异步任务:
- JobTriggerPoolHelper:触发器线程池初始化,快速任务触发器最小200个线程,慢速任务触发器线程池最小100个线程,任务评价执行时间大于10分钟为被定义为慢速任务
- JobRegistryHelper:初始化注册中心线程池,处理执行器注册和销毁
- JobFailMonitorHelper:初始化任务故障监控的守护线程,负责任务重试和发送告警信息
- JobCompleteHelper:初始化callback线程池和监控任务执行失败的守护线程,任务结果丢失处理:调度记录停留在 "运行中" 状态超过10min,且对应执行器心跳注册失败不在线,则将本地调度主动标记失败;
- JobLogReportHelper:日志清理和计算任务执行统计数据守护线程,用于报表计算
- JobScheduleHelper:任务调度守护线程
任务调度 JobScheduleHelper
时间轮
任务调度采用时间环数据结构结合数据库中的任务信息,实现了任务的周期性调度和触发,时间环数据结构用于存储在特定时间需要触发的任务ID。在XXL-JOB中会存储一分钟内的任务,将一分钟划分为60秒,每一秒是一个滑动窗口,存放当前秒需要执行的所有任务。
image.png
XXL-JOB
中使用ConcurrentHashMap
来实现时间轮,key
是根据任务执行时间计算出来的时间窗口位置,Value
是xxl_job_info
表中的jobId
。key
的计算方式为:
// 时间轮容器
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
// 使用执行时间的时间戳除以1000得到秒数,在和60取余数得到时间窗口的下标位置。
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
private void pushTimeRing(int ringSecond, int jobId){
// 将任务推送到异步环中,Value是一个ArrayList
List<Integer> ringItemData = ringData.get(ringSecond);
if (ringItemData == null) {
ringItemData = new ArrayList<Integer>();
ringData.put(ringSecond, ringItemData);
}
ringItemData.add(jobId);
}
调度器初始化
JobScheduleHelper
初始化完成后会初始化两个守护进程完成任务调度。
-
scheduleThread
:守护线程负责从数据库读取任务信息,将需要调度的任务ID加载到时间环中。 -
ringThread
:守护线程会周期性地检查时间环,负责触发对应的任务。
public class JobScheduleHelper {
public static final long PRE_READ_MS = 5000; // pre read
private Thread scheduleThread;
private Thread ringThread;
private volatile boolean scheduleThreadToStop = false;
private volatile boolean ringThreadToStop = false;
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();
public void start(){
// schedule thread 负责从数据库读取任务信息,将需要调度的任务ID加载到时间环中。
scheduleThread = new Thread(new Runnable() {...});
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// ring thread 负责触发对应的任务
ringThread = new Thread(new Runnable() {...});
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}
}
任务加载scheduleThread
程
任务扫描
调度中心为了任务的平均分配,触发组件每次获取与线程池数量相关数量的任务,避免大量任务集中在单个调度中心集群节点进行调度,这就是XXL-JOB常说的默认只支持6000个任务的调度;
public static final long PRE_READ_MS = 5000;
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
<select id="scheduleJobQuery" parameterType="java.util.HashMap" resultMap="XxlJobInfo">
SELECT <include refid="Base_Column_List" />
FROM xxl_job_info AS t
WHERE t.trigger_status = 1
and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}
ORDER BY id ASC
LIMIT #{pagesize}
</select>
-
maxNextTime
:预读时间 -
pagesize
:拉取任务总数
从这里可以看出,每次任务加载线程会加载5秒以内将会执行的任务,每次加载preReadCount
个任务到时间轮,preReadCount
默认情况下是6000
,计算公式为preReadCount = (快线程数+慢线程数) * 20
。
## xxl-job, triggerpool max size
# 快线程数配置
xxl.job.triggerpool.fast.max=200
# 慢线程数配置
xxl.job.triggerpool.slow.max=100
这里为啥要乘以20呢?因为进过作者测试,发现大多数定时任务的触发耗时都在 50ms 以内,那么1000/50=20
,可以得到任务触发器(JobTriggerPoolHelper
)qps
为 20
,所以同一时刻,任务触发器可以处理的任务为(200+100)*20=6000
。
任务加载到时间轮
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump PRE_READ_MS = 5秒
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
...
} else if (nowTime > jobInfo.getTriggerNextTime()) {
...
} else {
...
}
}
// 3、update trigger info
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
任务扫描出来后,需要将任务放到时间轮中进行统一触发执行,但是这里的任务并不是所有都需要进行压轮操作,扫描出来的任务被分为三块:
- 任务已经超时且超时时间大于5秒
- 任务已经超时且超时时间在5秒内
- 任务未超时
任务已经超时且超时时间大于5秒
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、获取配置的过期策略
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// 如果过期策略是立即执行,则立即触发任务,否则丢弃任务
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
}
// 2、刷新下一次任务执行时间
refreshNextValidTime(jobInfo, new Date());
}
处理流程:
- 获取配置的过期策略
- 如果过期策略是立即执行,则立即触发任务,否则丢弃任务
- 刷新下一次任务执行时间
调度过期策略是在任务配置的时候配置的,默认是忽略
image.png
任务已经超时且超时时间在5秒内
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 1、如果超时是5秒内的任务,则立即触发一次
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
// 2、设置下次执行时间
refreshNextValidTime(jobInfo, new Date());
// 如果5秒内会被执行则直接将下一次需要执行的任务做压轮操作
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
已经超时且超时时间在5秒内的任务会立即触发一次,如果下一次执行时间是在5秒以内,那么直接进行添加到时间轮。
任务未超时
} else {
// 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
未超时的任务直接放到时间轮中。
可用性和可靠性设计
并发控制
XXL-JOB使用数据库做的分布式锁,实现并发控制。由于调度中心是开启单独一个守护线程来进行任务调度,所以使用数据库进行并发控制性能上也没问题,因为调度中心部署节点并不多,锁的竞争压力不大。
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
通过分布式锁进行并发控制后就保证了同一时刻只能有一台服务器进行任务触发,所以集群部署并不能提升任务触发的并发数(性能提升),只能提升系统的可用性和可靠性。如果要提升系统性能只能增加单机配置,然后调整执行线程数进行扩容。
休眠设计
为了防止while循环一直轮训查数据库,任务每次执行完后会进行一定时间的休息:
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
// 散列集群部署后任务执行时机,降低资源竞争
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {...}
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
... 任务加载程序
long cost = System.currentTimeMillis()-start;
// 当加载任务执行耗时小于一秒才休眠,否则不休
if (cost < 1000) { // scan-overtime, not wait
try {
// pre-read period: success > scan each second; fail > skip this period;
TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
} catch (InterruptedException e) {...}
}
}
}
});
-
preReadSuc
:是一个boolean值,当有查询出了可调度的定时任务时,值为true,反之为false。 - 当加载任务执行耗时小于一秒才休眠,否则不休
- 如果本次任务查询出任务则休眠
1s
,否则休眠5s
-
System.currentTimeMillis()%1000)
散列休眠时间
重试设计
如果我们配置了任务失败后重试,那么将会进行从试
image.png
任务触发scheduleThread
线程
scheduleThread
线程也是一个守护线程,他负责在时间轮中取出数据,进行任务触发。
// ring thread
ringThread = new Thread(new Runnable() {
@Override
public void run() {
while (!ringThreadToStop) {
// 每次执行完休息1秒
try {
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
} catch (InterruptedException e) {
if (!ringThreadToStop) {
logger.error(e.getMessage(), e);
}
}
try {
// 1. 获取需要处理的任务
List<Integer> ringItemData = new ArrayList<>();
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
for (int i = 0; i < 2; i++) {
// 避免处理耗时太长,跨过刻度,向前校验一个刻度,如果前一个刻度已经处理则不会有数据,因为已经被清空了
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
// 2. 触发任务
if (ringItemData.size() > 0) {
for (int jobId: ringItemData) {
// do trigger
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
}
// clear
ringItemData.clear();
}
} catch (Exception e) {...}
}
}
});
}
这里主要分两步:
- 通过当前时间戳获取需要执行的任务,这里同时取出当前刻度和前一个刻度的数据,防止处理耗时太长,跨过刻度的情况发生,如果前一个刻度已经处理则不会有数据,因为已经被清空了,所以任务不会重复执行
- 通过执行器
JobTriggerPoolHelper
异步执行任务。
任务执行 JobTriggerPoolHelper
JobTriggerPoolHelper
初始化
JobTriggerPoolHelper
初始化时会初始化快速任务线程池和慢速任务线程池,在1分钟内超时次数超过10次的任务定义为慢任务,这个统计使用的是1分钟的滑动窗口,每次调度大于500ms算超时,使用慢速线程池,否则使用快速线程池处理任务。,任务的执行通过JobTriggerPoolHelper.triggerPool
线程池处理。
-
fastTriggerPool
:快速任务处理线程池,最小200 -
slowTriggerPool
:慢速任务处理线程池,最小100
这里有个比较好的设计点,通过统计识别任务类型,通过线程池隔离将风险进行隔离,尽量保证快速任务的正常执行。
public class JobTriggerPoolHelper {
private ThreadPoolExecutor fastTriggerPool = null;
private ThreadPoolExecutor slowTriggerPool = null;
public void start(){
// 快速任务处理线程池,最小200
fastTriggerPool = new ThreadPoolExecutor(...);
// 慢速速任务处理线程池,最小100
slowTriggerPool = new ThreadPoolExecutor(...);
}
/**
* add trigger
*/
public void addTrigger(final int jobId,
final TriggerTypeEnum triggerType,
final int failRetryCount,
final String executorShardingParam,
final String executorParam,
final String addressList) {
// choose thread pool
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
// 在1分钟内超时次数超过10次的任务定义为慢任务,使用慢速线程池
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
// trigger
triggerPool_.execute(new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
try {
// 触发任务执行
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {...}
}
});
}
...
}
远程调用执行器
最后通过ExecutorBizClient.run
完成对执行器的调用, 通过源码可以发现就是使用的Http工具发起对执行器服务器中的/run
进行调用,完对任务的触发。
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
public static ReturnT postBody(String url, String accessToken, int timeout, Object requestObj, Class returnTargClassOfT) {
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// connection
URL realUrl = new URL(url);
connection = (HttpURLConnection) realUrl.openConnection();
...
// parse returnT
try {
ReturnT returnT = GsonTool.fromJson(resultJson, ReturnT.class, returnTargClassOfT);
return returnT;
} catch (Exception e) {...}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "xxl-job remoting error("+ e.getMessage() +"), for url : " + url);
} finally {...}
}
通过Postman发起本地Job调试
通过看到这里我们可以发现,我们的XXL-JOB在执行器上的任务可以直接通过Posm直接进行触发,curl脚本如下:
curl --location 'http://localhost:9999/run' \
--header 'Content-Type: application/json' \
--data '{
"jobId": 1,
"executorHandler": "p2-aquarius-reference-import-bill",
"executorBlockStrategy": "COVER_EARLY",
"executorTimeout": 0,
"logId": 1,
"logDateTime": 1710919760260,
"glueType": "BEAN",
"glueUpdatetime": 1710919760260,
"broadcastIndex": 0,
"broadcastTotal": 0,
"executorParams": "00:00:00"
}'
阻塞策略&路由策略
在任务触发过程中有两点需要注意阻塞策略和路由策略。
路由策略
image.png- FIRST(第一个):固定选择第一个机器执行。
- LAST(最后一个):固定选择最后一个机器执行。
- ROUND(轮询):按照执行器注册地址轮询分配任务。
- RANDOM(随机):随机选择在线的机器执行任务。
- CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
- 最不经常使用(LEAST_FREQUENTLY_USED):优先选择使用频率最低的那台机器。
- 最近最久未使用(LEAST_RECENTLY_USED):基于机器的使用时间进行选择,优先选择最近最久未使用的机器。
- 故障转移(FAILOVER):当首选机器出现故障时,任务会自动转移到其他可用的机器上执行。
- 忙碌转移(BUSYOVER):当首选机器处于忙碌状态时,任务会转移到其他机器上执行。
- 分片广播:这是一种针对执行时间长的任务的策略,通过将任务分散到各个节点上执行,加快完成速度。
分片广播是选择所有机器一起执行任务,其他方式都是选择其中一台执行任务。
阻塞策略
image.png- 单机串行(默认):当调度请求进入单机执行器后,它们会进入FIFO队列并以串行方式运行。这种策略按照顺序执行任务,不会并行处理多个任务。
- 丢弃后续调度:当调度请求进入单机执行器时,如果发现执行器已经存在正在运行的调度任务,本次请求将会被丢弃并标记为失败。这种策略会优先处理当前任务,而忽略后续到达的任务。
- 覆盖之前调度:当调度请求进入单机执行器时,如果发现执行器存在运行的调度任务,该策略会终止正在运行的调度任务并清空队列,然后运行新的本地调度任务。
调度器和任务执行器核心流程
image.png监控告警 JobFailMonitorHelper
监控告警线程初始化
public class JobFailMonitorHelper {
public void start(){
monitorThread = new Thread(new Runnable() {
@Override
public void run() {
// monitor
while (!toStop) {
try {
// 循环扫描```xxl_job_log```日志表,找出失败的任务进行告警通知
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
for (long failLogId: failLogIds) {
// 通过数据库加锁
int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
if (lockRet < 1) {
continue;
}
// 找到任务
XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());
// 1、失败重试
if (log.getExecutorFailRetryCount() > 0) {
JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
}
// 2、失败告警
int newAlarmStatus = 0; // 告警状态:0-默认、-1=锁定状态、1-无需告警、2-告警成功、3-告警失败
if (info != null) {
boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
newAlarmStatus = alarmResult?2:3;
} else {
newAlarmStatus = 1;
}
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
}
}
} catch (Exception e) {...}
try {
// 每次扫描间隔10秒
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {...}
}
}
});
monitorThread.setDaemon(true);
monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
monitorThread.start();
}
}
在服务启动时JobFailMonitorHelper
会初始化一个守护线程,循环扫描xxl_job_log
日志表,找出失败的任务进行重试或告警通知。
告警扩展
public boolean alarm(XxlJobInfo info, XxlJobLog jobLog) {
boolean result = false;
if (jobAlarmList!=null && jobAlarmList.size()>0) {
result = true; // success means all-success
for (JobAlarm alarm: jobAlarmList) {
boolean resultItem = false;
try {
resultItem = alarm.doAlarm(info, jobLog);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
if (!resultItem) {
result = false;
}
}
}
return result;
}
可以看到可以通过实现JobAlarm
接口实现对监控告警的扩展。
总结
- XXLJOB通过时间环数据结构来解决实时调度问题,减小调度误差
- XXLJOB通过异步化、多线程等方式来保证系统性能
- XXLJOB通过集群部署、并发控制、重试和监控告警的等方式来保证系统可用性和可靠性
- 通过类似责任链的方式来解决告警模块扩展性
- 通过提供完整的后台dashboard的方式来解决系统可维护性和可观测性
网友评论