延迟队列:往队列中放入的元素具有一定的延迟时间,延迟时间到期后,take或者poll方法才能获取到这些元素。
先看以下延迟队列的构造:
//1.队列中的元素通过实现Delayed接口来实现延迟时间控制
//2.队列实现BlockingQueue接口,实现了相应的阻塞功能
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
//操作公共资源的地方,采用加锁方式实现
private final transient ReentrantLock lock = new ReentrantLock();
//队列内部使用优先级队列来存放元素
private final PriorityQueue<E> q = new PriorityQueue<E>();
//队列使用Condition来控制读写线程的相互等待
private final Condition available = lock.newCondition
//leader:用于标记是否已经有线程与头节点配对了,并且记录是哪个线程配对的。
private Thread leader = null;
}
作为一个阻塞队列,则一定具备offer,take,poll的能力,下面我们一一来看以下具体的实现。
offer方法:
//新增元素
public boolean offer(E e) {
//通过锁来控制并发
final ReentrantLock lock = this.lock;
lock.lock();
try {
//优先队列中存放元素
q.offer(e);
//才放入的元素成为了头节点,则有两种情况,第一种是队列本身为空,这种情况下是否重置leader=null其实没有影响,因为这个时候本身leader就为Null
//另外一种情况是新放入的元素由于优先级最高,所以变为了头节点,这个时候需要重新设置leader为null,并且唤醒等待中的线程,让取数线程重新和新的头节点配对
//因为新的头节点的delay时间肯定不同了,所以需要线程醒来后,重新配对,重新设置睡眠时间(take方法的代码1处)
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
take方法:
/***
一直等待,直到队列中有元素到期,或者线程被中断
***/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); //获取锁并接受中断
try {
for (;;) {
E first = q.peek();
if (first == null) //队列为空
available.await(); //睡眠当前线程,释放锁,让其它线程(生产者或者消费者)操作
else { //队列不为空
long delay = first.getDelay(NANOSECONDS);//获取头节点的过期时间
if (delay <= 0) //头节点已经过期,直接返回节点
return q.poll();
first = null; // don't retain ref while waiting
//头节点未过期,说明所有节点都还未过期,无法出队列, 当前线程需要睡眠等待
if (leader != null) //leader的设定是标记即将获取元素的线程,如果它不等于空,说明当前即将过期的头节点已经有其它线程预定了,所以当前线程只能睡眠了。
available.await();
else {//如果leader等于空,则当前线程为本次获取该头节点的线程
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
//代码1
available.awaitNanos(delay); //让当前线程睡眠时间等于头节点到期时间
} finally {
//当前线程从睡眠中醒来,或者被中断。如果是正常醒来, 则delay<=0了,下一次循环则会出队列,所以让出leader位置。
//如果是被中断,则说明本线程放弃了获取元素的权力,同样需要让出leader的位置
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//如果leader为空(当前头节点没有被预定),并且队列不为空,唤醒在条件上等待的线程
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();//释放锁
}
}
poll方法与take方法类似,只是多了一个方法自带的过期时间:
//带过期时间的获取元素
//该方法一直等待元素过期,直到方法指定的timeout到期
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout); //方法等待过期时间
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null) { //如果头节点等于空
if (nanos <= 0) //方法等待到期
return null;
else
nanos = available.awaitNanos(nanos); //队列中没有可以出队的元素,则让当前线程阻塞
} else { //头节点不等于空
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0) //头节点延迟时间过期,则直接出队列
return q.poll();
if (nanos <= 0) //如果头节点还未过期,而方法等待时间过期,则返回null
return null;
first = null; // don't retain ref while waiting //线程等待期间不引用头节点
//-----以下是阻塞队列的逻辑---------
// //如果方法等待时间比头节点过期时间小,则睡眠方法等待时间(取小),否则如果leader!=null(头节点已经被预定),则同样睡眠方法等待时间(取大)
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else { //如果方法等待的时间大于头节点过期时间(方法能等到头节点过期并将它取出,但是当前头节点还未过期),并且leader线程等于null
Thread thisThread = Thread.currentThread();
leader = thisThread; //当前线程获得与头节点配对的资格
try {
long timeLeft = available.awaitNanos(delay); //线程睡眠头节点过期时间
nanos -= delay - timeLeft; //方法等待时间减去本次已经等待的实际时间,然后自旋重新检查元素过期情况。
} finally { //见take方法的说明
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {//见take方法的说明
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
总结:
1.要想使用延迟队列,则元素一定要实现Delayed接口
2.延迟队列中,线程安全的问题是通过显示的重入锁控制的,所以同一时间只有一个线程在操作
3.内部是用优先级队列来存放元素的,优先级队列非线程安全,所以才需要2.
4.线程等待是通过显示锁的条件进行等待的,由于Condition的实现也是一个FIFO队列,所以DelayQueue是一个公平策略的队列。
5.每一个线程是否进行睡眠,或者睡眠多久,其关键是leader决定的,因为leader告知了当前线程头节点是否已经被别的线程配对。
网友评论