I recently spent some time going through the xxl-job source code. This post is a brief analysis, in the hope of learning something about the essence of distributed task scheduling from such a mature framework.
The article covers the following points:
- Lay out the overall core design diagram, to get a bird's-eye view of xxl-job's architecture
- Walk through the server-side (admin) core design and establish its main ideas
- Walk through the client-side (executor) core design and establish its main ideas
Core design diagram
(core design diagram image)
Server-side design
XxlJobScheduler gives a good view into the core of the server-side design, which includes:
- Trigger thread pools (dispatching job triggers)
- Executor registration and de-registration
- Compensation for failed job executions (retry and alarm)
- Handling abnormal executor offline and callbacks
- Job execution statistics report
- The scheduling loop (generating the next trigger time for each job)
Trigger thread pools
The design uses two thread pools, fastTriggerPool and slowTriggerPool. They differ in blocking-queue capacity (1000 for fastTriggerPool, 2000 for slowTriggerPool) and in their default maximum thread counts (200 and 100 respectively). A rough sketch of their construction is shown below.
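For reference, the two pools are built roughly as follows (a sketch based on the default configuration; thread-factory and rejection-handler arguments are omitted):
// rough sketch of JobTriggerPoolHelper's pools; defaults: triggerPoolFastMax=200, triggerPoolSlowMax=100
fastTriggerPool = new ThreadPoolExecutor(
        10,
        XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),   // default 200
        60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(1000));                     // queue capacity 1000

slowTriggerPool = new ThreadPoolExecutor(
        10,
        XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),   // default 100
        60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(2000));                     // queue capacity 2000
addTrigger then chooses between the two pools for each job: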
public void addTrigger(final int jobId,
                       final TriggerTypeEnum triggerType,
                       final int failRetryCount,
                       final String executorShardingParam,
                       final String executorParam,
                       final String addressList) {

    // choose thread pool
    ThreadPoolExecutor triggerPool_ = fastTriggerPool;
    AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
    if (jobTimeoutCount != null && jobTimeoutCount.get() > 10) {    // job-timeout 10 times in 1 min
        triggerPool_ = slowTriggerPool;
    }
    // ....
}
The comment indicates that when a job has timed out more than 10 times within one minute, its triggers are pushed into the slow trigger pool. Note that "timeout" here does not refer to an HTTP timeout; it means that dispatching a single trigger took longer than 500 ms.
long minTim_now = System.currentTimeMillis()/60000;
if (minTim != minTim_now) {
    minTim = minTim_now;
    jobTimeoutCountMap.clear();
}

// incr timeout-count-map
long cost = System.currentTimeMillis()-start;
if (cost > 500) {       // job-timeout threshold 500ms
    AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
    if (timeoutCount != null) {
        timeoutCount.incrementAndGet();
    }
}
Each time a trigger finishes, the helper checks whether the current minute (roughly derived by dividing the timestamp by 60000) still equals the last recorded minute; if not, the timeout counters are cleared.
If the trigger took longer than 500 ms, the timeout count for that job is incremented. (The 500 ms threshold is not configurable, which is admittedly a bit crude...)
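For context, the two fields referenced above are declared in JobTriggerPoolHelper roughly like this (a sketch; modifiers may differ slightly from the actual source):
// last observed minute bucket (timestamp / 60000); counters are cleared whenever the minute changes
private volatile long minTim = System.currentTimeMillis() / 60000;
// per-job count of triggers that took longer than 500ms within the current minute
private volatile ConcurrentMap<Integer, AtomicInteger> jobTimeoutCountMap = new ConcurrentHashMap<Integer, AtomicInteger>();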
Registration and de-registration
The registryOrRemoveThreadPool thread pool handles registration and de-registration requests sent by executors. What is special is its rejection policy, r.run(): a rejected task is simply executed on the thread that submitted it (quite a few of xxl-job's thread pools use this policy). A minimal sketch of the pattern is shown below.
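A minimal sketch of that rejection policy (pool sizes and the thread name here are illustrative, not the exact values used by xxl-job):
ThreadPoolExecutor registryOrRemoveThreadPool = new ThreadPoolExecutor(
        2, 10, 30L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(2000),
        r -> new Thread(r, "xxl-job registry helper"),
        (r, executor) -> r.run());   // caller-runs: the submitting thread executes the rejected task itself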
registryMonitorThread is a scheduled task that runs every 30 s. Its main work:
- Check whether there are job groups (executors) using automatic registration; if so, for each of them:
- Delete executor registrations that have not reported for more than 90 s
- Collect executors that are reporting normally (registered within the last 90 s) and update their addresses back into the job group
// auto registry group
List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
if (groupList!=null && !groupList.isEmpty()) {

    // remove dead address (admin/executor)
    List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
    if (ids!=null && ids.size()>0) {
        XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
    }

    // fresh online address (admin/executor)
    HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
    List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
    if (list != null) {
        //....
    }

    // fresh group address
    // ....
}
Compensation for failed job executions
monitorThread is a scheduled task that runs every 10 s. Its main work:
- Query up to 1000 failed job logs (alarm status still at the default value) and process them in a loop
- Update each log from the default status to the locked status (skip it if the update fails)
- If the job is configured with a retry count, trigger a retry
- If the job is configured with an alarm email, send the alarm
- Update the alarm status to one of 1, 2 or 3
Alarm status: 0 = default, -1 = locked, 1 = no alarm needed, 2 = alarm sent successfully, 3 = alarm failed
List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
if (failLogIds!=null && !failLogIds.isEmpty()) {
    for (long failLogId: failLogIds) {

        // lock log
        int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
        if (lockRet < 1) {
            continue;
        }
        XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
        XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

        // 1、fail retry monitor
        if (log.getExecutorFailRetryCount() > 0) {
            JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
            String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
            log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
            XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
        }

        // 2、fail alarm monitor
        int newAlarmStatus = 0;    // alarm status: 0=default, -1=locked, 1=no alarm needed, 2=alarm sent, 3=alarm failed
        if (info!=null && info.getAlarmEmail()!=null && info.getAlarmEmail().trim().length()>0) {
            boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
            newAlarmStatus = alarmResult?2:3;
        } else {
            newAlarmStatus = 1;
        }

        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
    }
}
Handling abnormal executor offline and callbacks
The callbackThreadPool thread pool handles callbacks sent by executors; its rejection policy is also r.run().
monitorThread is a scheduled task that runs every 60 s. Its main work:
- Lost-result handling: if a schedule log has been stuck in the "running" state for more than 10 minutes and the corresponding executor's heartbeat registration shows it is no longer online, the log is proactively marked as failed
// lost-result handling: a log stuck in "running" for more than 10 minutes whose executor
// heartbeat registration is no longer online is proactively marked as failed
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
if (losedJobIds!=null && losedJobIds.size()>0) {
    for (Long logId: losedJobIds) {
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setId(logId);

        jobLog.setHandleTime(new Date());
        jobLog.setHandleCode(ReturnT.FAIL_CODE);
        jobLog.setHandleMsg( I18nUtil.getString("joblog_lost_fail") );

        XxlJobCompleter.updateHandleInfoAndFinish(jobLog);
    }
}
Job execution statistics report
The core code of the report refresh is as follows:
Date todayTo = itemDay.getTime();

// refresh log-report every minute
XxlJobLogReport xxlJobLogReport = new XxlJobLogReport();
xxlJobLogReport.setTriggerDay(todayFrom);
xxlJobLogReport.setRunningCount(0);
xxlJobLogReport.setSucCount(0);
xxlJobLogReport.setFailCount(0);

Map<String, Object> triggerCountMap = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLogReport(todayFrom, todayTo);
if (triggerCountMap!=null && triggerCountMap.size()>0) {
    int triggerDayCount = triggerCountMap.containsKey("triggerDayCount")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCount"))):0;
    int triggerDayCountRunning = triggerCountMap.containsKey("triggerDayCountRunning")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountRunning"))):0;
    int triggerDayCountSuc = triggerCountMap.containsKey("triggerDayCountSuc")?Integer.valueOf(String.valueOf(triggerCountMap.get("triggerDayCountSuc"))):0;
    int triggerDayCountFail = triggerDayCount - triggerDayCountRunning - triggerDayCountSuc;

    xxlJobLogReport.setRunningCount(triggerDayCountRunning);
    xxlJobLogReport.setSucCount(triggerDayCountSuc);
    xxlJobLogReport.setFailCount(triggerDayCountFail);
}

// do refresh
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().update(xxlJobLogReport);
if (ret < 1) {
    XxlJobAdminConfig.getAdminConfig().getXxlJobLogReportDao().save(xxlJobLogReport);
}
Scheduling loop (generating the next trigger time)
scheduleThread is a scheduled task whose sleep interval is computed dynamically; this is the most central piece of the server side.
@Override
public void run() {

    try {
        TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );   // align so the seconds value is 0, 5, 10...
    } catch (InterruptedException e) {
        //....
    }
    logger.info(">>>>>>>>> init xxl-job admin scheduler success.");

    // pre-read count: threadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
    // by default (200 + 100) * 20 = 6000
    int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

    while (!scheduleThreadToStop) {

        // Scan Job
        long start = System.currentTimeMillis();

        // acquire a database connection
        boolean preReadSuc = true;
        try {
            // ...

            // acquire the schedule lock (nodes that fail to acquire it block here, e.g. when multiple admin nodes are deployed)
            preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
            preparedStatement.execute();

            // tx start

            // 1、pre read
            long nowTime = System.currentTimeMillis();
            // query jobs whose next trigger time is earlier than now + 5s, up to preReadCount rows (6000 by default)
            List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
            if (scheduleList!=null && scheduleList.size()>0) {

                // 2、push time-ring
                for (XxlJobInfo jobInfo: scheduleList) {

                    // time-ring jump
                    // now > next trigger time + 5s
                    if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                        // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                        logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());

                        // 1、misfire match
                        // misfire strategy
                        MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                        if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                            // FIRE_ONCE_NOW 》 trigger
                            JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                            logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                        }

                        // 2、fresh next
                        // refresh the next trigger time
                        // if no next trigger time can be computed, the job is stopped
                        refreshNextValidTime(jobInfo, new Date());

                    } else if (nowTime > jobInfo.getTriggerNextTime()) {
                        // now is slightly past the next trigger time (within 5s)
                        // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time

                        // 1、trigger
                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                        logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );

                        // 2、fresh next
                        refreshNextValidTime(jobInfo, new Date());

                        // if the following trigger falls within the coming 5s, push it into the time ring for the ring thread to handle
                        // next-trigger-time in 5s, pre-read again
                        if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {

                            // compute which slot of the time ring the next trigger falls into
                            // 1、make ring second
                            int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                            // 2、push time ring
                            pushTimeRing(ringSecond, jobInfo.getId());

                            // 3、fresh next
                            refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                        }

                    } else {
                        // the next trigger time is still in the future; compute which time-ring slot this trigger falls into
                        // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time

                        // 1、make ring second
                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);

                        // 2、push time ring
                        pushTimeRing(ringSecond, jobInfo.getId());

                        // 3、fresh next
                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                    }
                }

                // 3、update trigger info
                for (XxlJobInfo jobInfo: scheduleList) {
                    XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                }

            } else {
                preReadSuc = false;   // no schedulable jobs in this round
            }

            // tx stop

        } catch (Exception e) {
            // ....
        } finally {
            // release resources
        }
        long cost = System.currentTimeMillis()-start;

        // Wait seconds, align second
        if (cost < 1000) {  // scan-overtime, not wait
            try {
                // pre-read period: success > scan each second; fail > skip this period;
                // if nothing was schedulable within 5s, sleep until the next 5s boundary (0, 5, 10...);
                // otherwise sleep until the next whole second (1, 2, 3, 4...)
                TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
            } catch (InterruptedException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }

    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
}
ringThread implements a time wheel with at most 60 slots. Internally it is a map keyed by the second of the minute, whose value is the list of job IDs to trigger in that slot.
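The underlying structure is roughly this field in JobScheduleHelper (a sketch; the exact declaration may differ slightly):
// time ring: second of the minute (0-59) -> ids of the jobs due in that slot
private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<Integer, List<Integer>>();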
public void run() {

    while (!ringThreadToStop) {

        // align second
        try {
            TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);   // align to the next second
        } catch (InterruptedException e) {
            if (!ringThreadToStop) {
                logger.error(e.getMessage(), e);
            }
        }

        try {
            // second data
            List<Integer> ringItemData = new ArrayList<>();
            int nowSecond = Calendar.getInstance().get(Calendar.SECOND);   // also check the previous slot, in case processing took too long and a slot was skipped
            for (int i = 0; i < 2; i++) {
                List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );   // current slot plus the previous one: if nowSecond is 4, i=0 reads slot 4 and i=1 reads slot 3
                if (tmpData != null) {
                    ringItemData.addAll(tmpData);
                }
            }

            // ring trigger
            logger.debug(">>>>>>>>>>> xxl-job, time-ring beat : " + nowSecond + " = " + Arrays.asList(ringItemData) );
            if (ringItemData.size() > 0) {
                // do trigger
                for (int jobId: ringItemData) {
                    // do trigger
                    JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                }
                // clear
                ringItemData.clear();
            }
        } catch (Exception e) {
            if (!ringThreadToStop) {
                logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
            }
        }
    }
    logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
}
Summary: with that, the core server-side design has been broken down. The most central piece is the scheduling loop, which internally implements a second-granularity time wheel (a bit crude...). The overall architecture is not trivial: since there is no external framework dependency, xxl-job has to implement service registration, de-registration, offline detection and so on by itself. The architecture can therefore be viewed as two large blocks: service governance and job scheduling. Service governance maintains the connection state between the admin and the executors; job scheduling is xxl-job's core function.
Client-side design
The client (executor) is certainly not as complex as the server; it boils down to a few points:
- Scan the Spring container for methods carrying the XXL annotation (@XxlJob), wrap each into a MethodJobHandler and add it to the handler registry (an illustrative handler is sketched after this list)
- Initialize the client-side logging
- Initialize the embedded client server (used to receive calls from the admin)
- Registration and de-registration
- Handle callbacks reporting execution results
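For illustration, a typical annotated handler looks roughly like this (bean and method names are made up):
@Component
public class DemoJobHandler {

    // scanned at startup and wrapped into a MethodJobHandler registered under the name "demoJobHandler"
    @XxlJob("demoJobHandler")
    public void demoJobHandler() throws Exception {
        XxlJobHelper.log("hello xxl-job");
        // business logic ...
    }
}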
The points worth focusing on are the embedded client server and registration/de-registration.
The embedded client server
EmbedServer is an HTTP server built on Netty; the default port is 9999.
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline()
                        .addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle
                        .addLast(new HttpServerCodec())
                        .addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & response to FULL
                        .addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));   // http handler
            }
        })
        .childOption(ChannelOption.SO_KEEPALIVE, true);

// bind
ChannelFuture future = bootstrap.bind(port).sync();

logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);

// start registry
startRegistry(appname, address);   // executor registration

// wait util stop
future.channel().closeFuture().sync();
EmbedHttpServerHandler handles HTTP requests coming from the admin, mainly: heartbeat (/beat), idle check (/idleBeat), run (/run), kill (/kill) and log retrieval (/log).
private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

    // valid: only POST is supported
    if (HttpMethod.POST != httpMethod) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
    }
    if (uri==null || uri.trim().length()==0) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
    }
    // validate the access token
    if (accessToken!=null
            && accessToken.trim().length()>0
            && !accessToken.equals(accessTokenReq)) {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
    }

    // services mapping
    try {
        if ("/beat".equals(uri)) {
            return executorBiz.beat();
        } else if ("/idleBeat".equals(uri)) {
            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
            return executorBiz.idleBeat(idleBeatParam);   // check whether the job thread is running or has queued triggers
        } else if ("/run".equals(uri)) {
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            return executorBiz.run(triggerParam);
        } else if ("/kill".equals(uri)) {   // stop the job thread on the executor
            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
            return executorBiz.kill(killParam);
        } else if ("/log".equals(uri)) {
            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
            return executorBiz.log(logParam);
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
        }
    } catch (Exception e) {
        // ....
    }
}
The /run path deserves the most attention: it locates the corresponding IJobHandler, builds (or reuses) a JobThread, and pushes the trigger onto that thread's queue for processing.
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;

    // valid:jobHandler + jobThread
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {

        // new jobhandler
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

        // valid old jobThread
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

            jobThread = null;
            jobHandler = null;
        }

        // valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
        //...
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
        // ....
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }

    // executor block strategy
    if (jobThread != null) {   // apply the configured block strategy
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {   // discard later triggers
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {   // cover earlier triggers
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                jobThread = null;
            }
        } else {   // serial execution on a single executor
            // just queue trigger
        }
    }

    // replace thread (new or exists invalid)
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);   // create the handler's job thread, register it and start it
    }

    // push data to queue
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);   // enqueue; the job thread consumes this blocking queue
    return pushResult;
}
The processing logic of the job thread (JobThread) is as follows:
// init
try {
    handler.init();   // IJobHandler supports an init hook
} catch (Throwable e) {
    logger.error(e.getMessage(), e);
}

// execute
while(!toStop){
    running = false;
    idleTimes++;   // idle poll count

    TriggerParam triggerParam = null;
    try {
        // to check toStop signal, we need cycle, so we cannot use queue.take(); use poll(timeout) instead
        triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);   // block for up to 3s
        if (triggerParam!=null) {
            running = true;
            idleTimes = 0;   // reset the idle count
            triggerLogIdSet.remove(triggerParam.getLogId());

            // log filename, like "logPath/yyyy-MM-dd/9999.log"
            String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTime()), triggerParam.getLogId());

            // job context
            XxlJobContext xxlJobContext = new XxlJobContext(
                    triggerParam.getJobId(),
                    triggerParam.getExecutorParams(),
                    logFileName,
                    triggerParam.getBroadcastIndex(),
                    triggerParam.getBroadcastTotal());

            // init job context
            XxlJobContext.setXxlJobContext(xxlJobContext);

            // execute
            XxlJobHelper.log("<br>----------- xxl-job job execute start -----------<br>----------- Param:" + xxlJobContext.getJobParam());

            // if an execution timeout is configured, run the handler asynchronously and wait with a timeout
            if (triggerParam.getExecutorTimeout() > 0) {
                // limit timeout
                Thread futureThread = null;
                try {
                    FutureTask<Boolean> futureTask = new FutureTask<Boolean>(new Callable<Boolean>() {
                        @Override
                        public Boolean call() throws Exception {

                            // init job context
                            XxlJobContext.setXxlJobContext(xxlJobContext);

                            handler.execute();   // run the handler
                            return true;
                        }
                    });
                    futureThread = new Thread(futureTask);
                    futureThread.start();

                    Boolean tempResult = futureTask.get(triggerParam.getExecutorTimeout(), TimeUnit.SECONDS);   // the timeout is in seconds
                } catch (TimeoutException e) {
                    // ...
                    // handle result
                    XxlJobHelper.handleTimeout("job execute timeout ");   // mark the result as timed out
                } finally {
                    futureThread.interrupt();
                }
            } else {
                // just execute
                handler.execute();   // run the handler directly
            }

            // ....
        } else {
            if (idleTimes > 30) {   // idle for more than 30 polls (30 * 3s = 90s)
                if(triggerQueue.size() == 0) {   // avoid concurrent trigger causes jobId-lost: only remove when the queue is empty
                    XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");   // remove this job thread
                }
            }
        }
    } catch (Throwable e) {
        // handle exception
    } finally {
        if(triggerParam != null) {
            // callback handler info
            if (!toStop) {
                // normal case: push into the callback queue; the callback thread reports the result to the admin
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.getXxlJobContext().getHandleCode(),
                        XxlJobContext.getXxlJobContext().getHandleMsg() )
                );
            } else {
                // is killed: the thread was stopped (killed from the admin side) while the job was running
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                        triggerParam.getLogId(),
                        triggerParam.getLogDateTime(),
                        XxlJobContext.HANDLE_CODE_FAIL,
                        stopReason + " [job running, killed]" )
                );
            }
        }
    }
}

// after the thread stops, any triggers left in the queue are pushed into the callback queue as killed
// callback trigger request in queue
while(triggerQueue !=null && triggerQueue.size()>0){
    TriggerParam triggerParam = triggerQueue.poll();
    if (triggerParam!=null) {
        // is killed
        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                triggerParam.getLogId(),
                triggerParam.getLogDateTime(),
                XxlJobContext.HANDLE_CODE_FAIL,
                stopReason + " [job not executed, in the job queue, killed.]")
        );
    }
}

// destroy
try {
    handler.destroy();   // IJobHandler supports a destroy hook, invoked when the thread stops
} catch (Throwable e) {
    logger.error(e.getMessage(), e);
}
Summary: that covers the executor-side execution logic. The key design point is that every @XxlJob-annotated method gets a dedicated thread to process triggers from the admin. When such a thread has been idle for more than 30 polls (3 s each, 90 s in total) without any work, it is shut down, and the next trigger creates a new one if none exists. So a job whose trigger interval is longer than 90 s will keep tearing down and rebuilding its thread, won't it?
Registration and de-registration
When EmbedServer builds the HTTP server it also starts the registration thread registryThread, which reports a heartbeat every 30 s. Once registryThread's toStop flag is set to true, it enters the de-registration flow.
public void run() {

    // registry
    while (!toStop) {
        try {
            RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
            for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
                try {
                    ReturnT<String> registryResult = adminBiz.registry(registryParam);   // register (report heartbeat)
                    // ...
                } catch (Exception e) {}
            }
        } catch (Exception e) {}

        try {
            if (!toStop) {
                TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);   // 30s
            }
        } catch (InterruptedException e) {}
    }

    // registry remove
    try {
        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            try {
                ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);   // de-register
                //....
            } catch (Exception e) {}
        }
    } catch (Exception e) {}
}
Summary: the client-side design is straightforward. If multiple admin addresses are configured, the executor registers itself with every one of them, which also suggests that admin nodes do not synchronize state with each other directly (they only become aware of each other by sharing the same database). Overall, the client side is relatively thread-hungry, and some threads may be created and destroyed over and over. An illustrative configuration is shown below.
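For illustration, an executor configured against two admin nodes might look roughly like this (the addresses and app name are made up; the keys are the standard xxl-job executor properties):
# the executor registers itself with every admin address in this comma-separated list
xxl.job.admin.addresses=http://admin-a:8080/xxl-job-admin,http://admin-b:8080/xxl-job-admin
xxl.job.executor.appname=demo-executor
xxl.job.executor.port=9999
xxl.job.accessToken=default_token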