美文网首页
使用 Redis 的 zset + 线程池,实现延时队列

使用 Redis 的 zset + 线程池,实现延时队列

作者: 程就人生 | 来源:发表于2022-07-26 22:29 被阅读0次

一个靠谱的延时队列,通常需要保证以下几个特性:

  1. 持久化,不能因为某个应用的重启而信息丢失;
  2. 确认和重试机制,发送成功了再删除,发送失败了可以重试;
  3. 低延时,发送时间尽量精准,误差不能太大;

前几天写的两个demo,一个不能满足持久化,应用一重启信息就丢失了。另一个使用了轮询,太费性能了。于是想再次优化一下,先来看看代码吧。

一、延时任务线程池配置

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
 * 延迟任务线程池配置类
 * @author 程就人生
 * @Date
 */
@Configuration
public class TheadPoolConfig {
  
  @Bean("threadPoolTaskScheduler")
  public ThreadPoolTaskScheduler getThreadPoolTaskScheduler(){
    // 定时任务线程池
    ThreadPoolTaskScheduler executor = new ThreadPoolTaskScheduler();
    // 线程池大小
    executor.setPoolSize(10);
    // 线程执行前缀
    executor.setThreadNamePrefix("ThreadPoolTaskScheduler-");
    // executor.setWaitForTasksToCompleteOnShutdown(true);
    // executor.setAwaitTerminationSeconds(60);
    executor.initialize();
    return executor;
  }  
}

二、延时任务类

import java.text.SimpleDateFormat;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.ZSetOperations;

/**
 * 延时任务执行的线程
 * @author
 * @Date
 */
public class DelayTaskExec implements Runnable{

  SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  
  private static final Logger log = LoggerFactory.getLogger(DelayTaskExec.class);
  
  ZSetOperations<String, Object> zset;
  
  String uid;
  
  public DelayTaskExec(ZSetOperations<String, Object> zset, String uid){
    this.zset = zset;
    this.uid = uid;
  }
  
  @Override
  public void run() {
    // TODO 执行具体的业务逻辑
    Long now = System.currentTimeMillis();    
    log.info("计划执行时间:{}, 实际执行时间:{},set={}", format.format(zset.score("AA", uid).longValue()), format.format(now), uid);
    // 从redis缓存中移除
    zset.remove("AA", uid);
  }
}

这是任务类,可以把具体延时任务的业务逻辑写在这里。

三、项目重启时,redis缓存数据的处理

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

import com.example.demo.controller.DelayTaskExec;

/**
 * 启动类
 * @author 程就人生
 * @Date
 */
@EnableScheduling
@SpringBootApplication
public class SrpingRedisMqDemoApplication implements CommandLineRunner{
  // redis资源
  @Autowired
  private RedisTemplate<String,Object> redisTemplate;
  // 线程池
  @Autowired
  private ThreadPoolTaskScheduler threadPoolTaskScheduler;
  
  private static final Logger log = LoggerFactory.getLogger(SrpingRedisMqDemoApplication.class);
  
  SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

  public static void main(String[] args) {
    SpringApplication.run(SrpingRedisMqDemoApplication.class, args);
  }

  @Override
  public void run(String... args) throws Exception {
    ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();
    Long scheduleTime = System.currentTimeMillis();    
    Set<Object> set = zset.rangeByScore("AA", 0, scheduleTime);
    DelayTaskExec task = null;
    // 历史已经过期的
    if(!set.isEmpty()){

      set.forEach(str->{
        DelayTaskExec task1 = new DelayTaskExec(zset, str.toString());
        threadPoolTaskScheduler.schedule(task1, new Date());
      });
    }
    // 历史未过期的
    set = zset.range("AA", 0, -1);
    if(!set.isEmpty()){

      set.forEach(str->{
        try{
          DelayTaskExec task1 = new DelayTaskExec(zset, str.toString());
          threadPoolTaskScheduler.schedule(task1, new Date(zset.score("AA", str.toString()).longValue()));
        }catch(Exception e){
          log.info("{}已被处理,无需重复处理",str.toString());
        }                
      });
    }    
  }
}

项目重启之后,缓存中的数据分为两部分,一部分是已经过期的,一部分是还未过期的。针对已经过期的,立刻马上处理。针对还未过期的,添加到延时任务线程池中。其中redis的zset存储的score分值便是延时任务执行的时间。

四、延时任务的生成

/**
 * 60秒执行一次
 */
@Scheduled(cron = "*/60 * * * * ?")
public void initKeys() {      
      // zset数据添加
    ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();
    Random random = new Random();
    int score = 0;
    Long scheduleTime = 0L;
    DelayTaskExec task = null;
    for(int i=0;i<100;i++){
      // 取值1到100的随机数
      score = random.nextInt(1000);  
      // 设置score,在当前时间上加score秒
      scheduleTime = System.currentTimeMillis() + (1000) * score;
      log.info("AA{}{},过期时间:{}",i,score, format.format(scheduleTime));
      zset.add("AA", "AA" + i + score, scheduleTime);
      // 同时将任务加入到线程池
      task = new DelayTaskExec(zset, "AA" + i + score);
      threadPoolTaskScheduler.schedule(task, new Date(scheduleTime));
    }      
}

在往redis添加延时任务的时候,也需要往线程池中添加了一份,其中存储在zset中的score分值,是延时任务的执行时间。

五、测试结果

5.1 控制台输出,历史任务的执行(应用重启的时候)

5.2 控制台输出,新添加任务的执行


最后总结

这个demo是在上篇demo的基础上进行调整的,demo中的延时时间采用的是当前时间加上随机秒数,zset的value值难免有重复的,在实际应用场景不会出现这个问题,肯定会保证任务id的唯一性,因此这个问题可以忽略不计。定时轮询升级为线程池,大大提高了性能。当然,本示例中没有加确认与重试机制,这个可以在任务类中进一步完善。

本文参考:

相关文章

网友评论

      本文标题:使用 Redis 的 zset + 线程池,实现延时队列

      本文链接:https://www.haomeiwen.com/subject/jldairtx.html