一个靠谱的延时队列,通常需要保证以下几个特性:
- 持久化,不能因为某个应用的重启而信息丢失;
- 确认和重试机制,发送成功了再删除,发送失败了可以重试;
- 低延时,发送时间尽量精准,误差不能太大;
前几天写的两个demo,一个不能满足持久化,应用一重启信息就丢失了。另一个使用了轮询,太费性能了。于是想再次优化一下,先来看看代码吧。
一、延时任务线程池配置
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
/**
* 延迟任务线程池配置类
* @author 程就人生
* @Date
*/
@Configuration
public class TheadPoolConfig {
@Bean("threadPoolTaskScheduler")
public ThreadPoolTaskScheduler getThreadPoolTaskScheduler(){
// 定时任务线程池
ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
// 线程池大小
executor.setPoolSize(10);
// 线程执行前缀
executor.setThreadNamePrefix("ThreadPoolTaskScheduler-");
// executor.setWaitForTasksToCompleteOnShutdown(true);
// executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}
二、延时任务类
import java.text.SimpleDateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ZSetOperations;
/**
* 延时任务执行的线程
* @author
* @Date
*/
public class DelayTaskExec implements Runnable{
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private static final Logger log = LoggerFactory.getLogger(DelayTaskExec.class);
ZSetOperations<String, Object> zset;
String uid;
public DelayTaskExec(ZSetOperations<String, Object> zset, String uid){
this.zset = zset;
this.uid = uid;
}
@Override
public void run() {
// TODO 执行具体的业务逻辑
Long now = System.currentTimeMillis();
log.info("计划执行时间:{}, 实际执行时间:{},set={}", format.format(zset.score("AA", uid).longValue()), format.format(now), uid);
// 从redis缓存中移除
zset.remove("AA", uid);
}
}
这是任务类,可以把具体延时任务的业务逻辑写在这里。
三、项目重启时,redis缓存数据的处理
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import com.example.demo.controller.DelayTaskExec;
/**
* 启动类
* @author 程就人生
* @Date
*/
@EnableScheduling
@SpringBootApplication
public class SrpingRedisMqDemoApplication implements CommandLineRunner{
// redis资源
@Autowired
private RedisTemplate<String,Object> redisTemplate;
// 线程池
@Autowired
private ThreadPoolTaskScheduler threadPoolTaskScheduler;
private static final Logger log = LoggerFactory.getLogger(SrpingRedisMqDemoApplication.class);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
public static void main(String[] args) {
SpringApplication.run(SrpingRedisMqDemoApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();
Long scheduleTime = System.currentTimeMillis();
Set<Object> set = zset.rangeByScore("AA", 0, scheduleTime);
DelayTaskExec task = null;
// 历史已经过期的
if(!set.isEmpty()){
set.forEach(str->{
DelayTaskExec task1 = new DelayTaskExec(zset, str.toString());
threadPoolTaskScheduler.schedule(task1, new Date());
});
}
// 历史未过期的
set = zset.range("AA", 0, -1);
if(!set.isEmpty()){
set.forEach(str->{
try{
DelayTaskExec task1 = new DelayTaskExec(zset, str.toString());
threadPoolTaskScheduler.schedule(task1, new Date(zset.score("AA", str.toString()).longValue()));
}catch(Exception e){
log.info("{}已被处理,无需重复处理",str.toString());
}
});
}
}
}
项目重启之后,缓存中的数据分为两部分,一部分是已经过期的,一部分是还未过期的。针对已经过期的,立刻马上处理。针对还未过期的,添加到延时任务线程池中。其中redis的zset存储的score分值便是延时任务执行的时间。
四、延时任务的生成
/**
* 60秒执行一次
*/
@Scheduled(cron = "*/60 * * * * ?")
public void initKeys() {
// zset数据添加
ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();
Random random = new Random();
int score = 0;
Long scheduleTime = 0L;
DelayTaskExec task = null;
for(int i=0;i<100;i++){
// 取值1到100的随机数
score = random.nextInt(1000);
// 设置score,在当前时间上加score秒
scheduleTime = System.currentTimeMillis() + (1000) * score;
log.info("AA{}{},过期时间:{}",i,score, format.format(scheduleTime));
zset.add("AA", "AA" + i + score, scheduleTime);
// 同时将任务加入到线程池
task = new DelayTaskExec(zset, "AA" + i + score);
threadPoolTaskScheduler.schedule(task, new Date(scheduleTime));
}
}
在往redis添加延时任务的时候,也需要往线程池中添加了一份,其中存储在zset中的score分值,是延时任务的执行时间。
五、测试结果
5.1 控制台输出,历史任务的执行(应用重启的时候)
5.2 控制台输出,新添加任务的执行
最后总结
这个demo是在上篇demo的基础上进行调整的,demo中的延时时间采用的是当前时间加上随机秒数,zset的value值难免有重复的,在实际应用场景不会出现这个问题,肯定会保证任务id的唯一性,因此这个问题可以忽略不计。定时轮询升级为线程池,大大提高了性能。当然,本示例中没有加确认与重试机制,这个可以在任务类中进一步完善。
本文参考:
网友评论