今天我们来看看如何把 Spring Boot 定时任务做成分布式组件,供其他项目以依赖的方式集成使用。接下来就跟着大熊猫一起做 crontask 分布式组件的开发。
首先我们先创建一个crontask模块
其实这个定时任务组件最主要的操作就是定时任务记录,以及定时任务日志这两张表
接着就是一些工厂类的封装,监听类的实现
编写JpJob实现Job的execute方法,并编写任务执行之前、执行之后以及执行出错时调用的方法。
package com.panda.common.crontask.web.schedule;
import com.panda.common.crontask.common.DateUtil;
import com.panda.common.crontask.service.api.QuartzConfigService;
import com.panda.common.crontask.service.api.QuartzLogService;
import com.panda.common.crontask.service.api.dto.QuartzConfigDto;
import com.panda.common.crontask.service.api.dto.QuartzLogDto;
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import java.io.Serializable;
import java.time.Duration;
import java.time.LocalDateTime;
public abstract class JpJobimplements Job, Serializable {
private static Loggerlogger = LoggerFactory.getLogger(JpJob.class);
@Autowired
private QuartzLogServicequartzLogService;
@Autowired
private QuartzConfigServicequartzConfigService;
//任务开始时间
private ThreadLocalbeforTime =new ThreadLocal<>();
//日志Id
private ThreadLocallogId =new ThreadLocal<>();
public abstract void runJob();
public void beforeRun(){
String clazz =this.getClass().getName();
QuartzLogDto quartzLogDto =new QuartzLogDto();
quartzLogDto.setClazz(clazz);
QuartzConfigDto quartzConfigDto =quartzConfigService.findByClazz(clazz);
if(quartzConfigDto !=null){
quartzLogDto.setQuartzId(quartzConfigDto.getId());
quartzLogDto.setName(quartzConfigDto.getName());
}
quartzLogDto =quartzLogService.add(quartzLogDto);
beforTime.set(LocalDateTime.now());
logId.set(quartzLogDto.getId());
}
public void error(Exception e){
QuartzLogDto quartzLogDto =quartzLogService.get(logId.get());
Duration between = Duration.between(beforTime.get(), LocalDateTime.now());
quartzLogDto.setSpendTime(between.toMillis());
if(e !=null && e.getMessage() !=null)
quartzLogDto.setExceptionMessage(e.getMessage().length() >500 ? e.getMessage().substring(0, 499) : e.getMessage());
quartzLogDto.setResult(0);
quartzLogService.update(logId.get(), quartzLogDto);
}
public void afterRun(){
QuartzLogDto quartzLogDto =quartzLogService.get(logId.get());
Duration between = Duration.between(beforTime.get(), LocalDateTime.now());
quartzLogDto.setSpendTime(between.toMillis());
quartzLogDto.setResult(1);
quartzLogService.update(logId.get(), quartzLogDto);
}
@Override
public void execute(JobExecutionContext jobExecutionContext) {
String clazz =this.getClass().getName();
logger.info("==== 定时任务 "+clazz+" ====> 开启 " + DateUtil.getStringToday());
beforeRun();
try {
runJob();
}catch (Exception e) {
logger.info("==== 定时任务 "+clazz+" ====> 异常 "+e.getMessage());
error(e);
}finally {
logger.info("==== 定时任务 "+clazz+" ====> 结束 " + DateUtil.getStringToday());
afterRun();
}
}
}
编写JpJobFactory继承AdaptableJobFactory,在每次创建Job实例之后,通过Spring为该实例注入依赖。
package com.panda.common.crontask.web.schedule;
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;
@Component
public class JpJobFactoryextends AdaptableJobFactory {
@Autowired
private AutowireCapableBeanFactorycapableBeanFactory;
@Override
protected ObjectcreateJobInstance(TriggerFiredBundle bundle)throws Exception {
// 调用父类的方法
Object jobInstance =super.createJobInstance(bundle);
// 进行注入
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
调度工厂类:借助任务配置读取服务,实现项目启动时开启所有已激活的定时任务,以及任务的暂停、恢复和立即执行一次等方法。
package com.panda.common.crontask.web.schedule;
import com.panda.base.service.api.exception.ServiceException;
import com.panda.common.crontask.service.api.QuartzConfigService;
import com.panda.common.crontask.service.api.dto.QuartzConfigDto;
import org.quartz.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* 调度工厂类
*
*/
@Service
@Component
public class JpSchedulerFactory {
private static Loggerlogger = LoggerFactory.getLogger(JpSchedulerFactory.class);
@Autowired
SchedulerFactoryBeanschedulerFactoryBean;
// 任务配置读取服务
@Autowired
private QuartzConfigServicequartzConfigService;
public void scheduleJobs() {
Scheduler scheduler = getScheduler();
startJob(scheduler);
}
// 获取scheduler
private SchedulergetScheduler(){
return schedulerFactoryBean.getScheduler();
}
// 项目启动,开启所有激活的任务
private void startJob(Scheduler scheduler) {
try {
// 获取所有激活的任务
List jobList =quartzConfigService.findByStatus(1);
for (QuartzConfigDto config : jobList) {
Class clazz = (Class) Class.forName(config.getClazz());
JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(config.getId()).build();
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(config.getCron());
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(config.getId())
.withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail, cronTrigger);
}
}catch (ClassNotFoundException e) {
e.printStackTrace();
}catch (SchedulerException e) {
e.printStackTrace();
}
}
// 任务暂停
public void pauseJob(String id)throws SchedulerException {
QuartzConfigDto quartzConfigDto =quartzConfigService.get(id);
if(quartzConfigDto ==null)throw new ServiceException(502,"不存在的任务");
JobKey jobKey = JobKey.jobKey(id);
Scheduler scheduler = getScheduler();
scheduler.deleteJob(jobKey);
}
// 任务恢复
public void resumeJob(String id)throws SchedulerException, ClassNotFoundException {
QuartzConfigDto quartzConfigDto =quartzConfigService.get(id);
if(quartzConfigDto ==null)throw new ServiceException(502,"不存在的任务");
JobKey jobKey = JobKey.jobKey(id);
Scheduler scheduler = getScheduler();
Class clazz = (Class) Class.forName(quartzConfigDto.getClazz());
JobDetail detail = scheduler.getJobDetail(jobKey);
if (detail ==null){
JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(id).build();
CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.cronSchedule(quartzConfigDto.getCron());
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(id).withSchedule(scheduleBuilder).build();
scheduler.scheduleJob(jobDetail, cronTrigger);
}else {
scheduler.resumeJob(jobKey);
}
}
// 执行一次任务
public void runJob(String id)throws SchedulerException {
QuartzConfigDto quartzConfigDto =quartzConfigService.get(id);
if(quartzConfigDto ==null)throw new ServiceException(502,"不存在的任务");
JobKey jobKey = JobKey.jobKey(id);
Scheduler scheduler = getScheduler();
scheduler.triggerJob(jobKey);
}
}
定时任务运行工厂类:用来监听 Spring Boot 的启动事件,并向容器注入 SchedulerFactoryBean。
package com.panda.common.crontask.web.schedule;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
/**
* 定时任务运行工厂类
*
*/
@Configuration
public class StartSchedulerListenerimplements ApplicationListener {
@Autowired
public com.panda.common.crontask.web.schedule.JpSchedulerFactoryjpSchedulerFactory;
@Autowired
private com.panda.common.crontask.web.schedule.JpJobFactoryjpJobFactory;
// springboot 启动监听
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
jpSchedulerFactory.scheduleJobs();
}
//注入SchedulerFactoryBean
@Bean
public SchedulerFactoryBeanschedulerFactoryBean() {
SchedulerFactoryBean schedulerFactoryBean =new SchedulerFactoryBean();
schedulerFactoryBean.setJobFactory(jpJobFactory);
return schedulerFactoryBean;
}
}
下面贴出api层代码
package com.panda.common.crontask.web.api;
import com.panda.auth.client.Authorization;
import com.panda.base.service.api.exception.ServiceException;
import com.panda.base.web.api.jersey.IResponse;
import com.panda.base.web.api.jersey.security.ISecurityContext;
import com.panda.base.web.api.response.WebApiResponse;
import com.panda.common.crontask.common.ApplicationConstants;
import com.panda.common.crontask.service.api.QuartzConfigService;
import com.panda.common.crontask.service.api.QuartzLogService;
import com.panda.common.crontask.service.api.dto.QuartzConfigDto;
import com.panda.common.crontask.service.api.dto.QuartzLogDto;
import com.panda.common.crontask.web.schedule.JpSchedulerFactory;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Component;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import javax.annotation.security.RolesAllowed;
import javax.ws.rs.*;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.SecurityContext;
import java.time.LocalDateTime;@Path("quartz")
@Component
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@Api(tags ="【公共组件-定时任务】")@RolesAllowed("*")@Authorization(permission ="quartz:*")
public class QuartzApiimplements ISecurityContext, IResponse {
private final static Loggerlog = LoggerFactory.getLogger(QuartzApi.class);
@Autowired
QuartzConfigServicequartzConfigService;
@Autowired
QuartzLogServicequartzLogService;
//事务模版
@Autowired
private TransactionTemplatetransactionTemplate;
@Autowired
JpSchedulerFactoryJpSchedulerFactory;
@ApiOperation(value ="获取任务分页")
@GET
public WebApiResponse>getQuartzConfigPage(@Context SecurityContext context,// 如果@Authorization存在,则不可缺少次参数。
@ApiParam(value ="页码(从0开始),默认0")@QueryParam("pageIndex") Integer pageIndex,
@ApiParam(value ="页大小,默认10")@QueryParam("pageSize") Integer pageSize,
@ApiParam(value ="任务名称")@QueryParam("name") String name,
@ApiParam(value ="任务状态 1-正常,0-停止")@QueryParam("status") Integer status) {
if (pageIndex ==null) pageIndex =0;
if (pageSize ==null) pageSize =10;
Pageable pageable = PageRequest.of(pageIndex, pageSize,Sort.by(Sort.Direction.DESC, "updateTime"));
return response(quartzConfigService.findQuartzConfigPage(status,name,pageable));
}
@ApiOperation(value ="通过Id获取任务详情")
@GET
@Path("{id}")
public WebApiResponsegetQuartzConfigById(@Context SecurityContext context,@ApiParam(required =true, value ="quartzConfig Id")@PathParam("id") String id) {
return response(quartzConfigService.get(id));
}
@ApiOperation(value ="通过类名获取任务详情")
@GET
@Path("getByClazz/{clazz}")
public WebApiResponsegetQuartzConfigByClazz(@Context SecurityContext context,@ApiParam(required =true, value ="clazz")@PathParam("clazz") String clazz ) {
return response(quartzConfigService.findByClazz(clazz));
}
@ApiOperation(value ="添加任务")
@POST
@Path("add")
public WebApiResponseaddQuartzConfig(QuartzConfigDto quartzConfigDto, @Context SecurityContext context) {
return response(quartzConfigService.addOrUpdateQuartzConfig(quartzConfigDto));
}
@ApiOperation(value ="删除任务")
@POST
@Path("delete/{id}")
public WebApiResponsedeleteQuartzConfig(@Context SecurityContext context, @ApiParam(required =true, value ="quartzConfig Id")@PathParam("id") String id) {
return transactionTemplate.execute(new TransactionCallback() {
@Override
public WebApiResponsedoInTransaction(TransactionStatus transactionStatus) {
try {
JpSchedulerFactory.pauseJob(id);
quartzConfigService.delete(id);
}catch (Exception e) {
log.error("删除任务任务失败!", e);
transactionStatus.setRollbackOnly();
throw new ServiceException(502,"删除任务失败");
}
log.info("删除任务成功!");
return done();
}
});
}
@ApiOperation(value ="更新任务")
@POST
public WebApiResponseupdateQuartzConfig(QuartzConfigDto quartzConfigDto, @Context SecurityContext context) {
try {
JpSchedulerFactory.pauseJob(quartzConfigDto.getId());
quartzConfigDto =quartzConfigService.updateQuartzConfig(quartzConfigDto);
JpSchedulerFactory.resumeJob(quartzConfigDto.getId());
}catch (Exception e) {
log.error("更新任务失败!", e);
throw new ServiceException(502,"暂停任务失败");
}
log.info("更新任务成功!");
return response(quartzConfigDto);
}
@ApiOperation(value ="暂停任务")
@GET
@Path("pause/{id}")
public WebApiResponsepauseQuartzConfig(@Context SecurityContext context, @ApiParam(required =true, value ="quartzConfig Id")@PathParam("id") String id) {
return transactionTemplate.execute(new TransactionCallback() {
@Override
public WebApiResponsedoInTransaction(TransactionStatus transactionStatus) {
try {
JpSchedulerFactory.pauseJob(id);
quartzConfigService.updateJobStatus(id, ApplicationConstants.PAUSE_CODE);
}catch (Exception e) {
log.error("暂停任务失败!", e);
transactionStatus.setRollbackOnly();
throw new ServiceException(502,"暂停任务失败");
}
log.info("暂停任务成功!");
return done();
}
});
}
@ApiOperation(value ="恢复任务")
@GET
@Path("resume/{id}")
public WebApiResponseresumeQuartzConfig(@Context SecurityContext context, @ApiParam(required =true, value ="quartzConfig Id")@PathParam("id") String id) {
return transactionTemplate.execute(new TransactionCallback() {
@Override
public WebApiResponsedoInTransaction(TransactionStatus transactionStatus) {
try {
JpSchedulerFactory.resumeJob(id);
quartzConfigService.updateJobStatus(id,ApplicationConstants.ACTIVE_CODE);
}catch (Exception e) {
log.error("恢复任务失败!", e);
transactionStatus.setRollbackOnly();
throw new ServiceException(502,"恢复任务失败");
}
log.info("恢复任务成功!");
return done();
}
});
}
@ApiOperation(value ="执行一次任务")
@GET
@Path("run/{id}")
public WebApiResponserunQuartzConfig(@Context SecurityContext context, @ApiParam(required =true, value ="quartzConfig Id")@PathParam("id") String id) {
try {
JpSchedulerFactory.runJob(id);
}catch (Exception e) {
log.error("执行任务失败!", e);
throw new ServiceException(502,"执行任务失败");
}
log.info("执行任务成功!");
return done();
}
@ApiOperation(value ="获取任务日志分页")
@GET
@Path("log")
public WebApiResponse>getQuartzLogPage(@Context SecurityContext context,@ApiParam(required =false, value ="页码(从0开始),默认0")@QueryParam("pageIndex") Integer pageIndex,@ApiParam(required =false, value ="页大小,默认10")@QueryParam("pageSize") Integer pageSize,@ApiParam(value ="任务Id")@QueryParam("quartzId") String quartzId,@ApiParam(value ="任务名称")@QueryParam("name") String name,@ApiParam(value ="执行结果 1-成功,0-失败")@QueryParam("result") Integer result,@ApiParam(value ="开始时间 yyyy-MM-dd 00:00:00")@QueryParam("startTime") LocalDateTime startTime,
@ApiParam(value ="结束时间 yyyy-MM-dd 00:00:00")@QueryParam("endTime") LocalDateTime endTime) {
if (pageIndex ==null) pageIndex =0;
if (pageSize ==null) pageSize =10;
Pageable pageable = PageRequest.of(pageIndex, pageSize, Sort.by(Sort.Direction.DESC, "createTime"));
return response(quartzLogService.findQuartzLogPage(quartzId,name,result,startTime,endTime,pageable));
}
}
执行实例
package com.panda.common.crontask.web.task;
import com.panda.common.crontask.web.schedule.JpJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.stereotype.Component;
import java.io.Serializable;
/**
* 定时任务实现类
*/
@Component
@EnableScheduling
public class ScheduleTaskTestJobextends JpJobimplements Serializable {
private static Loggerlogger = LoggerFactory.getLogger(ScheduleTaskTestJob.class);
/**
* 执行任务
*
* @throws RuntimeException
*/
@Override
public void runJob()throws RuntimeException {
//在这里写定时任务执行的内容
logger.info("==== 定时任务 ScheduleTaskTestJob ====> 执行中...... ");
}
}
以上就是实现定时任务分布式组件的主要代码。再写一个run模块集成数据库,做一个启动类就可以运行了;然后上传到nexus做成依赖,就可以供其他项目使用了。想看源码可以关注“蛋皮皮”公众号,找大熊猫给你源码。
网友评论