J.U.C 阻塞队列源码剖析系列(终结篇)之 DelayQueu

作者: 爱打乒乓的程序员 | 来源:发表于2020-03-20 18:21 被阅读0次

上一篇文章剖析了 PriorityBlockingQueue 的相关源码,那这篇文章接着看另外一个常见的阻塞队列 —— DelayQueue

简介

DelayQueue是一个延迟处理的队列。其内部是复用PriorityQueue类具有对过期的对象优先处理的能力;因为PriorityQueue对象的数据结构是二叉堆,所以我们也可以认为 DelayQueue 对象的数据结构也是二叉堆。值得注意的是,千万不要以为整个二叉堆都是排序的,实际上除了堆头元素之外其余的元素都是无序的。不清楚PriorityQueue源码的朋友,可参考我另外一篇拙作Java 队列之 PriorityQueue 源码分析

依据我的习惯,学习源码之前先写个demo,了解大概的用法先!
示例:

public class DelayQueueDemo {
    public static void main(String[] args) {
        DelayQueue<DelayDemo> delayQueue = new DelayQueue();
        long beginTime = System.currentTimeMillis();
        new Thread(() -> {
            // 2秒后过期
            delayQueue.add(new DelayDemo(beginTime + 2000L, beginTime));
        }, "线程一").start();
        new Thread(() -> {
            try {
                delayQueue.take().show();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程二").start();
        new Thread(() -> {
            try {
                Thread.sleep(1000);
                delayQueue.take().show();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "线程三").start();
    }
}

class DelayDemo implements Delayed {

    private Long beginTime;

    private Long now;

    public DelayDemo() {
    }

    public DelayDemo(Long now, Long beginTime) {
        this.now = now;
        this.beginTime = beginTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(now - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    public void show(){
        System.out.println(Thread.currentThread().getName() + " 执行");
    }
}
线程二 执行

源码剖析

类注释

  • 无界限的队列
  • 队列的元素必须是 Delayed 的子类
  • 当队列元素过期才能被 take 方法调用
  • 越靠近队列头部的元素越快过期
  • 队列不允许有空元素
  • 没有过期的元素或者队列为空,使用 poll 方法会返回null
  • 实现了 Collection 和 Iterator 接口,但无法使用 iterator 方法迭代队列的元素

成员变量

    // 可重入锁,用于线程安全
    private final transient ReentrantLock lock = new ReentrantLock();
    // 优先队列,用于存储元素
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    private Thread leader = null;

    // 用于实现阻塞的Condition对象
    private final Condition available = lock.newCondition();

构造方法

    public DelayQueue() {}

    // 传入集合对象初始化DelayQueue对象。传入的集合对象必须是 Delayed 的子类
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }

添加元素

    // 添加元素
    public boolean add(E e) {
        return offer(e);
    }

    // 添加元素
    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.offer(e);
            // 查看元素是否为队首
            if (q.peek() == e) {
                // 设置leader为空
                leader = null;
                // 唤醒所有等待的队列
                available.signal();
            }
            return true;
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

    // 添加元素
    public void put(E e) {
        offer(e);
    }

    // 添加元素
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }

可以发现,添加元素的核心方法是offer(E e)。在添加元素之前加锁保证线程安全,然后调用 PriorityQueue 对象添加元素,由于 DelayQueue 对象没有限定队列容量,所以可认为无限大,添加元素的时候不会检查容量大小,直到发生OOM。

查看元素

    public E poll() {
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lock();
        try {
            E first = q.peek();
            // first == null:队列为空,返回null
            // first.getDelay(NANOSECONDS) > 0:说明第一个元素还没到超时时间,返回null
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll();// 返回第一个元素的值
        } finally {
            // 释放锁
            lock.unlock();
        }
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 加锁
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 第一个元素
                E first = q.peek();
                // 如果队列为空
                if (first == null) {
                    // 如果达到方法超时时间,返回null
                    if (nanos <= 0)
                        return null;
                    else
                        // 阻塞等待
                        nanos = available.awaitNanos(nanos);
                } else {
                    long delay = first.getDelay(NANOSECONDS);
                    // 队列第一个元素已过期,直接出队
                    if (delay <= 0)
                        return q.poll();
                    // 执行poll方法已超时,返回null
                    if (nanos <= 0)
                        return null;
                    // 等待的时候,释放first的引用,避免内存泄漏
                    first = null;
                    // nanos < delay:poll方法超时时间小于队列第一个元素过期时间,则阻塞等待
                    // leader != null:队列第一个元素被其它线程占有,则阻塞等待
                    if (nanos < delay || leader != null)
                        nanos = available.awaitNanos(nanos);
                    else {// 执行到这里说明队列第一个元素没有被其它线程占有,而且当队列第一个元素过期后,poll方法还没超时,则往下执行计算poll方法剩余时间并重新自旋后出队
                        Thread thisThread = Thread.currentThread();
                        // 设置第一个元素为当前线程占有
                        leader = thisThread;
                        try {
                            // 计算队列第一个元素过期后,poll方法的超时时间余下的时间
                            long timeLeft = available.awaitNanos(delay);
                            nanos -= delay - timeLeft;
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 队列第一个元素没有被线程占有而且队列不为空,唤醒所有等待线程
            if (leader == null && q.peek() != null)
                available.signal();
            // 释放锁
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 获取队首元素
                E first = q.peek();
                // 队首为空,则阻塞当前线程
                if (first == null)
                    available.await();
                else {
                    // 获取队首元素的过期时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 已过期,直接出队
                    if (delay <= 0)
                        return q.poll();
                    // 等待的时候,释放first的引用,避免内存泄漏
                    first = null;
                    // leader != null表明有其他线程在操作,阻塞当前线程
                    if (leader != null)
                        available.await();
                    else {
                        // leader指向当前线程
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 等待阻塞,直到first元素过期
                            available.awaitNanos(delay);
                        } finally {
                            // 释放leader
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // leader为null并且队列不为空,说明没有其他线程在等待,那就通知条件队列
            if (leader == null && q.peek() != null)
                available.signal();
            // 释放锁
            lock.unlock();
        }
    }
    
    // 无论队列第一个元素是否超时都会被读取,而且不会删除队列元素
    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.peek();
        } finally {
            lock.unlock();
        }
    }

查看队列元素的方法有三个:peek、poll 和 take 方法。

  1. peek方法最简单,直接调用 PriorityQueue.peek 方法获取队列第一个元素,但不删除;另外两个方法,只要读取到元素就会删除队列第一个元素。
  2. poll方法的参数为空,如果队列第一个元素为空或是否过期就会返回null,否则就会读取并删去元素。如果poll方法参数指定了等待时间,则会在读取元素之前根据方法等待时间和元素本身的过期时间,只有队列不为空,第一个元素被当前线程占有后,元素过期且方法指定的等待时间未超时则会读取并删除第一个元素,否则会一直阻塞直到方法等待时间超时。
  3. 至于take方法的流程与poll方法加上等待时间的流程差不多,如果线程加锁成功后队列第一个元素first已过期则直接读取并删除first,否则会判断是否已有线程占有first元素,如果有其它线程占有,则当前线程执行await操作,否则设置当前线程占有first元素并阻塞等待直到first元素过期,线程一旦达到过期时间就会自旋并读取删除first元素。

take方法源码为例,画出以下流程以帮助理解源码:

删除元素

    // 无论是否过期都会删除指定的元素
    public boolean remove(Object o) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.remove(o);
        } finally {
            lock.unlock();
        }
    }

常用方法

    // 返回队列元素数量
    public int size() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.size();
        } finally {
            lock.unlock();
        }
    }
    
    // 清空队列所有的元素
    public void clear() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            q.clear();
        } finally {
            lock.unlock();
        }
    }

    // 返回队列的剩余容量,总是会返回Integer的最大值
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    // 将队列转为Object数组返回
    public Object[] toArray() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray();
        } finally {
            lock.unlock();
        }
    }

    // 返回指定类型的数组
    public <T> T[] toArray(T[] a) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            return q.toArray(a);
        } finally {
            lock.unlock();
        }
    }

总结

其实不难看出,DelayQueue 类的实现其实就是通过ReentrantLock锁保证线程安全,要求队列元素实现Delayed接口和复用PriorityQueue类达到延迟和排序的能力!给我最大的收获是,要学会复用的思想,比方说我们可以复用已有的 API 实现一些自研的程序或二次开发,这种思想将会提高开发效率和降低开发的工作量。但是大家有没有想过,DelayQueue 有个缺点,如果数据放在延迟队列中,数据还没过期,忽然宕机了怎么办?因为数据是放在内存中,如果没有持久化的话,宕机后就会丢数据!所以说,使用 DelayQueue 设置过期数据不宜时间太久,会有丢数据的风险!常用的消息队列中间件,如:RocketMQ、Kafka等,其实会考虑到宕机丢数据的情况,所以都会有刷盘机制降低丢数据的风险。

参考资料:

Java 队列之 PriorityQueue 源码分析

慕课网:面试官系统精讲Java源码及大厂真题:https://www.imooc.com/read/47/article/863

相关文章

网友评论

    本文标题:J.U.C 阻塞队列源码剖析系列(终结篇)之 DelayQueu

    本文链接:https://www.haomeiwen.com/subject/ezwbrhtx.html