美文网首页
多线程并发编程15-DelayQueue源码剖析

多线程并发编程15-DelayQueue源码剖析

作者: Demo_zfs | 来源:发表于2020-03-22 20:18 被阅读0次

    今天来说一说DelayQueue,DelayQueue并发队列是一个无界阻塞延迟队列,队列中的每个元素都有一个过期时间,当从队列获取元素时,只有过期元素才会出队列,不允许存放null元素。队列头元素是最快要过期的元素。

    DelayQueue内部有一个PriorityQueue优先队列,存入到该队列的元素都实现Delayed接口,由于每个原始都有一个过期时间,所以要实现获取当前元素还剩多少时间就过期的接口,由于元素是存放在PriorityQueue优先队列中的,所以需要实现元素之间相互比较的接口。

    DelayQueue内部实现了一个Leader-Follower的模式,用于尽量减少不必要的线程等待。当一个线程调用队列的take方法变为leader线程后,它会调用条件变量available.awaitNanos(delay)等待delay时间,但其他线程(follower线程)则会调用available.await()进行无限等待。leader线程延迟时间过期后,会退出take方法,并通过调用available.signal()方法唤醒一个follower线程,被唤醒的follower线程被选举为leader线程。当向队列插入一个过期时间比头元素过期时间还短的元素时,leader会被重置为null。

    DelayQueue内部主要的成员变量:

private final transient ReentrantLock lock = new ReentrantLock();

private final PriorityQueue<E> q = new PriorityQueue<E>();

private Thread leader = null;

private final Condition available = lock.newCondition();

    下面对主要函数原理进行讲解。

offer(E e)

    插入元素到队列,当元素e为null时会抛出NullPointException异常,否则由于是无界队列,所以一直返回true。插入的元素需要实现Delayed接口。

public boolean offer(E e) {

//(1)尝试获取独占锁。

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

//(2)将元素放入PriorityQueue优先队列中,如果e为null则会抛出NullPointException异常。

        q.offer(e);

//(3)如果当前插入的元素成为队列中的第一个节点(即插入元素的过期时间比队列中的所有元素的过期时间都小)则将leader设置为null,并调用available.signal()方法激活available条件队列中的一个线程。

        if (q.peek() == e) {

            leader = null;

            available.signal();

        }

        return true;

    } finally {

//(4)释放锁。

        lock.unlock();

    }

}

take()

    从队列中获取并移除延迟时间过期的元素,当队列中没有元素或没有延迟时间过去的元素则会进行阻塞。

public E take() throws InterruptedException {

//(1)尝试获取独占锁 ,调用的是lockInterruptibly 方法,所以会被中断。

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();

    try {

        for (;;) {

//(2)如果队列中没有元素则调用available.await()进行阻塞。

            E first = q.peek();

            if (first == null)

                available.await();

            else {

//(3)如果获取队列的第一个元素的延迟时间已到期,则返回该元素并从队列中移除。

                long delay = first.getDelay(NANOSECONDS);

                if (delay <= 0)

                    return q.poll();

//(4)这里有个小细节,当要进行阻塞等待的时候将first 变量置为null不引用队列中的元素。

                first = null; 

//(5)如果当前线程不是leader线程,则调用available.await()方法持续阻塞等待,直到被唤醒。

                if (leader != null)

                    available.await();

                else {

//(6)如果当前线程是leader线程,而调用available.awaitNanos(delay)方法,阻塞等待指定的过期时间。

                    Thread thisThread = Thread.currentThread();

                    leader = thisThread;

                    try {

                        available.awaitNanos(delay);

                    } finally {

                        if (leader == thisThread)

                            leader = null;

                    }

                }

            }

        }

    } finally {

        if (leader == null && q.peek() != null)

//(7)如果代码执行到这,说明当前线程从队列中移除过期元素了,但是队列中还存在元素,则需要调用available.signal() 方法,唤醒available条件队列中阻塞的线程去获取队列中的元素。

            available.signal();

//(8)释放锁。

        lock.unlock();

    }

}

poll()

    尝试获取并移除优先级队列中的头元素,如果优先级队列为空或者头元素的延迟时间还没有过期则返回null,否则返回头元素并从优先级队列中移除。

public E poll() {

//(1)尝试获取独占锁。

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

//(2)获取优先级队列中的第一个元素。

        E first = q.peek();

//(3)如果第一个元素为null(即优先级队列中无元素)或第一个元素的延迟时间没有过期则返回null,否则返回头元素并将头元素从优先级队列中移除。

        if (first == null || first.getDelay(NANOSECONDS) > 0)

            return null;

        else

            return q.poll();

    } finally {

//(4)释放锁。

        lock.unlock();

    }

}

size()

    获取队列中的元素个数,由于在获取元素个数前需要先获取锁,所有size()方法返回的元素个数是精准的。

public int size() {

//(1)尝试获取独占锁。

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

//(2)获取优先级队列中的元素个数。

        return q.size();

    } finally {

//(3)释放锁。

        lock.unlock();

    }

}

    DelayQueue阻塞无界延迟队列,内部使用PriorityQueue优先级队列进行存储元素,根据元素的延迟时间进行排序,使用ReentrantLock独占锁实现线程同步。

    今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。

相关文章

网友评论

      本文标题:多线程并发编程15-DelayQueue源码剖析

      本文链接:https://www.haomeiwen.com/subject/grwvyhtx.html