项目中有很多定时任务,而且他们的执行模式非常类似,抽象整理如下
定时任务特点
- 比较多:项目中的定时任务很多,大概有20个左右
- 任务保存在Redis中:为了减少对数据库的压力,定时任务大部分都是使用Redis的ZSET进行存储
- 幂等:任务是幂等的
- 集群分布式执行
- 分布式锁: 虽然执行是幂等的,但是为了提高效率,还是使用分布式锁,保证一个任务只会被执行一次
执行流程
- 获取任务的批量数据
- 尝试获取数据对应的锁,如果没有获取就结束
- 再次在任务源中判断该数据是否待处理,如果是就处理,否则就跳过
- 执行该数据的任务(需要幂等)
- 从数据源中修改状态为已处理或删除
- 释放数据锁
代码示例
@Slf4j
@RequiredArgsConstructor
@AllArgsConstructor
public class RedisSortedSetTaskWorker {
private final RedissonClient redissonClient;
private final StringRedisTemplate stringRedisTemplate;
private final String redisKey;
private final Function<String, Boolean> task;
private Predicate<String> directRemoveFilter;
private boolean continueOnError = true;
public void work(double beginScore, double endScore, @Nullable Integer limit) {
//获取符合条件的所有子任务
Set<String> subTasks = limit == null
? stringRedisTemplate.opsForZSet().rangeByScore(redisKey, beginScore, endScore)
: stringRedisTemplate.opsForZSet().rangeByScore(redisKey, beginScore, endScore, 0, limit);
log.info("[{}]: there are [{}] sub tasks to try to work", redisKey, subTasks == null ? 0 : subTasks.size());
if (subTasks == null) {
return;
}
//删除子任务
if (directRemoveFilter != null) {
Set<String> toRemove = subTasks.stream().filter(directRemoveFilter).collect(Collectors.toSet());
Object[] directToRemove = toRemove.toArray();
if (directToRemove.length > 0) {
stringRedisTemplate.opsForZSet().remove(redisKey, directToRemove);
subTasks.removeAll(toRemove);
}
}
log.info("there are [{}] sub tasks to work after filter", subTasks.size());
for (String subTask : subTasks) {
try {
String lockKey = redisKey + ":lock:" + subTask;
trySubmit(lockKey, () -> {
//如果已经不存在了,就直接返回,说明可能被其他节点执行了
Double score = stringRedisTemplate.opsForZSet().score(redisKey, subTask);
if (score == null) {
return;
}
//如果执行成功就删除该子任务(subTask)
boolean success = task.apply(subTask);
if (success) {
stringRedisTemplate.opsForZSet().remove(redisKey, subTask);
}
});
} catch (RuntimeException e) {
log.warn("fail to work one sub task: redisKey[{}], subTask[{}]", redisKey, subTask, e);
//遇到异常是否继续
if (continueOnError) {
continue;
}
throw e;
}
}
}
/**
* 尝试分布式环境下互斥执行任务.
*
* @param lockName 分布式锁
* @param work 任务
* @return true锁成功,false锁失败
*/
private boolean trySubmit(String lockName, Runnable work) {
RLock lock = redissonClient.getLock(lockName);
//获取分布式锁
if (!lock.tryLock()) {
log.info("fail to lock [{}]", lockName);
return false;
}
log.info("lock [{}] successfully", lockName);
try {
work.run();
return true;
} catch (Exception e) {
log.warn("fail to trySubmit", e);
throw e;
} finally {
lock.unlock();
}
}
}
网友评论