J.U.C 阻塞队列源码剖析系列(终结篇)之 DelayQueu

作者: 爱打乒乓的程序员 | 来源:发表于2020-03-20 18:21 被阅读0次

    上一篇文章剖析了 PriorityBlockingQueue 的相关源码,那这篇文章接着看另外一个常见的阻塞队列 —— DelayQueue

    简介

    DelayQueue是一个延迟处理的队列。其内部是复用PriorityQueue类具有对过期的对象优先处理的能力;因为PriorityQueue对象的数据结构是二叉堆,所以我们也可以认为 DelayQueue 对象的数据结构也是二叉堆。值得注意的是,千万不要以为整个二叉堆都是排序的,实际上除了堆头元素之外其余的元素都是无序的。不清楚PriorityQueue源码的朋友,可参考我另外一篇拙作Java 队列之 PriorityQueue 源码分析

    依据我的习惯,学习源码之前先写个demo,了解大概的用法先!
    示例:

    public class DelayQueueDemo {
        public static void main(String[] args) {
            DelayQueue<DelayDemo> delayQueue = new DelayQueue();
            long beginTime = System.currentTimeMillis();
            new Thread(() -> {
                // 2秒后过期
                delayQueue.add(new DelayDemo(beginTime + 2000L, beginTime));
            }, "线程一").start();
            new Thread(() -> {
                try {
                    delayQueue.take().show();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "线程二").start();
            new Thread(() -> {
                try {
                    Thread.sleep(1000);
                    delayQueue.take().show();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "线程三").start();
        }
    }
    
    class DelayDemo implements Delayed {
    
        private Long beginTime;
    
        private Long now;
    
        public DelayDemo() {
        }
    
        public DelayDemo(Long now, Long beginTime) {
            this.now = now;
            this.beginTime = beginTime;
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(now - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
    
        @Override
        public int compareTo(Delayed o) {
            return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
        }
    
        public void show(){
            System.out.println(Thread.currentThread().getName() + " 执行");
        }
    }
    
    线程二 执行
    

    源码剖析

    类注释

    • 无界限的队列
    • 队列的元素必须是 Delayed 的子类
    • 当队列元素过期才能被 take 方法调用
    • 越靠近队列头部的元素越快过期
    • 队列不允许有空元素
    • 没有过期的元素或者队列为空,使用 poll 方法会返回null
    • 实现了 Collection 和 Iterator 接口,但无法使用 iterator 方法迭代队列的元素

    成员变量

        // 可重入锁,用于线程安全
        private final transient ReentrantLock lock = new ReentrantLock();
        // 优先队列,用于存储元素
        private final PriorityQueue<E> q = new PriorityQueue<E>();
    
        private Thread leader = null;
    
        // 用于实现阻塞的Condition对象
        private final Condition available = lock.newCondition();
    

    构造方法

        public DelayQueue() {}
    
        // 传入集合对象初始化DelayQueue对象。传入的集合对象必须是 Delayed 的子类
        public DelayQueue(Collection<? extends E> c) {
            this.addAll(c);
        }
    

    添加元素

        // 添加元素
        public boolean add(E e) {
            return offer(e);
        }
    
        // 添加元素
        public boolean offer(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                q.offer(e);
                // 查看元素是否为队首
                if (q.peek() == e) {
                    // 设置leader为空
                    leader = null;
                    // 唤醒所有等待的队列
                    available.signal();
                }
                return true;
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    
        // 添加元素
        public void put(E e) {
            offer(e);
        }
    
        // 添加元素
        public boolean offer(E e, long timeout, TimeUnit unit) {
            return offer(e);
        }
    

    可以发现,添加元素的核心方法是offer(E e)。在添加元素之前加锁保证线程安全,然后调用 PriorityQueue 对象添加元素,由于 DelayQueue 对象没有限定队列容量,所以可认为无限大,添加元素的时候不会检查容量大小,直到发生OOM。

    查看元素

        public E poll() {
            final ReentrantLock lock = this.lock;
            // 加锁
            lock.lock();
            try {
                E first = q.peek();
                // first == null:队列为空,返回null
                // first.getDelay(NANOSECONDS) > 0:说明第一个元素还没到超时时间,返回null
                if (first == null || first.getDelay(NANOSECONDS) > 0)
                    return null;
                else
                    return q.poll();// 返回第一个元素的值
            } finally {
                // 释放锁
                lock.unlock();
            }
        }
    
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            // 加锁
            lock.lockInterruptibly();
            try {
                for (;;) {
                    // 第一个元素
                    E first = q.peek();
                    // 如果队列为空
                    if (first == null) {
                        // 如果达到方法超时时间,返回null
                        if (nanos <= 0)
                            return null;
                        else
                            // 阻塞等待
                            nanos = available.awaitNanos(nanos);
                    } else {
                        long delay = first.getDelay(NANOSECONDS);
                        // 队列第一个元素已过期,直接出队
                        if (delay <= 0)
                            return q.poll();
                        // 执行poll方法已超时,返回null
                        if (nanos <= 0)
                            return null;
                        // 等待的时候,释放first的引用,避免内存泄漏
                        first = null;
                        // nanos < delay:poll方法超时时间小于队列第一个元素过期时间,则阻塞等待
                        // leader != null:队列第一个元素被其它线程占有,则阻塞等待
                        if (nanos < delay || leader != null)
                            nanos = available.awaitNanos(nanos);
                        else {// 执行到这里说明队列第一个元素没有被其它线程占有,而且当队列第一个元素过期后,poll方法还没超时,则往下执行计算poll方法剩余时间并重新自旋后出队
                            Thread thisThread = Thread.currentThread();
                            // 设置第一个元素为当前线程占有
                            leader = thisThread;
                            try {
                                // 计算队列第一个元素过期后,poll方法的超时时间余下的时间
                                long timeLeft = available.awaitNanos(delay);
                                nanos -= delay - timeLeft;
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                // 队列第一个元素没有被线程占有而且队列不为空,唤醒所有等待线程
                if (leader == null && q.peek() != null)
                    available.signal();
                // 释放锁
                lock.unlock();
            }
        }
    
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    // 获取队首元素
                    E first = q.peek();
                    // 队首为空,则阻塞当前线程
                    if (first == null)
                        available.await();
                    else {
                        // 获取队首元素的过期时间
                        long delay = first.getDelay(NANOSECONDS);
                        // 已过期,直接出队
                        if (delay <= 0)
                            return q.poll();
                        // 等待的时候,释放first的引用,避免内存泄漏
                        first = null;
                        // leader != null表明有其他线程在操作,阻塞当前线程
                        if (leader != null)
                            available.await();
                        else {
                            // leader指向当前线程
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                // 等待阻塞,直到first元素过期
                                available.awaitNanos(delay);
                            } finally {
                                // 释放leader
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                // leader为null并且队列不为空,说明没有其他线程在等待,那就通知条件队列
                if (leader == null && q.peek() != null)
                    available.signal();
                // 释放锁
                lock.unlock();
            }
        }
        
        // 无论队列第一个元素是否超时都会被读取,而且不会删除队列元素
        public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return q.peek();
            } finally {
                lock.unlock();
            }
        }
    

    查看队列元素的方法有三个:peek、poll 和 take 方法。

    1. peek方法最简单,直接调用 PriorityQueue.peek 方法获取队列第一个元素,但不删除;另外两个方法,只要读取到元素就会删除队列第一个元素。
    2. poll方法的参数为空,如果队列第一个元素为空或是否过期就会返回null,否则就会读取并删去元素。如果poll方法参数指定了等待时间,则会在读取元素之前根据方法等待时间和元素本身的过期时间,只有队列不为空,第一个元素被当前线程占有后,元素过期且方法指定的等待时间未超时则会读取并删除第一个元素,否则会一直阻塞直到方法等待时间超时。
    3. 至于take方法的流程与poll方法加上等待时间的流程差不多,如果线程加锁成功后队列第一个元素first已过期则直接读取并删除first,否则会判断是否已有线程占有first元素,如果有其它线程占有,则当前线程执行await操作,否则设置当前线程占有first元素并阻塞等待直到first元素过期,线程一旦达到过期时间就会自旋并读取删除first元素。

    take方法源码为例,画出以下流程以帮助理解源码:

    删除元素

        // 无论是否过期都会删除指定的元素
        public boolean remove(Object o) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return q.remove(o);
            } finally {
                lock.unlock();
            }
        }
    

    常用方法

        // 返回队列元素数量
        public int size() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return q.size();
            } finally {
                lock.unlock();
            }
        }
        
        // 清空队列所有的元素
        public void clear() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                q.clear();
            } finally {
                lock.unlock();
            }
        }
    
        // 返回队列的剩余容量,总是会返回Integer的最大值
        public int remainingCapacity() {
            return Integer.MAX_VALUE;
        }
    
        // 将队列转为Object数组返回
        public Object[] toArray() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return q.toArray();
            } finally {
                lock.unlock();
            }
        }
    
        // 返回指定类型的数组
        public <T> T[] toArray(T[] a) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return q.toArray(a);
            } finally {
                lock.unlock();
            }
        }
    

    总结

    其实不难看出,DelayQueue 类的实现其实就是通过ReentrantLock锁保证线程安全,要求队列元素实现Delayed接口和复用PriorityQueue类达到延迟和排序的能力!给我最大的收获是,要学会复用的思想,比方说我们可以复用已有的 API 实现一些自研的程序或二次开发,这种思想将会提高开发效率和降低开发的工作量。但是大家有没有想过,DelayQueue 有个缺点,如果数据放在延迟队列中,数据还没过期,忽然宕机了怎么办?因为数据是放在内存中,如果没有持久化的话,宕机后就会丢数据!所以说,使用 DelayQueue 设置过期数据不宜时间太久,会有丢数据的风险!常用的消息队列中间件,如:RocketMQ、Kafka等,其实会考虑到宕机丢数据的情况,所以都会有刷盘机制降低丢数据的风险。

    参考资料:

    Java 队列之 PriorityQueue 源码分析

    慕课网:面试官系统精讲Java源码及大厂真题:https://www.imooc.com/read/47/article/863

    相关文章

      网友评论

        本文标题:J.U.C 阻塞队列源码剖析系列(终结篇)之 DelayQueu

        本文链接:https://www.haomeiwen.com/subject/ezwbrhtx.html