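[Please credit the source when reposting]: 土木匠 https://www.jianshu.com/p/7c6e63c88dc2
This article covers the clustered setup. For a single node without persistence, see https://www.jianshu.com/p/fe257adc331d
1. Dependency jars
If you use a Spring Cloud microservice architecture: at the time of writing, the official site shows that spring-boot-starter-quartz is not yet in the central repository. It is only available in Spring's official repository, at
http://repo.spring.io/milestone/org/springframework/boot/spring-boot-starter-quartz/
To use it, you have to specify the version manually: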
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
<version>2.0.0.M2</version>
</dependency>
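A closer look at this jar shows that it does little more than declare a few dependencies in its pom file.
That being so, we might as well reference those jars directly, which also lets us pick versions that suit our own project environment: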
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
</dependency>
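Mine is a Spring Cloud project, so I did not specify versions for the Spring-related artifacts; adjust as needed. For a plain Spring Boot project, just specify the version numbers manually.
2. Database configuration
Clustered Quartz has to persist jobs and triggers to a database, so Quartz needs a data source. Most projects, however, already configure a data source for Spring because other business code accesses the database, which would leave two data source configurations in the same runtime environment. Unless there is a special requirement, one configuration per environment is enough. Many integration examples online configure two. My instinct was that one should suffice, and after trying a few approaches I worked out a way that does. I describe only my implementation here and skip the alternatives, which you can look up in the official documentation. The example below uses MySQL; other databases are similar.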
quartz.properties
org.quartz.scheduler.instanceName=test-schedule
org.quartz.scheduler.instanceId=AUTO
org.quartz.jobStore.class=org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass=org.quartz.impl.jdbcjobstore.StdJDBCDelegate
org.quartz.jobStore.tablePrefix=QRTZ_
org.quartz.jobStore.isClustered=true
org.quartz.jobStore.useProperties=false
org.quartz.jobStore.clusterCheckinInterval=20000
org.quartz.scheduler.skipUpdateCheck=true
org.quartz.threadPool.class=org.quartz.simpl.SimpleThreadPool
org.quartz.threadPool.threadCount=10
org.quartz.threadPool.threadPriority=5
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread=true
Relevant part of application-dev.properties
server.port=8084
spring.application.name=test-schedule
server.tomcat.max-http-header-size=8192
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test-schedule?useUnicode=true&autoReconnect=true&rewriteBatchedStatements=TRUE&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
QuartzJobFactory
import org.quartz.spi.TriggerFiredBundle;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.AutowireCapableBeanFactory;
import org.springframework.scheduling.quartz.AdaptableJobFactory;
import org.springframework.stereotype.Component;
/**
* <p>Job factory</p>
* <PRE>
* <BR> Change history
* <BR>-----------------------------------------------
* <BR> Date Author Change
* </PRE>
*
* @author zl
* @version 1.0
* @date Created in 2017/12/16 15:48
* @copyright: Copyright (c) founders
*/
@Component
public class QuartzJobFactory extends AdaptableJobFactory {
@Autowired
private AutowireCapableBeanFactory capableBeanFactory;
@Override
protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception {
Object jobInstance = super.createJobInstance(bundle);
capableBeanFactory.autowireBean(jobInstance);
return jobInstance;
}
}
QuartzConfig
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.sql.DataSource;
/**
* <p>Quartz configuration</p>
* <PRE>
* <BR> Change history
* <BR>-----------------------------------------------
* <BR> Date Author Change
* </PRE>
*
* @author zl
* @version 1.0
* @date Created in 2017/12/16 15:33
* @copyright: Copyright (c) founders
*/
@Configuration
public class QuartzConfig {
@Autowired
DataSource dataSource;
@Bean
public SchedulerFactoryBean schedulerFactoryBean (QuartzJobFactory quartzJobFactory) throws Exception {
SchedulerFactoryBean factoryBean=new SchedulerFactoryBean();
factoryBean.setJobFactory(quartzJobFactory);
factoryBean.setConfigLocation(new ClassPathResource("quartz.properties"));
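// Reuse the application's Spring-managed DataSource, so quartz.properties needs no data source settings.
factoryBean.setDataSource(dataSource);
// No explicit afterPropertiesSet() call here: Spring invokes it on the returned bean,
// and calling it manually as well would initialize the scheduler twice.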
return factoryBean;
}
@Bean
public Scheduler scheduler(SchedulerFactoryBean schedulerFactoryBean) throws Exception {
Scheduler scheduler=schedulerFactoryBean.getScheduler();
scheduler.start();
return scheduler;
}
}
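That completes the configuration. Once the database tables are created, you can get on with writing the actual jobs.
The quartz-2.3.0.jar contains initialization SQL scripts for the various databases in its org.quartz.impl.jdbcjobstore package.
3. Jobs
BaseJob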
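To find the script that matches your database, one option is to list the `.sql` entries of that package in the jar. The sketch below uses only the JDK. The class name `QuartzSqlScripts` and the jar path passed on the command line are assumptions for illustration, and the script names you see depend on the Quartz version.

```java
import java.io.IOException;
import java.util.List;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

public class QuartzSqlScripts {
    // The package named above, written as a jar entry prefix.
    static final String SCRIPT_DIR = "org/quartz/impl/jdbcjobstore/";

    /** True if the jar entry is a .sql file inside the jdbcjobstore package. */
    static boolean isInitScript(String entryName) {
        return entryName.startsWith(SCRIPT_DIR) && entryName.endsWith(".sql");
    }

    /** Lists the init scripts contained in the given jar file, sorted by name. */
    static List<String> listScripts(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.stream()
                    .map(entry -> entry.getName())
                    .filter(QuartzSqlScripts::isInitScript)
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.out.println("usage: java QuartzSqlScripts <path-to-quartz-jar>");
            return;
        }
        // args[0] is the path to the Quartz jar, for example inside the local Maven repository.
        listScripts(args[0]).forEach(System.out::println);
    }
}
```

Run the script for your database against the schema configured in spring.datasource.url before starting the application.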
import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
/**
* <p>Job interface</p>
* <PRE>
* <BR> Change history
* <BR>-----------------------------------------------
* <BR> Date Author Change
* </PRE>
*
* @author zl
* @version 1.0
* @date Created in 2017/12/16 15:57
* @copyright: Copyright (c) founders
*/
public interface BaseJob extends Job {
@Override
void execute(JobExecutionContext context) throws JobExecutionException;
}
TestJob
import lombok.extern.slf4j.Slf4j;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.stereotype.Component;
/**
* <p>Test Job</p>
* <PRE>
* <BR> Change history
* <BR>-----------------------------------------------
* <BR> Date Author Change
* </PRE>
*
* @author zl
* @version 1.0
* @date Created in 2017/12/16 16:14
* @copyright: Copyright (c) founders
*/
@Slf4j
@Component
@DisallowConcurrentExecution
public class TestJob implements BaseJob {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
log.info("test job----PreviousFireTime={},NextFireTime={},FireTime={}" ,context.getPreviousFireTime(),context.getNextFireTime(),context.getFireTime());
}
}
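@DisallowConcurrentExecution
means that concurrent execution is not allowed. When a job's run time (say 30s) exceeds its trigger interval (say 10s), Quartz by default starts a new thread right away so that the job still fires at the scheduled interval, which means several runs overlap. With this annotation, the next run starts only after the current one has finished.
Now start the project and add the corresponding job and trigger to the three tables qrtz_cron_triggers, qrtz_job_details and qrtz_triggers. If the rows are correct, the job will run. Next I will cover job management.
4. Job management
TaskInfoVo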
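The effect on start times can be worked out with a small calculation. What follows is a simplified model written for this article, not Quartz code: it assumes a fixed interval, a fixed run time, and that a delayed run starts as soon as the previous one ends, and it ignores Quartz's misfire handling. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class ConcurrencyModel {
    /** Start times (in seconds) of the first n runs when runs may overlap. */
    static List<Long> concurrentStarts(long interval, int n) {
        List<Long> starts = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            starts.add(i * interval); // every run starts exactly on schedule
        }
        return starts;
    }

    /** Start times when a run may begin only after the previous run has finished. */
    static List<Long> serializedStarts(long interval, long duration, int n) {
        List<Long> starts = new ArrayList<>();
        long previousEnd = 0;
        for (int i = 0; i < n; i++) {
            long scheduled = i * interval;
            long start = Math.max(scheduled, previousEnd); // wait for the previous run
            starts.add(start);
            previousEnd = start + duration;
        }
        return starts;
    }

    public static void main(String[] args) {
        // Interval 10s, run time 30s, as in the example above.
        System.out.println(concurrentStarts(10, 4));     // prints [0, 10, 20, 30]
        System.out.println(serializedStarts(10, 30, 4)); // prints [0, 30, 60, 90]
    }
}
```

In the overlapping case three runs are in flight at once by second 20; in the serialized case the effective interval becomes the run time of the job.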
import lombok.Data;
import java.util.Date;
/**
* <p>task</p>
* <PRE>
* <BR> Change history
* <BR>-----------------------------------------------
* <BR> Date Author Change
* </PRE>
*
* @author zl
* @version 1.0
* @date Created in 2017/12/16 22:00
* @copyright: Copyright (c) founders
*/
@Data
public class TaskInfoVo {
private String jobName;
private String jobGroup;
private String jobDescription;
private String jobStatus;
private String cronExpression;
private String createTime;
private Date previousFireTime;
private Date nextFireTime;
}
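JobService: this class can be used as is. Note that it uses lombok.
// BusinessException and PageResultVO are project-specific classes that are not shown in this article.
import com.sunlands.zlcx.schedule.exception.BusinessException;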
import com.sunlands.zlcx.schedule.vo.PageResultVO;
import com.sunlands.zlcx.schedule.vo.TaskInfoVo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.quartz.*;
import org.quartz.impl.matchers.GroupMatcher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
/**
* <p>Job management</p>
* <PRE>
* <BR> Change history
* <BR>-----------------------------------------------
* <BR> Date Author Change
* </PRE>
*
* @author zl
* @version 1.0
* @date Created in 2017/12/16 21:58
* @copyright: Copyright (c) founders
*/
@Service
@Slf4j
public class JobService {
@Autowired
private Scheduler scheduler;
/**
* Paged query (pages are 1-based; paging is done in memory)
*
* @return
*/
public PageResultVO<TaskInfoVo> list(int page, int size) {
PageResultVO<TaskInfoVo> resultVO = new PageResultVO<TaskInfoVo>();
try {
List<TaskInfoVo> list = new ArrayList<>();
for (String groupJob : scheduler.getJobGroupNames()) {
for (JobKey jobKey : scheduler.getJobKeys(GroupMatcher.<JobKey>groupEquals(groupJob))) {
List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
for (Trigger trigger : triggers) {
Trigger.TriggerState triggerState = scheduler.getTriggerState(trigger.getKey());
JobDetail jobDetail = scheduler.getJobDetail(jobKey);
String cronExpression = "", createTime = "";
if (trigger instanceof CronTrigger) {
CronTrigger cronTrigger = (CronTrigger) trigger;
cronExpression = cronTrigger.getCronExpression();
createTime = cronTrigger.getDescription();
}
TaskInfoVo info = new TaskInfoVo();
info.setJobName(jobKey.getName());
info.setJobGroup(jobKey.getGroup());
info.setJobDescription(jobDetail.getDescription());
info.setJobStatus(triggerState.name());
info.setCronExpression(cronExpression);
info.setCreateTime(createTime);
info.setPreviousFireTime(trigger.getPreviousFireTime());
info.setNextFireTime(trigger.getNextFireTime());
list.add(info);
}
}
}
resultVO.setTotal(list.size());
resultVO.setRows(list.stream().skip((page - 1) * size).limit(size).collect(Collectors.toList()));
} catch (SchedulerException e) {
log.error("Failed to list scheduled jobs, page={},size={}", page, size, e);
}
return resultVO;
}
/**
* Add
*
* @param jobName fully qualified class name of the Job implementation
* @param jobGroup
* @param cronExpression
* @param jobDescription
*/
public void addJob(String jobName, String jobGroup, String cronExpression, String jobDescription) {
if (StringUtils.isAnyBlank(jobName, jobGroup, cronExpression, jobDescription)) {
throw new BusinessException(String.format("Invalid arguments, jobName=%s,jobGroup=%s,cronExpression=%s,jobDescription=%s", jobName, jobGroup, cronExpression, jobDescription));
}
String createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
try {
log.info("Adding job, jobName={},jobGroup={},cronExpression={},jobDescription={}", jobName, jobGroup, cronExpression, jobDescription);
if (checkExists(jobName, jobGroup)) {
log.error("Job already exists, jobName={},jobGroup={}", jobName, jobGroup);
throw new BusinessException(String.format("Job already exists, jobName=%s,jobGroup=%s", jobName, jobGroup));
}
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
JobKey jobKey = JobKey.jobKey(jobName, jobGroup);
CronScheduleBuilder schedBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing();
// The creation time is stored in the trigger description.
CronTrigger trigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withDescription(createTime).withSchedule(schedBuilder).build();
// jobName doubles as the class name, so it must be fully qualified.
Class<? extends Job> clazz = (Class<? extends Job>) Class.forName(jobName);
JobDetail jobDetail = JobBuilder.newJob(clazz).withIdentity(jobKey).withDescription(jobDescription).build();
scheduler.scheduleJob(jobDetail, trigger);
} catch (SchedulerException | ClassNotFoundException e) {
log.error("Failed to add job, jobName={},jobGroup={}", jobName, jobGroup, e);
throw new BusinessException("Class not found or invalid cron expression");
}
}
/**
* Update
*
* @param jobName
* @param jobGroup
* @param cronExpression
* @param jobDescription
*/
public void edit(String jobName, String jobGroup, String cronExpression, String jobDescription) {
if (StringUtils.isAnyBlank(jobName, jobGroup, cronExpression, jobDescription)) {
throw new BusinessException(String.format("Invalid arguments, jobName=%s,jobGroup=%s,cronExpression=%s,jobDescription=%s", jobName, jobGroup, cronExpression, jobDescription));
}
String createTime = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
try {
log.info("Updating job, jobName={},jobGroup={},cronExpression={},jobDescription={}", jobName, jobGroup, cronExpression, jobDescription);
if (!checkExists(jobName, jobGroup)) {
log.error("Job does not exist, jobName={},jobGroup={}", jobName, jobGroup);
throw new BusinessException(String.format("Job does not exist, jobName=%s,jobGroup=%s", jobName, jobGroup));
}
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
JobKey jobKey = new JobKey(jobName, jobGroup);
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(cronExpression).withMisfireHandlingInstructionDoNothing();
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerKey).withDescription(createTime).withSchedule(cronScheduleBuilder).build();
// The builder returns a new JobDetail, so the result must be kept for the new description to take effect.
JobDetail jobDetail = scheduler.getJobDetail(jobKey).getJobBuilder().withDescription(jobDescription).build();
HashSet<Trigger> triggerSet = new HashSet<>();
triggerSet.add(cronTrigger);
// replace=true overwrites the stored job and its trigger.
scheduler.scheduleJob(jobDetail, triggerSet, true);
} catch (SchedulerException e) {
log.error("Failed to update job, jobName={},jobGroup={}", jobName, jobGroup, e);
throw new BusinessException("Class not found or invalid cron expression");
}
}
/**
* Delete
*
* @param jobName
* @param jobGroup
*/
public void delete(String jobName, String jobGroup) {
try {
log.info("Deleting job, jobName={},jobGroup={}", jobName, jobGroup);
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
if (checkExists(jobName, jobGroup)) {
scheduler.pauseTrigger(triggerKey);
scheduler.unscheduleJob(triggerKey);
}
} catch (SchedulerException e) {
log.error("Failed to delete job, jobName={},jobGroup={}", jobName, jobGroup, e);
throw new BusinessException(e.getMessage());
}
}
/**
* Pause
*
* @param jobName
* @param jobGroup
*/
public void pause(String jobName, String jobGroup) {
try {
log.info("Pausing job, jobName={},jobGroup={}", jobName, jobGroup);
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
if (!checkExists(jobName, jobGroup)) {
log.error("Job does not exist, jobName={},jobGroup={}", jobName, jobGroup);
throw new BusinessException(String.format("Job does not exist, jobName=%s,jobGroup=%s", jobName, jobGroup));
}
scheduler.pauseTrigger(triggerKey);
} catch (SchedulerException e) {
log.error("Failed to pause job, jobName={},jobGroup={}", jobName, jobGroup, e);
throw new BusinessException(e.getMessage());
}
}
/**
* Resume
*
* @param jobName
* @param jobGroup
*/
public void resume(String jobName, String jobGroup) {
try {
log.info("Resuming job, jobName={},jobGroup={}", jobName, jobGroup);
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
if (!checkExists(jobName, jobGroup)) {
log.error("Job does not exist, jobName={},jobGroup={}", jobName, jobGroup);
throw new BusinessException(String.format("Job does not exist, jobName=%s,jobGroup=%s", jobName, jobGroup));
}
scheduler.resumeTrigger(triggerKey);
} catch (SchedulerException e) {
log.error("Failed to resume job, jobName={},jobGroup={}", jobName, jobGroup, e);
throw new BusinessException(e.getMessage());
}
}
/**
* Run immediately
*
* @param jobName
* @param jobGroup
*/
public void trigger(String jobName, String jobGroup) {
try {
log.info("Triggering job, jobName={},jobGroup={}", jobName, jobGroup);
if (!checkExists(jobName, jobGroup)) {
log.error("Job does not exist, jobName={},jobGroup={}", jobName, jobGroup);
throw new BusinessException(String.format("Job does not exist, jobName=%s,jobGroup=%s", jobName, jobGroup));
}
JobKey jobKey = new JobKey(jobName, jobGroup);
scheduler.triggerJob(jobKey);
} catch (SchedulerException e) {
log.error("Failed to trigger job, jobName={},jobGroup={}", jobName, jobGroup, e);
throw new BusinessException(e.getMessage());
}
}
/**
* Check whether the trigger for this job exists
*
* @param jobName
* @param jobGroup
* @throws SchedulerException
*/
private boolean checkExists(String jobName, String jobGroup) throws SchedulerException {
TriggerKey triggerKey = TriggerKey.triggerKey(jobName, jobGroup);
return scheduler.checkExists(triggerKey);
}
}
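One detail in this class is easy to get wrong: the SLF4J log methods substitute `{}`, whereas `String.format`, used for the exception messages, only understands `%s`-style specifiers and leaves `{}` untouched. The JDK-only sketch below shows the difference; the class name `PlaceholderDemo` and the sample values are made up for illustration.

```java
public class PlaceholderDemo {
    /** Builds the message with java.util.Formatter syntax (%s). */
    static String withPercentS(String jobName, String jobGroup) {
        return String.format("Job does not exist, jobName=%s,jobGroup=%s", jobName, jobGroup);
    }

    /** Same call with SLF4J-style braces: String.format ignores the extra arguments. */
    static String withBraces(String jobName, String jobGroup) {
        return String.format("Job does not exist, jobName={},jobGroup={}", jobName, jobGroup);
    }

    public static void main(String[] args) {
        // prints: Job does not exist, jobName=demo.TestJob,jobGroup=test
        System.out.println(withPercentS("demo.TestJob", "test"));
        // prints: Job does not exist, jobName={},jobGroup={}
        System.out.println(withBraces("demo.TestJob", "test"));
    }
}
```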
I will not show the controller and page here; write them yourself. You can also write a test class to try the service out.
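One piece that is easy to check in such a test class without a scheduler or database is the in-memory paging used by `list`, which skips `(page - 1) * size` entries and then takes `size`. The sketch below isolates that step using only the JDK; `PagingDemo` and its `page` helper are hypothetical names, not part of the service.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PagingDemo {
    /**
     * Returns the requested 1-based page, or an empty list when the page
     * lies past the end. A page below 1 makes skip() throw
     * IllegalArgumentException, so callers should validate it first.
     */
    static <T> List<T> page(List<T> all, int page, int size) {
        return all.stream()
                .skip((long) (page - 1) * size) // long arithmetic avoids int overflow
                .limit(size)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> jobs = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(page(jobs, 1, 2)); // prints [a, b]
        System.out.println(page(jobs, 3, 2)); // prints [e]
        System.out.println(page(jobs, 4, 2)); // prints []
    }
}
```

Because every job and trigger is loaded before the page is cut, this approach suits a modest number of jobs.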