美文网首页JDK源码解析
DelayQueue源码分析

DelayQueue源码分析

作者: i砖工 | 来源:发表于2020-05-06 17:17 被阅读0次

    延迟队列:往队列中放入的元素具有一定的延迟时间,延迟时间到期后,take或者poll方法才能获取到这些元素。
    先看以下延迟队列的构造:

    //1.队列中的元素通过实现Delayed接口来实现延迟时间控制
    //2.队列实现BlockingQueue接口,实现了相应的阻塞功能
    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E> {
        //操作公共资源的地方,采用加锁方式实现
        private final transient ReentrantLock lock = new ReentrantLock();
        //队列内部使用优先级队列来存放元素
        private final PriorityQueue<E> q = new PriorityQueue<E>();
        //队列使用Condition来控制读写线程的相互等待
        private final Condition available = lock.newCondition
        //leader:用于标记是否已经有线程与头节点配对了,并且记录是哪个线程配对的。
        private Thread leader = null;   
    }
    

    作为一个阻塞队列,则一定具备offer,take,poll的能力,下面我们一一来看以下具体的实现。
    offer方法:

    //新增元素
    public boolean offer(E e) {
        //通过锁来控制并发
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //优先队列中存放元素
            q.offer(e);
            //才放入的元素成为了头节点,则有两种情况,第一种是队列本身为空,这种情况下是否重置leader=null其实没有影响,因为这个时候本身leader就为Null
            //另外一种情况是新放入的元素由于优先级最高,所以变为了头节点,这个时候需要重新设置leader为null,并且唤醒等待中的线程,让取数线程重新和新的头节点配对
            //因为新的头节点的delay时间肯定不同了,所以需要线程醒来后,重新配对,重新设置睡眠时间(take方法的代码1处)
            if (q.peek() == e) {
                leader = null;
                available.signal();
            }
            return true;
        } finally {
            lock.unlock();
        }
    }
    

    take方法:

    /***
    一直等待,直到队列中有元素到期,或者线程被中断
    ***/
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly(); //获取锁并接受中断
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) //队列为空
                    available.await(); //睡眠当前线程,释放锁,让其它线程(生产者或者消费者)操作
                else { //队列不为空
                    long delay = first.getDelay(NANOSECONDS);//获取头节点的过期时间
                    if (delay <= 0) //头节点已经过期,直接返回节点
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    
                    //头节点未过期,说明所有节点都还未过期,无法出队列, 当前线程需要睡眠等待
                    if (leader != null)  //leader的设定是标记即将获取元素的线程,如果它不等于空,说明当前即将过期的头节点已经有其它线程预定了,所以当前线程只能睡眠了。
                        available.await();
                    else {//如果leader等于空,则当前线程为本次获取该头节点的线程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            //代码1
                            available.awaitNanos(delay); //让当前线程睡眠时间等于头节点到期时间
                        } finally {
                            //当前线程从睡眠中醒来,或者被中断。如果是正常醒来, 则delay<=0了,下一次循环则会出队列,所以让出leader位置。
                            //如果是被中断,则说明本线程放弃了获取元素的权力,同样需要让出leader的位置
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            //如果leader为空(当前头节点没有被预定),并且队列不为空,唤醒在条件上等待的线程
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();//释放锁
        }
    }
    

    poll方法与take方法类似,只是多了一个方法自带的过期时间:

    //带过期时间的获取元素
    //该方法一直等待元素过期,直到方法指定的timeout到期
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);  //方法等待过期时间
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) { //如果头节点等于空
                    if (nanos <= 0)  //方法等待到期
                        return null; 
                    else
                        nanos = available.awaitNanos(nanos);  //队列中没有可以出队的元素,则让当前线程阻塞
                } else { //头节点不等于空
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0) //头节点延迟时间过期,则直接出队列
                        return q.poll();
                        
                    if (nanos <= 0)  //如果头节点还未过期,而方法等待时间过期,则返回null
                        return null;
                    
                    first = null; // don't retain ref while waiting  //线程等待期间不引用头节点
                    
                    
                    //-----以下是阻塞队列的逻辑---------
                    // //如果方法等待时间比头节点过期时间小,则睡眠方法等待时间(取小),否则如果leader!=null(头节点已经被预定),则同样睡眠方法等待时间(取大)
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos); 
                    else { //如果方法等待的时间大于头节点过期时间(方法能等到头节点过期并将它取出,但是当前头节点还未过期),并且leader线程等于null
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread; //当前线程获得与头节点配对的资格
                        try {
                            long timeLeft = available.awaitNanos(delay); //线程睡眠头节点过期时间
                            nanos -= delay - timeLeft;  //方法等待时间减去本次已经等待的实际时间,然后自旋重新检查元素过期情况。
                        } finally { //见take方法的说明
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {//见take方法的说明
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }
    

    总结:
    1.要想使用延迟队列,则元素一定要实现Delayed接口
    2.延迟队列中,线程安全的问题是通过显示的重入锁控制的,所以同一时间只有一个线程在操作
    3.内部是用优先级队列来存放元素的,优先级队列非线程安全,所以才需要2.
    4.线程等待是通过显示锁的条件进行等待的,由于Condition的实现也是一个FIFO队列,所以DelayQueue是一个公平策略的队列。
    5.每一个线程是否进行睡眠,或者睡眠多久,其关键是leader决定的,因为leader告知了当前线程头节点是否已经被别的线程配对。

    相关文章

      网友评论

        本文标题:DelayQueue源码分析

        本文链接:https://www.haomeiwen.com/subject/egbwghtx.html