常规使用quartz需要每一个任务都实现org.quartz.Job接口,每次添加,删除或者修改任务配置都需要停机修改代码再重新部署,即使quartz提供API可以运行时删除或修改,但是机器重启后,只要代码不修改,还是会恢复成初始的配置。
所以针对以上的问题,最好是能做成动态可配置的才能更方便地管理我们的定时任务。最好的结果是可以直接使用spring管理的任意Bean对象,通过Bean的名称,指定的方法名,就能按照一定的调度策略执行定时任务。
github地址:https://github.com/zw201913/quartz-cluster,里面有详细使用方式。
1.先把quartz需要的数据表创建好
-- ----------------------------
-- Table structure for QRTZ_JOB_DETAILS
-- ----------------------------
DROP TABLE IF EXISTS `QRTZ_JOB_DETAILS`;
CREATE TABLE `QRTZ_JOB_DETAILS` (
`SCHED_NAME` varchar(120) COLLATE utf8mb4_bin NOT NULL,
`JOB_NAME` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`JOB_GROUP` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`DESCRIPTION` varchar(250) COLLATE utf8mb4_bin DEFAULT NULL,
`JOB_CLASS_NAME` varchar(250) COLLATE utf8mb4_bin NOT NULL,
`IS_DURABLE` varchar(1) COLLATE utf8mb4_bin NOT NULL,
`IS_NONCONCURRENT` varchar(1) COLLATE utf8mb4_bin NOT NULL,
`IS_UPDATE_DATA` varchar(1) COLLATE utf8mb4_bin NOT NULL,
`REQUESTS_RECOVERY` varchar(1) COLLATE utf8mb4_bin NOT NULL,
`JOB_DATA` blob,
PRIMARY KEY (`SCHED_NAME`,`JOB_NAME`,`JOB_GROUP`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Table structure for QRTZ_TRIGGERS
-- ----------------------------
DROP TABLE IF EXISTS `QRTZ_TRIGGERS`;
CREATE TABLE `QRTZ_TRIGGERS` (
`SCHED_NAME` varchar(120) COLLATE utf8mb4_bin NOT NULL,
`TRIGGER_NAME` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`TRIGGER_GROUP` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`JOB_NAME` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`JOB_GROUP` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`DESCRIPTION` varchar(250) COLLATE utf8mb4_bin DEFAULT NULL,
`NEXT_FIRE_TIME` bigint(13) DEFAULT NULL,
`PREV_FIRE_TIME` bigint(13) DEFAULT NULL,
`PRIORITY` int(11) DEFAULT NULL,
`TRIGGER_STATE` varchar(16) COLLATE utf8mb4_bin NOT NULL,
`TRIGGER_TYPE` varchar(8) COLLATE utf8mb4_bin NOT NULL,
`START_TIME` bigint(13) NOT NULL,
`END_TIME` bigint(13) DEFAULT NULL,
`CALENDAR_NAME` varchar(200) COLLATE utf8mb4_bin DEFAULT NULL,
`MISFIRE_INSTR` smallint(2) DEFAULT NULL,
`JOB_DATA` blob,
PRIMARY KEY (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`),
KEY `SCHED_NAME` (`SCHED_NAME`,`JOB_NAME`,`JOB_GROUP`),
CONSTRAINT `QRTZ_TRIGGERS_ibfk_1` FOREIGN KEY (`SCHED_NAME`, `JOB_NAME`, `JOB_GROUP`) REFERENCES `QRTZ_JOB_DETAILS` (`SCHED_NAME`, `JOB_NAME`, `JOB_GROUP`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
DROP TABLE IF EXISTS `QRTZ_BLOB_TRIGGERS`;
CREATE TABLE `QRTZ_BLOB_TRIGGERS` (
`SCHED_NAME` varchar(120) COLLATE utf8mb4_bin NOT NULL,
`TRIGGER_NAME` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`TRIGGER_GROUP` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`BLOB_DATA` blob,
PRIMARY KEY (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`),
CONSTRAINT `QRTZ_BLOB_TRIGGERS_ibfk_1` FOREIGN KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`) REFERENCES `QRTZ_TRIGGERS` (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Table structure for QRTZ_CALENDARS
-- ----------------------------
DROP TABLE IF EXISTS `QRTZ_CALENDARS`;
CREATE TABLE `QRTZ_CALENDARS` (
`SCHED_NAME` varchar(120) COLLATE utf8mb4_bin NOT NULL,
`CALENDAR_NAME` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`CALENDAR` blob NOT NULL,
PRIMARY KEY (`SCHED_NAME`,`CALENDAR_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Table structure for QRTZ_CRON_TRIGGERS
-- ----------------------------
DROP TABLE IF EXISTS `QRTZ_CRON_TRIGGERS`;
CREATE TABLE `QRTZ_CRON_TRIGGERS` (
`SCHED_NAME` varchar(120) COLLATE utf8mb4_bin NOT NULL,
`TRIGGER_NAME` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`TRIGGER_GROUP` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`CRON_EXPRESSION` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`TIME_ZONE_ID` varchar(80) COLLATE utf8mb4_bin DEFAULT NULL,
PRIMARY KEY (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`),
CONSTRAINT `QRTZ_CRON_TRIGGERS_ibfk_1` FOREIGN KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`) REFERENCES `QRTZ_TRIGGERS` (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Table structure for QRTZ_FIRED_TRIGGERS
-- ----------------------------
DROP TABLE IF EXISTS `QRTZ_FIRED_TRIGGERS`;
CREATE TABLE `QRTZ_FIRED_TRIGGERS` (
`SCHED_NAME` varchar(120) COLLATE utf8mb4_bin NOT NULL,
`ENTRY_ID` varchar(95) COLLATE utf8mb4_bin NOT NULL,
`TRIGGER_NAME` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`TRIGGER_GROUP` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`INSTANCE_NAME` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`FIRED_TIME` bigint(13) NOT NULL,
`SCHED_TIME` bigint(13) NOT NULL,
`PRIORITY` int(11) NOT NULL,
`STATE` varchar(16) COLLATE utf8mb4_bin NOT NULL,
`JOB_NAME` varchar(200) COLLATE utf8mb4_bin DEFAULT NULL,
`JOB_GROUP` varchar(200) COLLATE utf8mb4_bin DEFAULT NULL,
`IS_NONCONCURRENT` varchar(1) COLLATE utf8mb4_bin DEFAULT NULL,
`REQUESTS_RECOVERY` varchar(1) COLLATE utf8mb4_bin DEFAULT NULL,
PRIMARY KEY (`SCHED_NAME`,`ENTRY_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Table structure for QRTZ_LOCKS
-- ----------------------------
DROP TABLE IF EXISTS `QRTZ_LOCKS`;
CREATE TABLE `QRTZ_LOCKS` (
`SCHED_NAME` varchar(120) COLLATE utf8mb4_bin NOT NULL,
`LOCK_NAME` varchar(40) COLLATE utf8mb4_bin NOT NULL,
PRIMARY KEY (`SCHED_NAME`,`LOCK_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Table structure for QRTZ_PAUSED_TRIGGER_GRPS
-- ----------------------------
DROP TABLE IF EXISTS `QRTZ_PAUSED_TRIGGER_GRPS`;
CREATE TABLE `QRTZ_PAUSED_TRIGGER_GRPS` (
`SCHED_NAME` varchar(120) COLLATE utf8mb4_bin NOT NULL,
`TRIGGER_GROUP` varchar(200) COLLATE utf8mb4_bin NOT NULL,
PRIMARY KEY (`SCHED_NAME`,`TRIGGER_GROUP`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Table structure for QRTZ_SCHEDULER_STATE
-- ----------------------------
DROP TABLE IF EXISTS `QRTZ_SCHEDULER_STATE`;
CREATE TABLE `QRTZ_SCHEDULER_STATE` (
`SCHED_NAME` varchar(120) COLLATE utf8mb4_bin NOT NULL,
`INSTANCE_NAME` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`LAST_CHECKIN_TIME` bigint(13) NOT NULL,
`CHECKIN_INTERVAL` bigint(13) NOT NULL,
PRIMARY KEY (`SCHED_NAME`,`INSTANCE_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Table structure for QRTZ_SIMPLE_TRIGGERS
-- ----------------------------
DROP TABLE IF EXISTS `QRTZ_SIMPLE_TRIGGERS`;
CREATE TABLE `QRTZ_SIMPLE_TRIGGERS` (
`SCHED_NAME` varchar(120) COLLATE utf8mb4_bin NOT NULL,
`TRIGGER_NAME` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`TRIGGER_GROUP` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`REPEAT_COUNT` bigint(7) NOT NULL,
`REPEAT_INTERVAL` bigint(12) NOT NULL,
`TIMES_TRIGGERED` bigint(10) NOT NULL,
PRIMARY KEY (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`),
CONSTRAINT `QRTZ_SIMPLE_TRIGGERS_ibfk_1` FOREIGN KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`) REFERENCES `QRTZ_TRIGGERS` (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
-- ----------------------------
-- Table structure for QRTZ_SIMPROP_TRIGGERS
-- ----------------------------
DROP TABLE IF EXISTS `QRTZ_SIMPROP_TRIGGERS`;
CREATE TABLE `QRTZ_SIMPROP_TRIGGERS` (
`SCHED_NAME` varchar(120) COLLATE utf8mb4_bin NOT NULL,
`TRIGGER_NAME` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`TRIGGER_GROUP` varchar(200) COLLATE utf8mb4_bin NOT NULL,
`STR_PROP_1` varchar(512) COLLATE utf8mb4_bin DEFAULT NULL,
`STR_PROP_2` varchar(512) COLLATE utf8mb4_bin DEFAULT NULL,
`STR_PROP_3` varchar(512) COLLATE utf8mb4_bin DEFAULT NULL,
`INT_PROP_1` int(11) DEFAULT NULL,
`INT_PROP_2` int(11) DEFAULT NULL,
`LONG_PROP_1` bigint(20) DEFAULT NULL,
`LONG_PROP_2` bigint(20) DEFAULT NULL,
`DEC_PROP_1` decimal(13,4) DEFAULT NULL,
`DEC_PROP_2` decimal(13,4) DEFAULT NULL,
`BOOL_PROP_1` varchar(1) COLLATE utf8mb4_bin DEFAULT NULL,
`BOOL_PROP_2` varchar(1) COLLATE utf8mb4_bin DEFAULT NULL,
PRIMARY KEY (`SCHED_NAME`,`TRIGGER_NAME`,`TRIGGER_GROUP`),
CONSTRAINT `QRTZ_SIMPROP_TRIGGERS_ibfk_1` FOREIGN KEY (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`) REFERENCES `QRTZ_TRIGGERS` (`SCHED_NAME`, `TRIGGER_NAME`, `TRIGGER_GROUP`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
2.依赖
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation "org.springframework.boot:spring-boot-starter-quartz"
compileOnly 'org.projectlombok:lombok'
implementation 'mysql:mysql-connector-java'
annotationProcessor 'org.projectlombok:lombok's
compile 'org.apache.commons:commons-lang3:3.9'
compile 'com.google.guava:guava:27.1-jre'
3.因为使用的时候我打算通过一个自定义注解就搞定它,类似下面这样:
import com.github.quartzcluster.annotation.EnableQuartzCluster;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@EnableQuartzCluster
//@EnableQuartzCluster("classpath:文件名")
//@EnableQuartzCluster("配置文件绝对路径")
@SpringBootApplication
public class QuartzClusterApplication {
public static void main(String[] args) {
SpringApplication.run(QuartzClusterApplication.class, args);
}
}
所以我们需要开始自定义一个EnableQuartzCluster注解
import com.github.quartzcluster.config.DynamicSchedulingConfigurationSelector;
import org.springframework.context.annotation.Import;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import(DynamicSchedulingConfigurationSelector.class)
public @interface EnableQuartzCluster {
String CLASSPATH_PREFIX = "classpath:";
String DEFAULT_PROPERTIES = "quartz.properties";
String value() default CLASSPATH_PREFIX + DEFAULT_PROPERTIES;
}
这个自定义注解有两个作用:
1.注入一些quartz必要的Bean对象
2.允许使用者指定配置文件
import com.github.quartzcluster.annotation.EnableQuartzCluster;
import com.google.common.collect.Maps;
import org.springframework.context.annotation.ImportSelector;
import org.springframework.core.type.AnnotationMetadata;
import org.springframework.util.CollectionUtils;
import java.util.Map;
public class DynamicSchedulingConfigurationSelector implements ImportSelector {
public static final Map<String, Object> annotationAttrs = Maps.newConcurrentMap();
@Override
public String[] selectImports(AnnotationMetadata importingClassMetadata) {
/** 获取指定的配置文件 */
Map<String, Object> map =
importingClassMetadata.getAnnotationAttributes(EnableQuartzCluster.class.getName());
if (!CollectionUtils.isEmpty(map)) {
map.forEach((key, value) -> annotationAttrs.put(key, value));
}
return new String[] {
ApplicationContextUtil.class.getName(),
DynamicSchedulingConfiguration.class.getName(),
SchedulerConfiguration.class.getName()
};
}
public static Object get(String key) {
return annotationAttrs.get(key);
}
}
DynamicSchedulingConfigurationSelector类指定类ApplicationContextUtil,DynamicSchedulingConfiguration,SchedulerConfiguration纳入spring管理,annotationAttrs存储了指定的配置文件名称。
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
public class ApplicationContextUtil implements ApplicationContextAware {
private static ApplicationContext appContext;
/**
* 获取Bean
*
* @param name
* @return
*/
public static Object getBean(String name) {
return appContext.getBean(name);
}
/**
* 获取bean
*
* @param clazz
* @param <T>
* @return
*/
public static <T> T getBean(Class<T> clazz) {
return appContext.getBean(clazz);
}
/**
* @param className
* @return
*/
public static Class<?> getType(String className) {
return appContext.getType(className);
}
/**
* @param name
* @return
*/
public static String getProperty(String name) {
return appContext.getEnvironment().getProperty(name);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
appContext = applicationContext;
}
}
ApplicationContextUtil实现了ApplicationContextAware接口,获取ApplicationContext上下文后可以通过静态方法获取任意指定Bean对象。如果我们能通过ApplicationContextUtil获取任意指定的Bean对象的话,那就能实现我们最终的目标:通过任意指定的Bean对象,按照一定调度策略,通过反射执行指定的方法
import com.github.quartzcluster.annotation.EnableQuartzCluster;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import javax.sql.DataSource;
import java.util.Objects;
public class DynamicSchedulingConfiguration {
/** 获取spring中的数据源 */
@Autowired private DataSource dataSource;
@Bean(name = "schedulerFactoryBean", destroyMethod = "destroy")
public SchedulerFactoryBean schedulerFactoryBean() {
SchedulerFactoryBean factory = new SchedulerFactoryBean();
// 设置数据源
factory.setDataSource(dataSource);
// 设置配置文件
factory.setConfigLocation(propertiesResource());
// 启动是否自动执行
factory.setAutoStartup(true);
// 是否覆盖已有的job
factory.setOverwriteExistingJobs(true);
// 项目启动5秒后开始执行定时任务
factory.setStartupDelay(5);
return factory;
}
/**
* 获取配置文件
*
* @return
*/
private Resource propertiesResource() {
String value = (String) DynamicSchedulingConfigurationSelector.get("value");
if (Objects.isNull(value)) {
return new ClassPathResource(EnableQuartzCluster.DEFAULT_PROPERTIES);
}
if (value.indexOf(EnableQuartzCluster.CLASSPATH_PREFIX) == 0) {
return new ClassPathResource(value.replace(EnableQuartzCluster.CLASSPATH_PREFIX, ""));
}
return new FileSystemResource(value);
}
}
上面的类就是把SchedulerFactoryBean作为一个Bean注入进spring容器,是为了下一步创建Scheduler对象做准备。
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
public class SchedulerConfiguration {
@Bean(name = "scheduler", initMethod = "start", destroyMethod = "shutdown")
public Scheduler scheduler(@Autowired SchedulerFactoryBean schedulerFactoryBean) {
Scheduler scheduler = schedulerFactoryBean.getScheduler();
return scheduler;
}
}
以上代码我们就把所有事先需要的Bean都创建好了。还有一个配置文件:quartz.properties
#============================================================================
# Configure JobStore
# Using Spring datasource in SchedulerConfig.java
# Spring uses LocalDataSourceJobStore extension of JobStoreCMT
#============================================================================
org.quartz.jobStore.useProperties=false
org.quartz.jobStore.tablePrefix = QRTZ_
org.quartz.jobStore.isClustered = true
org.quartz.jobStore.clusterCheckinInterval = 5000
org.quartz.jobStore.misfireThreshold = 60000
org.quartz.jobStore.txIsolationLevelReadCommitted = true
org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX
org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate
#============================================================================
# Configure Main Scheduler Properties
# Needed to manage cluster instances
#============================================================================
org.quartz.scheduler.instanceName = ClusterQuartz
org.quartz.scheduler.instanceId= AUTO
org.quartz.scheduler.rmi.export = false
org.quartz.scheduler.rmi.proxy = false
org.quartz.scheduler.wrapJobExecutionInUserTransaction = false
#============================================================================
# Configure ThreadPool
# Can also be configured in spring configuration
#============================================================================
#org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool
#org.quartz.threadPool.threadCount = 5
#org.quartz.threadPool.threadPriority = 5
#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true
下面我们开始使用上面已经创建好的Bean:
import com.github.quartzcluster.support.CronJobDefinition;
import com.github.quartzcluster.support.Key;
import com.github.quartzcluster.support.SimpleJobDefinition;
public interface IScheduleService {
/** @param cronJobDefinition */
void schedule(CronJobDefinition cronJobDefinition);
/** @param simpleJobDefinition */
void schedule(SimpleJobDefinition simpleJobDefinition);
/**
* 暂停触发器
*
* @param key
*/
void pauseTrigger(Key key);
/**
* 恢复触发器
*
* @param key
*/
void resumeTrigger(Key key);
/**
* 删除触发器
*
* @param key
* @return
*/
boolean removeTrigger(Key key);
}
import com.github.quartzcluster.service.IScheduleService;
import com.github.quartzcluster.support.CronJobDefinition;
import com.github.quartzcluster.support.Key;
import com.github.quartzcluster.support.SimpleJobDefinition;
import org.quartz.Scheduler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class ScheduleServiceImpl implements IScheduleService {
@Autowired
private Scheduler scheduler;
@Override
public void schedule(CronJobDefinition cronJobDefinition) {
cronJobDefinition.scheduleJob(scheduler);
}
@Override
public void schedule(SimpleJobDefinition simpleJobDefinition) {
simpleJobDefinition.scheduleJob(scheduler);
}
/**
* 暂停触发器
* @param key
*/
@Override
public void pauseTrigger(Key key) {
key.pauseTrigger(scheduler);
}
/**
* 恢复触发器
* @param key
*/
@Override
public void resumeTrigger(Key key) {
key.resumeTrigger(scheduler);
}
/**
* 移除触发器
* @param key
* @return
*/
@Override
public boolean removeTrigger(Key key) {
return key.removeTrigger(scheduler);
}
}
import com.github.quartzcluster.service.IScheduleService;
import com.github.quartzcluster.support.CronJobDefinition;
import com.github.quartzcluster.support.Key;
import com.github.quartzcluster.support.SimpleJobDefinition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@RequestMapping("/job")
@RestController
public class JobScheduleController {
@Autowired private IScheduleService scheduleService;
/**
* 添加cron任务
*
* @param cronJobDefinition
* @return
*/
@PostMapping("/addCronJob")
public void add(@RequestPart("cronJobDefinition") CronJobDefinition cronJobDefinition) {
scheduleService.schedule(cronJobDefinition);
}
/**
* 添加简单任务
*
* @param simpleJobDefinition
*/
@PostMapping("/addSimpleJob")
public void add(@RequestPart("simpleJobDefinition") SimpleJobDefinition simpleJobDefinition) {
scheduleService.schedule(simpleJobDefinition);
}
/**
* 删除触发器
*
* @param key
* @return
*/
@DeleteMapping("/removeTrigger")
public boolean removeTrigger(@RequestPart("key") Key key) {
return scheduleService.removeTrigger(key);
}
/**
* 暂停触发器
*
* @param key
* @return
*/
@PutMapping("/pauseTrigger")
public void pauseTrigger(@RequestPart("key") Key key) {
scheduleService.pauseTrigger(key);
}
/**
* 恢复触发器
*
* @param key
*/
@PutMapping("/resumeTrigger")
public void resumeTrigger(@RequestPart("key") Key key) {
scheduleService.resumeTrigger(key);
}
}
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.quartz.CronScheduleBuilder;
import org.quartz.JobDetail;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
@Slf4j
@Data
public class CronJobDefinition extends JobDefinition {
private static final long serialVersionUID = 6940446397330926681L;
/** 执行策略 */
private String cronExpression;
@Override
protected Trigger trigger(JobDetail jobDetail) {
return TriggerBuilder.newTrigger()
.forJob(jobDetail)
.withIdentity(triggerKey())
.withSchedule(CronScheduleBuilder.cronSchedule(getCronExpression()))
.build();
}
}
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.quartz.JobDetail;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import java.util.Date;
@Slf4j
@Data
public class SimpleJobDefinition extends JobDefinition {
private static final long serialVersionUID = 6940446397330926681L;
/** 开始时间 */
private long startTime;
/** 结束时间 */
private long endTime;
/** 重复次数 */
private int repeatCount;
/** 时间间隔(单位:秒) */
private int repeatIntervalInSeconds;
@Override
protected Trigger trigger(JobDetail jobDetail) {
return TriggerBuilder.newTrigger()
.forJob(jobDetail)
.withIdentity(triggerKey())
.withSchedule(
SimpleScheduleBuilder.simpleSchedule()
.withRepeatCount(getRepeatCount())
.withIntervalInSeconds(getRepeatIntervalInSeconds()))
.startAt(new Date(getStartTime()))
.endAt(new Date(getEndTime()))
.build();
}
}
import com.github.quartzcluster.config.Const;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import java.io.Serializable;
@Slf4j
@Data
public abstract class JobDefinition extends Key implements Serializable {
private static final long serialVersionUID = -8355775738649736514L;
/** 任务ID(要求全局唯一,包括集群范围内) */
private String jobId;
/** 任务描述 */
private String description;
/** 假设时间间隔很短,上一次任务还没执行完毕,是否并行执行这次任务 */
private boolean isConcurrent;
/** 类名 */
private String className;
/** spring容器中的bean名称 */
private String springId;
/** 方法名称 */
private String methodName;
/** 方法参数 */
private String methodArg;
/** 创建时间 */
private long createTime;
/** 更新时间 */
private long updateTime;
/**
* 添加或修改
*
* @param scheduler
*/
public void scheduleJob(Scheduler scheduler) {
TriggerKey triggerKey = triggerKey();
JobDetail jobDetail = jobDetail();
Trigger trigger = trigger(jobDetail);
try {
scheduler.addJob(jobDetail, true, true);
if (scheduler.checkExists(triggerKey)) {
scheduler.rescheduleJob(triggerKey, trigger);
} else {
scheduler.scheduleJob(trigger);
}
} catch (SchedulerException e) {
throw new IllegalArgumentException(e);
}
}
/**
* 创建Trigger
*
* @param jobDetail
* @return
*/
protected abstract Trigger trigger(JobDetail jobDetail);
/**
* 创建jobDetail
*
* @return
*/
private JobDetail jobDetail() {
JobKey jobKey = new JobKey(getName(), getGroup());
JobDetail jobDetail =
JobBuilder.newJob(
isConcurrent()
? ConcurrentQuartzJob.class
: DisallowConcurrentQuartzJob.class)
.withIdentity(jobKey)
.withDescription(getDescription())
.build();
jobDetail.getJobDataMap().put(Const.JOB_DATA_KEY, this);
return jobDetail;
}
}
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerKey;
@Data
public class Key {
protected static final String DEFAULT_GROUP = "default_group";
/** 任务名称 */
private String name;
/** 任务分组 */
private String group;
/**
* 暂停触发器
*
* @param scheduler
*/
public void pauseTrigger(Scheduler scheduler) {
TriggerKey triggerKey = triggerKey();
try {
scheduler.pauseTrigger(triggerKey);
} catch (SchedulerException e) {
throw new IllegalArgumentException(e);
}
}
/**
* 恢复触发器
*
* @param scheduler
*/
public void resumeTrigger(Scheduler scheduler) {
TriggerKey triggerKey = triggerKey();
try {
scheduler.resumeTrigger(triggerKey);
} catch (SchedulerException e) {
throw new IllegalArgumentException(e);
}
}
/**
* 删除触发器
*
* @param scheduler
*/
public boolean removeTrigger(Scheduler scheduler) {
TriggerKey triggerKey = triggerKey();
try {
scheduler.pauseTrigger(triggerKey);
return scheduler.unscheduleJob(triggerKey);
} catch (SchedulerException e) {
throw new IllegalArgumentException(e);
}
}
protected TriggerKey triggerKey() {
return new TriggerKey(getName(), getGroup());
}
/**
* 获取group
*
* @return
*/
public String getGroup() {
return StringUtils.isBlank(group) ? DEFAULT_GROUP : group;
}
}
我们的使用方式已经定义完毕,但是我们还需要自定义两个Job:
import com.github.quartzcluster.config.Const;
import com.github.quartzcluster.core.JobActuator;
import org.quartz.JobExecutionContext;
import org.quartz.PersistJobDataAfterExecution;
import org.springframework.scheduling.quartz.QuartzJobBean;
@PersistJobDataAfterExecution
public class ConcurrentQuartzJob extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext context) {
JobDefinition task = (JobDefinition) context.getMergedJobDataMap().get(Const.JOB_DATA_KEY);
JobActuator.invoke(task);
}
}
import org.quartz.DisallowConcurrentExecution;
import org.quartz.PersistJobDataAfterExecution;
@PersistJobDataAfterExecution
@DisallowConcurrentExecution
public class DisallowConcurrentQuartzJob extends ConcurrentQuartzJob {}
真正执行反射的代码就是JobActuator类:
import com.github.quartzcluster.config.ApplicationContextUtil;
import com.github.quartzcluster.support.JobDefinition;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Objects;
@Slf4j
public class JobActuator {
public static void invoke(JobDefinition jobDefinition) {
String jobId = jobDefinition.getJobId();
String springId = jobDefinition.getSpringId();
String methodName = jobDefinition.getMethodName();
String methodArgs = jobDefinition.getMethodArg();
try {
invoke(springId, methodName, methodArgs);
} catch (NoSuchMethodException e) {
log.error("无效的methodName:" + methodName, e);
} catch (Exception e) {
log.error("执行定时任务失败", e);
log.error("id:{}, springId:{}, methodName:{}", jobId, springId, methodName);
}
}
/**
* 执行定时任务
*
* @param springId
* @param methodName
* @param methodArgs
* @throws InvocationTargetException
* @throws IllegalAccessException
* @throws NoSuchMethodException
*/
private static void invoke(String springId, String methodName, String methodArgs)
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
Object object = ApplicationContextUtil.getBean(springId);
if (Objects.isNull(object)) {
log.error("无效的springId:" + springId);
return;
}
Class<?> clazz = object.getClass();
Method method;
if (StringUtils.isBlank(methodArgs)) {
method = clazz.getDeclaredMethod(methodName);
} else {
method = clazz.getDeclaredMethod(methodName, new Class[] {String.class});
}
method.setAccessible(true);
if (StringUtils.isBlank(methodArgs)) {
method.invoke(object);
} else {
method.invoke(object, methodArgs);
}
}
}
以上代码就将quartz扩展成了完全动态化的定时任务组件。欢迎感兴趣的小伙伴参与讨论。
下面我们测试一下,先来一个目标类:
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
@Service
public class TestService {
public void test1() {
System.out.println(
"测试任务1" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()));
}
public void test2(String param) {
System.out.println(
param + ":" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()));
}
}
简单执行一下测试用例:
// Bean名称:testService
// 方法名:test1
// 调度策略:0/30 * * * * ?
curl -X "POST" "http://localhost:8080/job/addCronJob" \
-H 'Content-Type: multipart/form-data; charset=utf-8; boundary=__X_PAW_BOUNDARY__' \
-F "cronJobDefinition={\"cronExpression\":\"0/30 * * * * ? \",\"jobId\":\"1\",\"description\":\"测试定时任务1\",\"isConcurrent\":false,\"className\":\"com.github.quartzcluster.service.TestService\",\"springId\":\"testService\",\"methodName\":\"test1\",\"methodArg\":\"\",\"name\":\"name2\",\"group\":\"group1\",\"createTime\":1478422605000,\"updateTime\":1491305256000}"
// Bean名称:testService
// 方法名:test2
// 参数值:方法参数1
// 调度策略:0/10 * * * * ?
curl -X "POST" "http://localhost:8080/job/addCronJob" \
-H 'Content-Type: multipart/form-data; charset=utf-8; boundary=__X_PAW_BOUNDARY__' \
-F "cronJobDefinition={\"cronExpression\":\"0/10 * * * * ? \",\"jobId\":\"1\",\"description\":\"测试定时任务2\",\"isConcurrent\":false,\"className\":\"com.github.quartzcluster.service.TestService\",\"springId\":\"testService\",\"methodName\":\"test2\",\"methodArg\":\"方法参数1\",\"name\":\"name1\",\"group\":\"group1\",\"createTime\":1478422605000,\"updateTime\":1491305256000}"
// Bean名称:testService
// 方法名:test2
// 参数值:方法参数2
// 启始时间:2019-05-21 16:00:00
// 结束时间:2100-05-21 18:00:00
// 执行1000次,每次间隔10秒
curl -X "POST" "http://localhost:8080/job/addSimpleJob" \
-H 'Content-Type: multipart/form-data; charset=utf-8; boundary=__X_PAW_BOUNDARY__' \
-F "simpleJobDefinition={\"jobId\":\"1\",\"description\":\"测试定时任务\",\"isConcurrent\":false,\"className\":\"com.github.quartzcluster.service.TestService\",\"springId\":\"testService\",\"methodName\":\"test2\",\"methodArg\":\"方法参数2\":\"name1\",\"group\":\"group2\",\"createTime\":1478422605000,\"updateTime\":1491305256000,\"startTime\":1558425577130,\"endTime\":4114576800000,\"repeatCount\":1000,\"repeatIntervalInSeconds\":10}"
网友评论