使用Redisson提供的RDelayedQueue接口。
好处:
- 性能好,基于redis。
- 支持分布式服务。
- 不需要轮询。
RedissonDelayedQueue
添加延时任务
// 获取阻塞队列实例
RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("myQueue");
// 创建延时队列,关联到阻塞队列
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);
// 向延时队列中添加元素,10秒后过期
delayedQueue.offer("item1", 10, TimeUnit.SECONDS);
消费延时任务
// 获取阻塞队列实例
RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("myQueue");
// 从阻塞队列中取出元素
try {
String item = blockingQueue.take(); // 阻塞直到元素可用
System.out.println("取出元素: " + item);
} catch (InterruptedException e) {
e.printStackTrace();
}
添加延时任务的过程:
把延时任务添加到zset。
本地调度一个和zset队首过期时间保持一致的定时任务。
定时任务到期执行时,会把zset队首的过期任务转移到list。
消费延时任务的过程:
使用list的阻塞式读取命令BLPOP,获取已到期的延时任务。
为了保证调度的定时任务其执行时间和zset队首延时任务的到期时间一致,做了两点:
- 在offer延时任务时,检查新offer的延时任务是不是在队首,如果是,则通过redis的发布订阅触发重新调度定时任务。
- 在定时任务执行时,把队首最多100个到期的延时任务转移到list后,只要zset不空,就按队首延时任务的到期时间调度下次定时任务。
相关阅读:
网友评论