上一篇文章剖析了 PriorityBlockingQueue 的相关源码,那这篇文章接着看另外一个常见的阻塞队列 —— DelayQueue
简介
DelayQueue是一个延迟处理的队列。其内部是复用PriorityQueue
类具有对过期的对象优先处理的能力;因为PriorityQueue
对象的数据结构是二叉堆,所以我们也可以认为 DelayQueue 对象的数据结构也是二叉堆。值得注意的是,千万不要以为整个二叉堆都是排序的,实际上除了堆头元素之外其余的元素都是无序的。不清楚PriorityQueue源码的朋友,可参考我另外一篇拙作Java 队列之 PriorityQueue 源码分析
依据我的习惯,学习源码之前先写个demo,了解大概的用法先!
示例:
public class DelayQueueDemo {
public static void main(String[] args) {
DelayQueue<DelayDemo> delayQueue = new DelayQueue();
long beginTime = System.currentTimeMillis();
new Thread(() -> {
// 2秒后过期
delayQueue.add(new DelayDemo(beginTime + 2000L, beginTime));
}, "线程一").start();
new Thread(() -> {
try {
delayQueue.take().show();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程二").start();
new Thread(() -> {
try {
Thread.sleep(1000);
delayQueue.take().show();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "线程三").start();
}
}
class DelayDemo implements Delayed {
private Long beginTime;
private Long now;
public DelayDemo() {
}
public DelayDemo(Long now, Long beginTime) {
this.now = now;
this.beginTime = beginTime;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(now - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
public void show(){
System.out.println(Thread.currentThread().getName() + " 执行");
}
}
线程二 执行
源码剖析
类注释
- 无界限的队列
- 队列的元素必须是 Delayed 的子类
- 当队列元素过期才能被 take 方法调用
- 越靠近队列头部的元素越快过期
- 队列不允许有空元素
- 没有过期的元素或者队列为空,使用 poll 方法会返回null
- 实现了 Collection 和 Iterator 接口,但无法使用 iterator 方法迭代队列的元素
成员变量
// 可重入锁,用于线程安全
private final transient ReentrantLock lock = new ReentrantLock();
// 优先队列,用于存储元素
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader = null;
// 用于实现阻塞的Condition对象
private final Condition available = lock.newCondition();
构造方法
public DelayQueue() {}
// 传入集合对象初始化DelayQueue对象。传入的集合对象必须是 Delayed 的子类
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
添加元素
// 添加元素
public boolean add(E e) {
return offer(e);
}
// 添加元素
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);
// 查看元素是否为队首
if (q.peek() == e) {
// 设置leader为空
leader = null;
// 唤醒所有等待的队列
available.signal();
}
return true;
} finally {
// 释放锁
lock.unlock();
}
}
// 添加元素
public void put(E e) {
offer(e);
}
// 添加元素
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
可以发现,添加元素的核心方法是offer(E e)
。在添加元素之前加锁保证线程安全,然后调用 PriorityQueue 对象添加元素,由于 DelayQueue 对象没有限定队列容量,所以可认为无限大,添加元素的时候不会检查容量大小,直到发生OOM。
查看元素
public E poll() {
final ReentrantLock lock = this.lock;
// 加锁
lock.lock();
try {
E first = q.peek();
// first == null:队列为空,返回null
// first.getDelay(NANOSECONDS) > 0:说明第一个元素还没到超时时间,返回null
if (first == null || first.getDelay(NANOSECONDS) > 0)
return null;
else
return q.poll();// 返回第一个元素的值
} finally {
// 释放锁
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
// 加锁
lock.lockInterruptibly();
try {
for (;;) {
// 第一个元素
E first = q.peek();
// 如果队列为空
if (first == null) {
// 如果达到方法超时时间,返回null
if (nanos <= 0)
return null;
else
// 阻塞等待
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
// 队列第一个元素已过期,直接出队
if (delay <= 0)
return q.poll();
// 执行poll方法已超时,返回null
if (nanos <= 0)
return null;
// 等待的时候,释放first的引用,避免内存泄漏
first = null;
// nanos < delay:poll方法超时时间小于队列第一个元素过期时间,则阻塞等待
// leader != null:队列第一个元素被其它线程占有,则阻塞等待
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {// 执行到这里说明队列第一个元素没有被其它线程占有,而且当队列第一个元素过期后,poll方法还没超时,则往下执行计算poll方法剩余时间并重新自旋后出队
Thread thisThread = Thread.currentThread();
// 设置第一个元素为当前线程占有
leader = thisThread;
try {
// 计算队列第一个元素过期后,poll方法的超时时间余下的时间
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 队列第一个元素没有被线程占有而且队列不为空,唤醒所有等待线程
if (leader == null && q.peek() != null)
available.signal();
// 释放锁
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
// 获取队首元素
E first = q.peek();
// 队首为空,则阻塞当前线程
if (first == null)
available.await();
else {
// 获取队首元素的过期时间
long delay = first.getDelay(NANOSECONDS);
// 已过期,直接出队
if (delay <= 0)
return q.poll();
// 等待的时候,释放first的引用,避免内存泄漏
first = null;
// leader != null表明有其他线程在操作,阻塞当前线程
if (leader != null)
available.await();
else {
// leader指向当前线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待阻塞,直到first元素过期
available.awaitNanos(delay);
} finally {
// 释放leader
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// leader为null并且队列不为空,说明没有其他线程在等待,那就通知条件队列
if (leader == null && q.peek() != null)
available.signal();
// 释放锁
lock.unlock();
}
}
// 无论队列第一个元素是否超时都会被读取,而且不会删除队列元素
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.peek();
} finally {
lock.unlock();
}
}
查看队列元素的方法有三个:peek、poll 和 take 方法。
- peek方法最简单,直接调用 PriorityQueue.peek 方法获取队列第一个元素,但不删除;另外两个方法,只要读取到元素就会删除队列第一个元素。
- poll方法的参数为空,如果队列第一个元素为空或是否过期就会返回null,否则就会读取并删去元素。如果poll方法参数指定了等待时间,则会在读取元素之前根据方法等待时间和元素本身的过期时间,只有队列不为空,第一个元素被当前线程占有后,元素过期且方法指定的等待时间未超时则会读取并删除第一个元素,否则会一直阻塞直到方法等待时间超时。
- 至于take方法的流程与poll方法加上等待时间的流程差不多,如果线程加锁成功后队列第一个元素
first
已过期则直接读取并删除first
,否则会判断是否已有线程占有first
元素,如果有其它线程占有,则当前线程执行await操作,否则设置当前线程占有first
元素并阻塞等待直到first
元素过期,线程一旦达到过期时间就会自旋并读取删除first
元素。
以take
方法源码为例,画出以下流程以帮助理解源码:
删除元素
// 无论是否过期都会删除指定的元素
public boolean remove(Object o) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.remove(o);
} finally {
lock.unlock();
}
}
常用方法
// 返回队列元素数量
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.size();
} finally {
lock.unlock();
}
}
// 清空队列所有的元素
public void clear() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.clear();
} finally {
lock.unlock();
}
}
// 返回队列的剩余容量,总是会返回Integer的最大值
public int remainingCapacity() {
return Integer.MAX_VALUE;
}
// 将队列转为Object数组返回
public Object[] toArray() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.toArray();
} finally {
lock.unlock();
}
}
// 返回指定类型的数组
public <T> T[] toArray(T[] a) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return q.toArray(a);
} finally {
lock.unlock();
}
}
总结
其实不难看出,DelayQueue 类的实现其实就是通过ReentrantLock
锁保证线程安全,要求队列元素实现Delayed
接口和复用PriorityQueue
类达到延迟和排序的能力!给我最大的收获是,要学会复用的思想,比方说我们可以复用已有的 API 实现一些自研的程序或二次开发,这种思想将会提高开发效率和降低开发的工作量。但是大家有没有想过,DelayQueue 有个缺点,如果数据放在延迟队列中,数据还没过期,忽然宕机了怎么办?因为数据是放在内存中,如果没有持久化的话,宕机后就会丢数据!所以说,使用 DelayQueue 设置过期数据不宜时间太久,会有丢数据的风险!常用的消息队列中间件,如:RocketMQ、Kafka等,其实会考虑到宕机丢数据的情况,所以都会有刷盘机制降低丢数据的风险。
参考资料:
慕课网:面试官系统精讲Java源码及大厂真题:https://www.imooc.com/read/47/article/863
网友评论