手动执行任务
页面上点击“执行” 按钮, 前端会发送一个请求 /jobinfo/trigger post 请求
param: id = 任务ID
controller最终会调用service的方法进行处理
public ReturnT<String> triggerJob(int id) {
// 从数据库中查询该任务的具体信息
XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
if (xxlJobInfo == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "任务不存在" );
}
String group = String.valueOf(xxlJobInfo.getJobGroup());
String name = String.valueOf(xxlJobInfo.getId());
try {
// 调用执行器类,触发该任务 ,
XxlJobDynamicScheduler.triggerJob(name, group);
return ReturnT.SUCCESS;
} catch (SchedulerException e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
// XxlJobDynamicScheduler 中的triggerJob方法
public static boolean triggerJob(String jobName, String jobGroup) throws SchedulerException {
// TriggerKey : name + group
JobKey jobKey = new JobKey(jobName, jobGroup);
boolean result = false;
if (checkExists(jobName, jobGroup)) {
// 调用quartz的Scheduler来触发任务
scheduler.triggerJob(jobKey);
result = true;
logger.info(">>>>>>>>>>> runJob success, jobKey:{}", jobKey);
} else {
logger.info(">>>>>>>>>>> runJob fail, jobKey:{}", jobKey);
}
return result;
}
暂停任务
页面上点击“暂停” 按钮, 前端会发送一个请求 /jobinfo/pause post 请求
param: id = 任务ID
controller最终会调用service的方法进行处理
//XxlJobServiceImpl
@Override
public ReturnT<String> pause(int id) {
// 从数据库中获取任务信息,主要是为了获取group和name, 这个是组成在quartz里面的定时器的key
XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
String group = String.valueOf(xxlJobInfo.getJobGroup());
String name = String.valueOf(xxlJobInfo.getId());
try {
// 调用quartz操作类来暂停任务
boolean ret = XxlJobDynamicScheduler.pauseJob(name, group); //
return ret?ReturnT.SUCCESS:ReturnT.FAIL;
} catch (SchedulerException e) {
logger.error(e.getMessage(), e);
return ReturnT.FAIL;
}
}
// XxlJobDynamicScheduler 中的pauseJob方法
public static boolean pauseJob(String jobName, String jobGroup) throws SchedulerException {
// TriggerKey : name + group
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
boolean result = false;
if (checkExists(jobName, jobGroup)) {
// 暂停任务
scheduler.pauseTrigger(triggerKey);
result = true;
logger.info(">>>>>>>>>>> pauseJob success, triggerKey:{}", triggerKey);
} else {
logger.info(">>>>>>>>>>> pauseJob fail, triggerKey:{}", triggerKey);
}
return result;
}
恢复任务
页面上点击“恢复” 按钮, 前端会发送一个请求 /jobinfo/resume post 请求
param: id = 任务ID
controller最终会调用service的方法进行处理
//XxlJobServiceImpl
@Override
public ReturnT<String> resume(int id) {
// 从数据库中获取任务信息,主要是为了获取group和name, 这个是组成在quartz里面的定时器的key
XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
String group = String.valueOf(xxlJobInfo.getJobGroup());
String name = String.valueOf(xxlJobInfo.getId());
try {
// 恢复任务
boolean ret = XxlJobDynamicScheduler.resumeJob(name, group);
return ret?ReturnT.SUCCESS:ReturnT.FAIL;
} catch (SchedulerException e) {
logger.error(e.getMessage(), e);
return ReturnT.FAIL;
}
}
// XxlJobDynamicScheduler 中的resumeJob方法
public static boolean resumeJob(String jobName, String jobGroup) throws SchedulerException {
// TriggerKey : name + group
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
boolean result = false;
if (checkExists(jobName, jobGroup)) {
// 暂停任务
scheduler.resumeTrigger(triggerKey);
result = true;
logger.info(">>>>>>>>>>> resumeJob success, triggerKey:{}", triggerKey);
} else {
logger.info(">>>>>>>>>>> resumeJob fail, triggerKey:{}", triggerKey);
}
return result;
}
删除任务
页面上点击“执行” 按钮, 前端会发送一个请求 /jobinfo/remove post 请求
param: id = 任务ID
controller最终会调用service的方法进行处理
//XxlJobServiceImpl
public ReturnT<String> remove(int id) {
XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id);
String group = String.valueOf(xxlJobInfo.getJobGroup());
String name = String.valueOf(xxlJobInfo.getId());
try {
//调用quartz删除他内置的定时器
XxlJobDynamicScheduler.removeJob(name, group);
// 删除数据库中的任务
xxlJobInfoDao.delete(id);
// 删除调度日志
xxlJobLogDao.delete(id);
// 如果是脚本类型的任务,则删除脚本变化日志
xxlJobLogGlueDao.deleteByJobId(id);
return ReturnT.SUCCESS;
} catch (SchedulerException e) {
logger.error(e.getMessage(), e);
}
return ReturnT.FAIL;
}
终止任务
在调度日志的日志列表页面,正在执行者中的任务,可以手动进行终止。 前端会发送 /joblog/logKill? id= 日志ID
//JobLogController
@RequestMapping("/logKill")
@ResponseBody
public ReturnT<String> logKill(int id){
// 从数据库中获取该日志信息
XxlJobLog log = xxlJobLogDao.load(id);
// 获取该任务的信息
XxlJobInfo jobInfo = xxlJobInfoDao.loadById(log.getJobId());
if (jobInfo==null) {
return new ReturnT<String>(500, "任务不存在");
}
if (ReturnT.SUCCESS_CODE != log.getTriggerCode()) {
return new ReturnT<String>(500, "任务没有触发成功,无需终止");
}
// request of kill
ReturnT<String> runResult = null;
try {
// 通过NetComClientProxy创建代理对象,代理对象invoke方法里面包含了HTTP请求,会将该请求发送至执行器那一端。
// 通过执行器来终止该任务 , 下面主要来看一下执行器那边的kill方法。
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(log.getExecutorAddress());
runResult = executorBiz.kill(jobInfo.getId());
} catch (Exception e) {
logger.error(e.getMessage(), e);
runResult = new ReturnT<String>(500, e.getMessage());
}
if (ReturnT.SUCCESS_CODE == runResult.getCode()) {
log.setHandleCode(ReturnT.FAIL_CODE);
log.setHandleMsg( I18nUtil.getString("joblog_kill_log_byman")+":" + (runResult.getMsg()!=null?runResult.getMsg():""));
log.setHandleTime(new Date());
xxlJobLogDao.updateHandleInfo(log);
return new ReturnT<String>(runResult.getMsg());
} else {
return new ReturnT<String>(500, runResult.getMsg());
}
}
//ExecutorBizImpl
@Override
public ReturnT<String> kill(int jobId) {
// 从线程池里面根据该任务ID,获取对应的线程
JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
if (jobThread != null) {
// 线程存在,则手动移除 ,下面可以看一下remove方法
XxlJobExecutor.removeJobThread(jobId, "人工手动终止");
return ReturnT.SUCCESS;
}
return new ReturnT<String>(ReturnT.SUCCESS_CODE, "job thread aleady killed.");
}
public static void removeJobThread(int jobId, String removeOldReason){
// 从线程池(ConcurrentHashMap)中移除该队列
JobThread oldJobThread = JobThreadRepository.remove(jobId);
if (oldJobThread != null) {
//发送stop信息,线程的run方法,发现该信息,则会停止运行,并记录日志
oldJobThread.toStop(removeOldReason);
// 发送中断信息
oldJobThread.interrupt();
}
}
网友评论