Quartz的整体概括
什么是quartz
何为quartz,请看官网的说法:
Quartz is a richly featured, open source job scheduling library that can be integrated within virtually any Java application - from the smallest stand-alone application to the largest e-commerce system. Quartz can be used to create simple or complex schedules for executing tens, hundreds, or even tens-of-thousands of jobs; jobs whose tasks are defined as standard Java components that may execute virtually anything you may program them to do. The Quartz Scheduler includes many enterprise-class features, such as support for JTA transactions and clustering.
简单来说,quartz
是一个开源任务调度库,可以用来创建简单或复杂的调度,低至十个多至数百万个。它是一个标准的java
组件,支持JTA,集群等多种企业级功能。
市面上有很多定时任务框架在quartz的基础上做了二次开发,xxl-job
(基于quartz),elastic-job
(基于quartz和zk),所以quartz
到底是怎么玩的,它有哪些特性,下面来聊一聊。
quartz的基本概念
- 任务(Job):实际要触发的事件
- 触发器(Trigger):用于设定时间规则
- 调度器(Scheduler):组合任务与触发器
quartz
就这三样东西,我们新建作业,通过trigger
设置规则触发,由scheduler
进行整合,非常简单。
Springboot整合quartz的基础搭建
一般企业级项目开发都用的Springboot
,下面就来讲一讲quartz
整合Springboot
的一些要点。
依赖
quartz版本2.3.0,springboot版本1.5.18.RELEASE
<properties>
<java.version>1.8</java.version>
<druid.version>1.1.5</druid.version>
<quartz.version>2.3.0</quartz.version>
<fastjson.version>1.2.40</fastjson.version>
<mybatis.version>1.3.0</mybatis.version>
<log4j.version>1.2.16</log4j.version>
<slf4j-api.version>1.7.7</slf4j-api.version>
<slf4j-log4j12.version>1.7.7</slf4j-log4j12.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid-spring-boot-starter</artifactId>
<version>${druid.version}</version>
</dependency>
<!--quartz相关依赖-->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>${quartz.version}</version>
</dependency>
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz-jobs</artifactId>
<version>${quartz.version}</version>
</dependency>
<!--定时任务需要依赖context模块-->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.3.2</version>
</dependency>
<!-- log4j日志 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j-api.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j-log4j12.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Configuration
通过AutowireCapableBeanFactory
,使用spring
注入的方式实现在job
里注入spring
的bean
。
/**
* 继承org.springframework.scheduling.quartz.SpringBeanJobFactory
* 实现任务实例化方式
*/
public static class AutowiringSpringBeanJobFactory extends SpringBeanJobFactory implements
ApplicationContextAware {
private transient AutowireCapableBeanFactory beanFactory;
@Override
public void setApplicationContext(final ApplicationContext context) {
beanFactory = context.getAutowireCapableBeanFactory();
}
/**
* 将job实例交给spring ioc托管
* 我们在job实例实现类内可以直接使用spring注入的调用被spring ioc管理的实例
*
* @param bundle
* @return
* @throws Exception
*/
@Override
protected Object createJobInstance(final TriggerFiredBundle bundle) throws Exception {
final Object job = super.createJobInstance(bundle);
/**
* 将job实例交付给spring ioc
*/
beanFactory.autowireBean(job);
return job;
}
}
/**
* 配置任务工厂实例
*
* @param applicationContext spring上下文实例
* @return
*/
@Bean
public JobFactory jobFactory(ApplicationContext applicationContext) {
/**
* 采用自定义任务工厂 整合spring实例来完成构建任务
* see {@link AutowiringSpringBeanJobFactory}
*/
AutowiringSpringBeanJobFactory jobFactory = new AutowiringSpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}
/**
* 配置任务调度器
* 使用项目数据源作为quartz数据源
*
* @param jobFactory 自定义配置任务工厂
* @param dataSource 数据源实例
* @return
* @throws Exception
*/
@Bean(destroyMethod = "destroy", autowire = Autowire.NO)
public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory, DataSource dataSource) throws Exception {
SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
//将spring管理job自定义工厂交由调度器维护
schedulerFactoryBean.setJobFactory(jobFactory);
//设置覆盖已存在的任务
schedulerFactoryBean.setOverwriteExistingJobs(true);
//项目启动完成后,等待2秒后开始执行调度器初始化
schedulerFactoryBean.setStartupDelay(2);
//设置调度器自动运行
schedulerFactoryBean.setAutoStartup(true);
//设置数据源,使用与项目统一数据源
schedulerFactoryBean.setDataSource(dataSource);
//设置上下文spring bean name
schedulerFactoryBean.setApplicationContextSchedulerContextKey("applicationContext");
//设置配置文件位置
schedulerFactoryBean.setConfigLocation(new ClassPathResource("/quartz.properties"));
return schedulerFactoryBean;
}
这里需要提到一点,由于job
的初始化时是通过new出来的,不受spring
的管理,无法接受业务相关的bean
,故这里使用AutowireCapableBeanFactory
实现了new
出来的对象通过注解可注入受spring
管理的bean
了。
AbstractAutowireCapableBeanFactory#autowireBean
@Override
public void autowireBean(Object existingBean) {
// Use non-singleton bean definition, to avoid registering bean as dependent bean.
RootBeanDefinition bd = new RootBeanDefinition(ClassUtils.getUserClass(existingBean));
bd.setScope(BeanDefinition.SCOPE_PROTOTYPE);
bd.allowCaching = ClassUtils.isCacheSafe(bd.getBeanClass(), getBeanClassLoader());
BeanWrapper bw = new BeanWrapperImpl(existingBean);
initBeanWrapper(bw);
populateBean(bd.getBeanClass().getName(), bd, bw);
}
由源码可知,此类调用了populateBean的方法用来装配bean。具体spring的bean的加载注册过程可参考spring.io。
通过schedulerFactoryBean
的ConfigLocation
来读取quartz
的基本配置信息,注意quartz.properties
配置文件一定要放在classpath下。
#调度器实例名称
org.quartz.scheduler.instanceName = quartzScheduler
#调度器实例编号自动生成
org.quartz.scheduler.instanceId = AUTO
#持久化方式配置
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
#持久化方式配置数据驱动,MySQL数据库
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#quartz相关数据表前缀名
org.quartz.jobStore.tablePrefix = QRTZ_
#开启分布式部署
org.quartz.jobStore.isClustered = true
#配置是否使用
org.quartz.jobStore.useProperties = false
#分布式节点有效性检查时间间隔,单位:毫秒
org.quartz.jobStore.clusterCheckinInterval = 10000
#线程池实现类
org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
#执行最大并发线程数量
org.quartz.threadPool.threadCount = 10
#线程优先级
org.quartz.threadPool.threadPriority = 5
#配置为守护线程,设置后任务将不会执行
#org.quartz.threadPool.makeThreadsDaemons=true
#配置是否启动自动加载数据库内的定时任务,默认true
org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
我们看到org.quartz.jobStore.class
进行持久化配置设置成了JobStoreTX
属性,需要建立数据库表进行任务信息的持久化。其实官方还有一种RAMJobStore
用于存储内存中的调度信息,当进程终止时,所有调度信息都将丢失。本文使用JobStoreTX
(需要建立quartz的大概10张表,建表语句传在了github)。
Job&JobDetail
JobDetail
作为Job
的实例,一般由静态方法JobBuilder
创建,通过fluent
风格链式构建了Job的各项属性,
其中newJob
需要一个泛型上限为Job
的入参。
// 构建job信息
JobDetail job = JobBuilder.newJob(DynamicQuartzJob.class)
.withIdentity(jobKey) //jobName+jobGroup
.withDescription(quartzJobDetails.getDescription())
.usingJobData("jobData", quartzJobDetails.getJobData())
.build();
而Job接口只有一个简单的方法:
public interface Job {
void execute(JobExecutionContext context)
throws JobExecutionException;
}
当定时任务跑起来的时候,execute
里的代码将会被执行。
比如我们创建一个简单的定时任务:
public class QuartzTest extends QuartzJobBean
static Logger logger = LoggerFactory.getLogger(QuartzTest.class);
{
@Override
protected void executeInternal(JobExecutionContext jobExecutionContext) throws JobExecutionException {
logger.info("我是测试任务,我跑起来了,时间:{}",new Date());
}
注:QuartzJobBean是Job接口的实现类。
Trigger
JobDetail
由JobBuilder
类的静态方法构建,同样,Trigger
触发器由TriggerBuilder
的静态方法构建。
// 构建job的触发规则 cronExpression
Trigger trigger = TriggerBuilder.newTrigger()
.withIdentity(triggerKey)
.startNow()
.withSchedule(CronScheduleBuilder
.cronSchedule(quartzJobDetails.getCronExpression()))
.build();
Trigger
触发器用于触发任务作业,当trigger
触发器触发执行时,scheduler
调度程序中的其中一个线程将调用execute()
的一个线程。quartz
最常用的触发器分为SimpleTrigger
和CronTrigger
触发器两种。
SimpleTrigger
用于在给定时间执行一次作业,或给定时间每隔一段时间执行一次作业。这个功能Springboot
的@scheduled
注解也能实现。
如果是希望以日历时间表触发,则CronTrigger
就比较合适例如每周六下午3点执行,我们完全可以用cron表达式实现日历触发的时间规则,cron表达式可由quartzJobDetails
对象的CronExpression
属性传入。
最后,别忘了用scheduler
将job
和trigger
整合起来,因为他们是统一协作的:
// 注册job和trigger信息
scheduler.scheduleJob(job, trigger);
JobDataMap
一般业务方法会要求动态传参处理,这时候就需要jobDataMap
来进行参数传递了。我们在构建JobDetail
的时候,通过
usingJobData("jobData", quartzJobDetails.getJobData())
动态传入调度任务所需的参数,以达到业务需求。
JobListener&TriggerListener
用于在任务调度期间,各阶段的状态解读。这里我就以JobListener
为例,TriggerListener
也是相似的。
首先,构建jobListener
jobContext.getScheduler().getListenerManager().addJobListener(new MyJobListener(), KeyMatcher.keyEquals(jobKey));
这里我是在executeInternal
方法里面构建的,因为listner不会持久化,服务重启将会丢失监听。当然在构建job的时候也可以注册listener,如果没持久化监听的需求的话。
看一下MyJobListener:
public class MyJobListener implements JobListener {
public static final String LISTENER_NAME = "MyJobListener";
// @Autowired
// private JobScheduleLogMapper logMapper;
@Override
public String getName() {
return LISTENER_NAME; //must return a name
}
//任务被调度前
@Override
public void jobToBeExecuted(JobExecutionContext context) {
String jobName = context.getJobDetail().getKey().toString();
// System.out.println("jobToBeExecuted");
System.out.println("Job调度前 : " + jobName + " is going to start...");
}
//任务调度被拒了
@Override
public void jobExecutionVetoed(JobExecutionContext context) {
System.out.println("Job调度被拒:jobExecutionVetoed");
//todo:原因捕获
}
//任务被调度后
@Override
public void jobWasExecuted(JobExecutionContext context,
JobExecutionException jobException) {
// System.out.println("Job调度器:jobWasExecuted");
String jobName = context.getJobDetail().getKey().toString();
System.out.println("Job调度后 : " + jobName + " is finished...");
if (jobException!=null&&!jobException.getMessage().equals("")) {
System.out.println("Exception thrown by: " + jobName
+ " Exception: " + jobException.getMessage());
}
JobScheduleLog log = new JobScheduleLog();
log.setJobRuntime(String.valueOf(context.getJobRunTime()));
log.setId(Optional.ofNullable(context.get("id")).map(p->Integer.parseInt(String.valueOf(context.get("id")))).orElse(null));
JobScheduleLogMapper logMapper = SpringContextHolder.getBean(JobScheduleLogMapper.class);
logMapper.updateByPrimaryKeySelective(log);
}
}
任务调度前,调度后已经任务被拒,我们都可以使用钩子。
动态构建任务调度
下一个问题,我们知道新建一个调度job只要继承QuartzJobBean
类并实现executeInternal
就行,那么如果我有成百上千个任务,难道我要新建几千个类么?如果我想把已有的方法加入定时任务调度,难道我还要去改造原有的方法么?
必然不是的,这时候我们可以新建一个动态类继承QuartzJobBean
,并新建自己的业务表(例如建一个jobCaller
表),传入项目方法的全类路径,这样我们就可以executeInternal方法里通过读表拉取需要调度的任务方法,通过jobDataMap
拿到参数,通过反射直接invoke
目标方法了,这样就省去了大量的构建调度任务的工作了,并且可以在不动原有业务代码的基础上,定向指定任何一个方法加入任务调度了。
ok,talk is cheap, show me the code:
public class DynamicQuartzJob extends QuartzJobBean {
@Autowired
private JobScheduleLogManager jobManager;
@Override
protected void executeInternal(JobExecutionContext jobContext) {
try {
int i = jobManager.trans2JobLogBefore(jobContext);
if (i <= 0) return;
JobDetailImpl jobDetail = (JobDetailImpl) jobContext.getJobDetail();
String name = jobDetail.getName();
if (StringUtils.isEmpty(name)) {
throw new JobExecutionException("can not find service info, because desription is empty");
}
//注册job和trigger的监听器
JobKey jobKey = jobContext.getJobDetail().getKey();
TriggerKey triggerKey = jobContext.getTrigger().getKey();
jobContext.getScheduler().getListenerManager().addJobListener(new MyJobListener(), KeyMatcher.keyEquals(jobKey));
jobContext.getScheduler().getListenerManager().addTriggerListener(new MyTriggerListener(), KeyMatcher.keyEquals(triggerKey));
String[] serviceInfo = StringUtils.delimitedListToStringArray(name, ".");
// serviceInfo[0] is JOB_NAME_PREFIX
String beanName = serviceInfo[1];
String methodName = serviceInfo[2];
Object serviceImpl = getApplicationContext(jobContext).getBean(beanName);
Method method;
Class<?>[] parameterTypes = new Class[]{String.class};
Object[] arguments = null;
method = serviceImpl.getClass().getMethod(methodName, parameterTypes);
method.invoke(serviceImpl, jobContext.getJobDetail().getJobDataMap().getString("jobData"));
jobManager.trans2JobLogAfter(jobContext, i);
} catch (Exception ex) {
ErrorLog.errorConvertJson(ApplicationContextWare.getAppName(), LogTreadLocal.getTrackingNo(), this.getClass(), "quartz定时任务execute异常", ex);
}
}
这里方法签名参数我设定了一个String类型的形参,其实可以在添加任务到jobCaller
表的时候带上参数,executeInternal
的时候读表拉取方法签名。当然也可以传一个大json,目标方法自己解析。
最佳实践
这里我新建了一张job_caller
表,用于记录我的jobName
(类名.方法名),jobGroup
(没有就默认),jobData
(jobDatamap),以及cron
表达式。
可以看到我们传入的时间规则是每隔10秒执行一次,调度的是HelloService
的sayHello()
方法,传入的参数是xgj111111.
看一下HelloService的sayHello()方法做了什么:
@Component
public class HelloService {
public void sayHello(String a) {
System.out.println(a+"======hello world, i am quartz");
}
public void callHello(String b) {
System.out.println(b+"======call");
}
}
ok,只是简单的打印,来看看效果:
image-20190203160412045.png每隔10秒(时间打印忘加了~),sayHello
都将会被执行,并且监听器能捕获到各个阶段。
单节点服务重启调度恢复
由于任务是持久化在表里的,在服务重启后,quartz
仍然可以去恢复调度任务,并且能够预先执行misfire
的任务,这里就不演示了,很简单的。
多节点分布式调度漂移
这个就比较有意思了,在多个节点调度确定的任务时,分布式环境下,某个节点宕机,这个节点调度的作业能否自动漂移到其他节点?
在quartz.properties
里,org.quartz.jobStore.isClustered
开启了分布式的配置,此属性设置为true,quartz
将使用ClusterManager
来初始化节点。
基于上一个调度HelloService#sayHello
,我们再新增一个调度用于调用HelloService#callHello
,同时新增一个quartz
节点。(为何第二个节点能调度callHello
?==>基于quartz
的负载均衡),如图:
启动两个服务,分别监听在8123
和8124
端口:
8123
调度的是callHello
8124
调度的是sayHello
这时候,我们把8123
服务停掉,看看8124
的调度情况:
停止8123
服务:
这时候可以发现,8124
将8123
的任务接管过来了:
于是可以得出结论:在分布式场景下,当quartz集群的某一台服务宕机,其所调度的任务将被其他服务接管,所以quartz
是支持任务漂移的。
那么如果这时候,我再讲8123
起来会是什么情况呢?聪明的我和你应该都想到了,它由继续接管callHello
的任务调度了。
quartz的缺陷
- 强依赖于各节点的系统时间,多节点系统时间不一致将会出现调度紊乱的情况
- 容易造成数据库死锁(一个任务只能由一个线程来调度,这是由
quartz_lock
表的行锁来实现的,可以通过设置数据库事务级别来解决,不过也有说设置了也出现deadlock的)
以上是我的一些基本见解和尝试。
代码已上传至GitHub:https://github.com/xugejunllt/quartz-framework
网友评论