生活中我们常常有这样的场景
- 我们网购时,如果我们的订单30分钟没有支付,自动关闭
- 远程遥控家里的空调30分钟后开启
还有多这样的场景,就不一一列举了,这些业务场景我们就需要构造一个延时队列,然后队列中的任务到达指定时间才会被消费。
Redis具体实现:
Redis有一个叫做zset
的数据结构,可以把它理解成一个有序的列表,他底层实现是跳表结构。
跳表
一个使用空间换时间的数据结构,查询的时间复杂度跟红黑树相当,具体的结构如图:
图片源于知乎,侵,删
redis实现延时队列时,我们可以使用时间戳作为排序的依据,这样就可以保证时间最小的任务排在头结点,然后通过不断轮询头结点,当前时间跟头结点的时间做对比,如果头结点的时间小于当前时间,说明该任务需要被处理,尝试移除头结点,如果头结点移除成功,调用具体的处理逻辑即可。
核心代码如下:
- 投放任务到redis
/**
* 向延迟队列中投放任务
*
* @param keyGen 任务队列名称
* @param task 任务
* @param delay 延迟时长,单位毫秒
*/
public void delay(Supplier<String> keyGen, String topic, Object task, Long delay) {
TaskNode taskNode = TaskNode.of(task, topic);
String queueName = String.format("%s:%s", delayQueueProperties.getPrefix(), keyGen.get());
String taskJson = GsonHelper.toJson(taskNode);
long score = System.currentTimeMillis() + delay;
if (log.isDebugEnabled()) {
log.debug("向延时队列投放任务,队列名称[{}],任务[{}],score[{}]", queueName, taskJson, score);
}
stringRedisTemplate.opsForZSet().add(queueName, taskJson, score);
}
- 定时轮询任务
// 死循环定时轮询任务
while (!Thread.interrupted()) {
String prefixPattern = String.format("%s*", delayQueueProperties.getPrefix());
polling(prefixPattern);
try {
TimeUnit.MILLISECONDS.sleep(pollingDuration);
} catch (Exception e) {
e.printStackTrace();
}
}
}, "delay queue polling thread").start();
/**
* 获取延时队列消息
*
* @param pattern zset前缀
*/
private void polling(String pattern) {
Set<String> keys = stringRedisTemplate.keys(pattern);
if (Objects.isNull(keys) || keys.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("延时队列为空或者不存在...");
}
return;
}
keys.stream()
// 多线程处理多个延时队列
.parallel()
.forEach(this::pollingCore);
}
/**
* 获取延时队列的头结点,并判读是否过期,如果过期,尝试移除头结点,
* 移除成功,调用具体的处理任务逻辑
*
* @param key zset key
*/
private void pollingCore(String key) {
// 获取头结点
Set<String> values =
stringRedisTemplate
.opsForZSet()
.rangeByScore(key, 0, System.currentTimeMillis(), 0, 1);
if (Objects.isNull(values) || values.isEmpty()) {
if (log.isDebugEnabled()) {
log.debug("延时队列[{}]没有过期节点", key);
}
return;
}
String taskNodeJson = values.iterator().next();
Boolean remove = Optional.ofNullable(stringRedisTemplate.opsForZSet().remove(key, taskNodeJson))
.map(a -> a != 0)
.orElse(false);
if (remove) {
// TODO 调用具体的处理任务逻辑
}
}
我这里做了一个简单的优化,即,通过小时对延时队列进行分段,这样就是将原来一个队列中的消息按照投放的时间分配到不同的队列中,不会导致延时队列太长,而且这样做的一个好处就是多个线程可以同时操作多个队列,在一定程度上提高效率。
我通过SpringBoot的自动装配技术,给大家实现了一个比较通用的延时队列,屏蔽底层细节,具体是这样使用的。
- 向延时队列中投放任务
引入RedisDelayQueue
调用delay
方法投放任务
@Resource
private RedisDelayQueue redisDelayQueue;
void test(){
redisDelayQueue.delay("topicName",task,20L,timeUnit);
}
api 介绍
delay有很多重载方法,主要的参数有一下几个:
- 投递的topic
- 具体投递的任务 Obj
- 希望多久后获得到这个任务
- 时间单位
- 消费消息
实现HandlerTask
接口,并标注消费的topic
handler方法入参是你当时投递到延迟队列中的json串,需要你自己手动反序列化一下。
@DelayQueueListener(listen = "topic")
public class TaskHandler implements HandlerTask {
@Override
public void handler(String task) {
// todo 具体消费消息逻辑
}
}
- 附录
该项目已上传至github,传送门https://github.com/xiao-ren-wu/redis-delay-queue
如果对你有用,或者比较感兴趣,可以访问我的github,欢迎star
网友评论