背景
前两天研究的利用数据库锁实现Quartz分布式调度一文中提到几点问题,利用数据库行锁机制和唯一性约束,不仅无法解决单点问题,而且频繁访问数据库,造成db性能降低。那么最近就研究了一下redis缓存服务,通过redis的哨兵和复制功能(不知道这两个功能的,请自行百度)可以实现redis集群部署和redis分布式锁,并且数据是缓存在内存中的,所以性能要比数据库锁提高很多。
思路
1.既然是基于Redis的分布式锁,那么首先编写redis的相关配置,包括redisTemplate,cacheManager和redis分布式锁,我这里使用springboot集成redis缓存,网上有很多教程,有兴趣的可以自行研究一下。
2.配置好redis后,我使用TriggerListener接口的特性在执行任务调度时,处自动触发实现了TriggerListener接口的类,在vetoJobExecution方法总完成获取锁的操作,在triggerComplete方法中完成释放锁的操纵。
具体实现
redis相关配置
首先配置redsiTemplate,它用于直接操作redis缓存的值,过期时间等等
@Configuration
@EnableCaching
public class RedisConfig {
/**
* 定义 StringRedisTemplate ,指定序列化和反序列化的处理类
* @param factory
* @return
*/
@Bean
public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
StringRedisTemplate template = new StringRedisTemplate(factory);
Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(
Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
//序列化 值时使用此序列化方法
template.setValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
@Bean
public CacheManager cacheManager(RedisTemplate<String,String> redisTemplate) {
RedisCacheManager rcm = new RedisCacheManager(redisTemplate);
//使用前缀
rcm.setUsePrefix(true);
//缓存分割符 默认为 ":"
// rcm.setCachePrefix(new DefaultRedisCachePrefix(":"));
//设置缓存过期时间
//rcm.setDefaultExpiration(60);//秒
return rcm;
}
}
然后配置redis的分布式锁,由于redis的特性支持分布式锁,所以这里编写获取锁(lock)和释放锁(unlock)两个方法。
/**
* redis分布式锁
* @author hww
*/
@Component
public class RedisDistributedLock {
@Autowired
StringRedisTemplate redisTemplate;
/**
* redis锁默认超时时间
*/
private final long DEFAULT_TIMEOUT_LOCK = 60*1000;
public boolean lock(String key) throws Exception{
return lock(key, DEFAULT_TIMEOUT_LOCK);
}
/**
* 获取锁,并设置超时时间
* @param key
* @param timeout
* @return
* @throws Exception
*/
public boolean lock(final String key,long timeout) throws Exception{
if(StringUtils.isEmpty(key)){
throw new Exception("key must be not null!");
}
//设置key,并返回操作结果
Boolean execute = redisTemplate.execute(new RedisCallback<Boolean>() {
@Override
public Boolean doInRedis(RedisConnection connection)
throws DataAccessException {
//等同于redisTemplate.opsForValue().set(key, value),只不过此方法会返回一个操作结果
return connection.setNX(key.getBytes(), key.getBytes());
}
});
//如果为true,设置超时时间
if(execute){
System.out.println("获取到锁,设置超时时间");
Boolean expire = redisTemplate.expire(key, timeout, TimeUnit.SECONDS);
}
return execute;
}
/**
* 释放锁
* @param key
* @throws Exception
*/
public void unlock(String key) throws Exception{
if(StringUtils.isEmpty(key)){
throw new Exception("key must be not null!");
}
redisTemplate.delete(key);
System.out.println("释放锁");
}
}
编写TriggerListener实现类
TriggerListener的javaDoc介绍:
The interface to be implemented by classes that want to be informed when a
Trigger
fires. In general, applications that use aScheduler
will not have use for this mechanism
意思是说实现了该接口的实现类将会在绑定的trigger被触发时通知其执行相关动作。
public class RedisDistributedTriggerListener implements TriggerListener{
RedisDistributedLock redisDistributedLock = SpringUtils.getBean(RedisDistributedLock.class);
@Override
public String getName() {
return "RedisDistributedTriggerListener";
}
@Override
public void triggerFired(Trigger trigger, JobExecutionContext context) {
}
/**
* 当一个scheduler被调起时,将会唤起vetoJobExecution方法
* 返回true,则取消任务执行,否则继续执行
* @author hww
*/
public boolean vetoJobExecution(Trigger trigger, JobExecutionContext context) {
try {
String lockKey = trigger.getJobKey().getGroup() + ":" + trigger.getJobKey().getName();
//获取到锁,true代表获取到锁,返回false放行, false代表获取锁失败,返回true停止执行
return !redisDistributedLock.lock(lockKey);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
@Override
public void triggerMisfired(Trigger trigger) {
}
/**
* 在任务调度执行完成时,调用此方法
*/
@Override
public void triggerComplete(Trigger trigger, JobExecutionContext context,
CompletedExecutionInstruction triggerInstructionCode) {
String lockKey = trigger.getJobKey().getGroup() + ":" + trigger.getJobKey().getName();
try {
redisDistributedLock.unlock(lockKey);
} catch (Exception e) {
e.printStackTrace();
}
}
}
在这个类中有几点需要注意:
1.getName方法的返回值需要自己定义,默认为null,否则会抛出异常。
2.vetoJobExecution方法在任务调起时执行,当返回false时,继续执行任务,否则停止任务。
3.在vetoJobExecution方法中获取到锁,则继续执行任务,没有获取到则停止。
4.triggerComplete方法在在任务调度执行完成时,调用此方法来释放锁。
TriggerListener绑定Scheduler
在SpringBoot集成Quartz,动态创建,更新,暂停,唤醒,删除任务调度一文中实现了动态创建Job。
@SuppressWarnings("unchecked")
public void addTimerJob(SchedulerJob job) {
try {
JobDetail jobDetail = JobBuilder
.newJob((Class<? extends Job>) Class.forName(job.getClassname()))
// 指定执行类
.withIdentity(job.getJobname(), job.getJobgroup())
// 指定name和group
.requestRecovery().withDescription(job.getDescription())
.build();
// 创建表达式调度构建器
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder
.cronSchedule(job.getCronexpression());
// 创建触发器
CronTrigger cronTrigger = TriggerBuilder.newTrigger()
.withIdentity(job.getTriggername(), job.getTriggergroup())
.withSchedule(cronScheduleBuilder).build();
//注册triggerListener
scheduler.getListenerManager().addTriggerListener(new RedisDistributedTriggerListener());
scheduler.scheduleJob(jobDetail, cronTrigger);
scheduler.start();// 触发器并不会立刻触发
System.out
.println("==================================创建Job成功!==================================");
} catch (ClassNotFoundException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
} catch (SchedulerException e) {
e.printStackTrace();
}
}
总结
至此就完成了实现Quartz分布式调度的所有配置,当执行任意的Job任务时,都会触发注册到Scheduler上的TriggerListener的实现类,并在vetoJobExecution方法中获取到锁,此时只能有一个线程获取该锁,并继续执行,并在执行完毕后,在triggerComplete方法中释放锁。
网友评论