美文网首页
阻塞队列 — DelayQueue源码分析

阻塞队列 — DelayQueue源码分析

作者: 一角钱技术 | 来源:发表于2020-11-26 10:02 被阅读0次

    点赞再看,养成习惯,公众号搜一搜【一角钱技术】关注更多原创技术文章。本文 GitHub org_hejianhui/JavaStudy 已收录,有我的系列文章。

    前言


    DelayQueue 由优先级支持的、基于时间的调度队列,内部使用非线程安全的优先队列(PriorityQueue)实现,而无界队列基于数组的扩容实现。在创建元素时,可以指定多久才能从队列中获取当前元素。只有延时期满后才能从队列中获取元素。

    队列创建

    BlockingQueue<String> blockingQueue = new DelayQueue();
    

    要求:入队的对象必须要实现 Delayed接口,而Delayed集成自 Comparable 接口。

    Delayed 接口使对象成为延迟对象,它使存放在DelayQueue类中的对象具有了激活日期。该接口强制实现下列两个方法。

    • CompareTo(Delayed o):Delayed接口继承了Comparable接口,因此有了这个方法。让元素按激活日期排队
    • getDelay(TimeUnit unit):这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。

    应用场景

    1. 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
    2. 定时任务调度。使用DelayQueue 保存当天将会执行的任务和执行时间,一旦从DelayQueue 中获取到的任务就开始执行,比如 TimerQueue 就是使用DelayQueue实现的。

    我们来看具体的电影票/火车票支付两个例子。

    案例1:电影票

    通过电影票这个简单示例来掌握延迟队列的使用。

    1. 定义电影票并存设计存放到延迟队列的元素
    package com.niuh.queue.delayed.v1;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 电影票
     */
    public class MovieTiket implements Delayed {
        //延迟时间
        private final long delay;
        //到期时间
        private final long expire;
        //数据
        private final String msg;
        //创建时间
        private final long now;
    
        public long getDelay() {
            return delay;
        }
    
        public long getExpire() {
            return expire;
        }
    
        public String getMsg() {
            return msg;
        }
    
        public long getNow() {
            return now;
        }
    
        /**
         * @param msg 消息
         * @param delay 延期时间
         */
        public MovieTiket(String msg , long delay) {
            this.delay = delay;
            this.msg = msg;
            expire = System.currentTimeMillis() + delay;    //到期时间 = 当前时间+延迟时间
            now = System.currentTimeMillis();
        }
    
        /**
         * @param msg
         */
        public MovieTiket(String msg){
            this(msg,1000);
        }
    
        public MovieTiket(){
            this(null,1000);
        }
    
        /**
         * 获得延迟时间   用过期时间-当前时间,时间单位毫秒
         * @param unit
         * @return
         */
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expire
                    - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
        }
    
        /**
         * 用于延迟队列内部比较排序  当前时间的延迟时间 - 比较对象的延迟时间
         * 越早过期的时间在队列中越靠前
         * @param delayed
         * @return
         */
        public int compareTo(Delayed delayed) {
            return (int) (this.getDelay(TimeUnit.MILLISECONDS)
                    - delayed.getDelay(TimeUnit.MILLISECONDS));
        }
    
        @Override
        public String toString() {
            return "MovieTiket{" +
                    "delay=" + delay +
                    ", expire=" + expire +
                    ", msg='" + msg + '\'' +
                    ", now=" + now +
                    '}';
        }
    }
    
    1. 按电影票的过期时间进行入队测试
    package com.niuh.queue.delayed.v1;
    
    import java.util.concurrent.DelayQueue;
    
    /**
     * 延迟队列
     */
    public class DelayedQueueRun {
    
        public static void main(String[] args) {
            DelayQueue<MovieTiket> delayQueue = new DelayQueue<MovieTiket>();
            MovieTiket tiket = new MovieTiket("电影票0", 10000);
            delayQueue.put(tiket);
            MovieTiket tiket1 = new MovieTiket("电影票1", 5000);
            delayQueue.put(tiket1);
            MovieTiket tiket2 = new MovieTiket("电影票2", 8000);
            delayQueue.put(tiket2);
            System.out.println("message:--->入队完毕");
    
            while (delayQueue.size() > 0) {
                try {
                    tiket = delayQueue.take();
                    System.out.println("电影票出队:" + tiket.getMsg());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    

    案例2:订单支付

    再看一个订单支付场景稍微复杂的场景。在12306抢到火车票之后,通常需要在30分钟内付钱,否则订单就会取消


    解决思路

    火车票提交订单的时候,首先保存到数据库,并同时将订单数据保存到 DelayQueue 中,开启一个线程监控 DelayQueue,利用 DelayQueue 的特性,先过期的数据会被 take出来,若发现此时订单未支付,那就是过期未支付,更改订单状态。


    实现代码
    1. SaveOrder 订单相关服务
    package com.niuh.queue.delayed.v2.service.busi;
    
    import com.niuh.queue.delayed.v2.service.IDelayOrder;
    import com.niuh.queue.delayed.v2.dao.OrderExpDao;
    import com.niuh.queue.delayed.v2.model.OrderExp;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    
    import javax.annotation.PostConstruct;
    import java.util.Date;
    import java.util.List;
    import java.util.Random;
    
    /**
     * <p>
     * 订单相关的服务
     * </p>
     **/
    @Slf4j
    public class SaveOrder {
    
    
        // 取消付款
        public final static short UNPAY = 0;
        // 付款
        public final static short PAYED = 1;
        // 过期
        public final static short EXPIRED = -1;
    
        @Autowired
        private OrderExpDao orderExpDao;
    
        @Autowired
        @Qualifier("dq")
        private IDelayOrder delayOrder;
    
        /**
         * 接收前端页面参数,生成订单
         *
         * @param orderNumber 订单个数
         */
        public void insertOrders(int orderNumber) {
            Random r = new Random();
            OrderExp orderExp;
            for (int i = 0; i < orderNumber; i++) {
                //订单的超时时长,单位秒
                long expireTime = r.nextInt(20) + 5;
                orderExp = new OrderExp();
                String orderNo = "DD00_" + expireTime + "S";
                orderExp.setOrderNo(orderNo);
                orderExp.setOrderNote("火车票订单——" + orderNo);
                orderExp.setOrderStatus(UNPAY);
                orderExpDao.insertDelayOrder(orderExp, expireTime);
                log.info("保存订单到DB:" + orderNo);
                delayOrder.orderDelay(orderExp, expireTime);
            }
        }
    
        /**
         * 应用重启带来的问题:
         * 1、保存在Queue中的订单会丢失,这些丢失的订单会在什么时候过期,因为队列里已经没有这个订单了,无法检查了,这些订单就得不到处理了。
         * 2、已过期的订单不会被处理,在应用的重启阶段,可能会有一部分订单过期,这部分过期未支付的订单同样也得不到处理,会一直放在数据库里,
         * 过期未支付订单所对应的资源比如电影票所对应的座位,就不能被释放出来,让别的用户来购买。
         * 解决之道 :在系统启动时另行处理
         */
        @PostConstruct
        public void initDelayOrder() {
            log.info("系统启动,扫描表中过期未支付的订单并处理.........");
            int counts = orderExpDao.updateExpireOrders();
            log.info("系统启动,处理了表中[" + counts + "]个过期未支付的订单!");
            List<OrderExp> orderList = orderExpDao.selectUnPayOrders();
            log.info("系统启动,发现了表中还有[" + orderList.size() + "]个未到期未支付的订单!推入检查队列准备到期检查....");
            for (OrderExp order : orderList) {
                long expireTime = order.getExpireTime().getTime() - (new Date().getTime());
                delayOrder.orderDelay(order, expireTime);
            }
        }
    }
    
    1. IDelayOrder 延时处理订单的接口
    package com.niuh.queue.delayed.v2.service;
    
    import com.niuh.queue.delayed.v2.model.OrderExp;
    
    /**
     * <p>
     * 延时处理订单的接口
     * </p>
     *
     */
    public interface IDelayOrder {
    
        /**
         * 进行延时处理的方法
         * @param order 要进行延时处理的订单
         * @param expireTime 延时时长,单位秒
         */
        void orderDelay(OrderExp order, long expireTime);
    }
    
    
    1. DqMode 阻塞队列的实现
    package com.niuh.queue.delayed.v2.service.impl;
    
    import com.niuh.queue.delayed.v2.model.OrderExp;
    import com.niuh.queue.delayed.v2.service.busi.DlyOrderProcessor;
    import com.niuh.queue.delayed.v2.vo.ItemVo;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import java.util.concurrent.DelayQueue;
    
    /**
     * <p>
     * 阻塞队列的实现
     * </p>
     */
    @Service
    @Qualifier("dq")
    @Slf4j
    public class DqMode {
    
        @Autowired
        private DlyOrderProcessor processDelayOrder;
        private Thread takeOrder;
    
        private static DelayQueue<ItemVo<OrderExp>> delayOrder = new DelayQueue<ItemVo<OrderExp>>();
    
        public void orderDelay(OrderExp order, long expireTime) {
            ItemVo<OrderExp> itemOrder = new ItemVo<OrderExp>(expireTime*1000,order);
            delayOrder.put(itemOrder);
            log.info("订单[超时时长:"+expireTime+"秒]被推入检查队列,订单详情:"+order);
        }
    
        private class TakeOrder implements Runnable{
    
            private DlyOrderProcessor processDelayOrder;
    
            public TakeOrder(DlyOrderProcessor processDelayOrder) {
                super();
                this.processDelayOrder = processDelayOrder;
            }
    
            public void run() {
                log.info("处理到期订单线程已经启动!");
                while(!Thread.currentThread().isInterrupted()) {
                    try {
                        ItemVo<OrderExp> itemOrder = delayOrder.take();
                        if (itemOrder!=null) {
                            processDelayOrder.checkDelayOrder(itemOrder.getData());
                        }
                    } catch (Exception e) {
                        log.error("The thread :",e);
                    }
                }
                log.info("处理到期订单线程准备关闭......");
            }
        }
    
        @PostConstruct
        public void init() {
            takeOrder = new Thread(new TakeOrder(processDelayOrder));
            takeOrder.start();
        }
    
        @PreDestroy
        public void close() {
            takeOrder.interrupt();
        }
    }
    
    1. ItemVo 存放到延迟队列的元素,对业务数据进行了包装
    package com.niuh.queue.delayed.v2.vo;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    /**
     * <p>
     * 存放到延迟队列的元素,对业务数据进行了包装
     * </p>
     */
    public class ItemVo<T> implements Delayed {
    
        //到期时间,但传入的数值代表过期的时长,传入单位毫秒
        private long activeTime;
        private T data;//业务数据,泛型
    
        public ItemVo(long activeTime, T data) {
            super();
            this.activeTime = activeTime + System.currentTimeMillis();
            this.data = data;
        }
    
        public long getActiveTime() {
            return activeTime;
        }
    
        public T getData() {
            return data;
        }
    
        /**
         * 这个方法返回到激活日期的剩余时间,时间单位由单位参数指定。
         */
        public long getDelay(TimeUnit unit) {
            long d = unit.convert(this.activeTime - System.currentTimeMillis(), unit);
            return d;
        }
    
        /**
         *Delayed接口继承了Comparable接口,按剩余时间排序,实际计算考虑精度为纳秒数
         */
        public int compareTo(Delayed o) {
            long d = (getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
        }
    }
    
    1. DlyOrderProcessor 处理延期订单的服务
    package com.niuh.queue.delayed.v2.service.busi;
    
    import com.niuh.queue.delayed.v2.dao.OrderExpDao;
    import com.niuh.queue.delayed.v2.model.OrderExp;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    /**
     * <p>
     *  处理延期订单的服务
     * </p>
     *
     */
    @Service
    @Slf4j
    public class DlyOrderProcessor {
    
        @Autowired
        private OrderExpDao orderExpDao;
    
        /**检查数据库中指定id的订单的状态,如果为未支付,则修改为已过期*/
        public void checkDelayOrder(OrderExp record) {
            OrderExp dbOrder = orderExpDao.selectByPrimaryKey(record.getId());
            if(dbOrder.getOrderStatus()== SaveOrder.UNPAY) {
                log.info("订单【"+record+"】未支付已过期,需要更改为过期订单!");
                orderExpDao.updateExpireOrder(record.getId());
            }else {
                log.info("已支付订单【"+record+"】,无需修改!");
            }
        }
    }
    

    另外也可以使用MQ来解决,例如 ActiveMQ支持的延迟和定时投递
    修改配置文件(activemq.xml),增加延迟和定时投递支持-----schedulerSupport="true"

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

    工作原理

    DelayQueue的泛型参数需要实现Delayed接口,Delayed接口继承了Comparable接口,DelayQueue内部使用非线程安全的优先队列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待时间。DelayQueue不允许包含null元素。

    Leader/Followers模式:

    1. 有若干个线程(一般组成线程池)用来处理大量的事件
    2. 有一个线程作为领导者,等待事件的发生;其他的线程作为追随者,仅仅是睡眠
    3. 假如有事件需要处理,领导者会从追随者中指定一个新的领导者,自己去处理事件
    4. 唤醒的追随者作为新的领导者等待事件的发生
    5. 处理事件的线程处理完毕以后,就会成为追随者的一员,直到被唤醒成为领导者
    6. 假如需要处理的事件太多,而线程数量不够(能够动态创建线程处理另当别论),则有的事件可能会得不到处理。

    所以线程会有三种身份中的一种:leader 和 follower,以及一个干活中的状态:processser。它的基本原则就是,永远最多只有一个 leader。而所有 follower 都在等待成为 leader。线程池启动时会自动产生一个 Leader 负责等待网络 IO 事件,当有一个事件产生时,Leader 线程首先通知一个 Follower 线程将被其提拔为新的 Leader ,然后自己就去干活了,去处理这个网络事件,处理完毕后加入 Follower 线程等待队列,等待下次成为 Leader。这种方法可以增强 CPU高速缓存相似性,以及消除动态内存分配和线程间的数据交换。

    源码分析

    定义

    DelayQueue的类继承关系如下:



    其包含的方法定义如下:


    成员属性

    DelayQueue 通过组合一个PriorityQueue 来实现元素的存储以及优先级维护,通过ReentrantLock 来保证线程安全,通过Condition 来判断是否可以取数据,对于leader我们后面再来分析它的作用。

    // 可重入锁
    private final transient ReentrantLock lock = new ReentrantLock();
    // 存储元素的优先级队列
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    
    // 获取数据 等待线程标识
    private Thread leader = null;
    
    // 条件控制,表示是否可以从队列中取数据
    private final Condition available = lock.newCondition();
    

    构造函数

    DelayQueue 内部组合PriorityQueue,对元素的操作都是通过PriorityQueue 来实现的,DelayQueue 的构造方法很简单,对于PriorityQueue 都是使用的默认参数,不能通过DelayQueue 来指定PriorityQueue的初始大小,也不能使用指定的Comparator,元素本身就需要实现Comparable ,因此不需要指定的Comparator。

    /**
    * 无参构造函数
    */
    public DelayQueue() {}
    
    /**
    * 通过集合初始化
    */
    public DelayQueue(Collection<? extends E> c) {
        this.addAll(c);
    }
    

    入队方法

    虽然提供入队的接口方式很多,实际都是调用的offer 方法,通过PriorityQueue 来进行入队操作,入队超时方法并没有其超时功能。

    • add(E e),将指定的元素插入到此队列中,在成功时返回 true
    • put(E e),将指定的元素插入此队列中,队列达到最大值,则抛oom异常
    • offer(E e),将指定的元素插入到此队列中,在成功时返回 true
    • offer(E e, long timeout, TimeUnit unit),指定一个等待时间将元素放入队列中并没有意义
    public boolean add(E e) {
        return offer(e);
    }
    
    public void put(E e) {
        offer(e);
    }
    
    public boolean offer(E e, long timeout, TimeUnit unit) {
        return offer(e);
    }
    

    offer(E e)

    将指定的元素插入到此队列中,在成功时返回 true,其他几个方法内部都调用了offer 方法,我们也可以直接调用offer 方法来完成入队操作。

    peek并不一定是当前添加的元素,队头是当前添加元素,说明当前元素e的优先级最小也就即将过期的,这时候激活avaliable变量条件队列里面的一个线程,通知他们队列里面有元素了。

    public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        // 获取锁
        lock.lock();
        try {
            //通过PriorityQueue 来将元素入队
            q.offer(e);
            //peek 是获取的队头元素,唤醒阻塞在available 条件上的一个线程,表示可以从队列中取数据了
            if (q.peek() == e) {
                leader = null;
                // 唤醒通知
                available.signal();
            }
            return true;
        } finally {
            // 解锁
            lock.unlock();
        }
    }
    

    出队方法

    • poll(),获取并移除此队列的头,如果此队列为空,则返回 null
    • poll(long timeout, TimeUnit unit),获取并移除此队列的头部,在指定的等待时间前等待
    • take(),获取并移除此队列的头部,在元素变得可用之前一直等待
    • peek(),调用此方法,可以返回队头元素,但是元素并不出队

    poll()

    获取并移除此队列的头,如果此队列为空,则返回 null

    public E poll() {
        final ReentrantLock lock = this.lock;
        // 获取同步锁
        lock.lock();
        try {
            // 获取队头元素
            E first = q.peek();
            // 如果对头为null 或者 延时还没有到,则返回 null
            if (first == null || first.getDelay(NANOSECONDS) > 0)
                return null;
            else
                return q.poll(); // 否则元素出队
        } finally {
            lock.unlock();
        }
    }
    

    poll(long timeout, TimeUnit unit)

    获取并移除此队列的头部,在指定的等待时间前等待。

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        // 超时等待时间
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        // 获取可中断锁
        lock.lockInterruptibly();
        try 
            // 无限循环
            for (;;) {
                // 获取队头元素
                E first = q.peek
                // 队头为空,也就是队列为空
                if (first == null) {
                    // 达到超时指定时间,返回null
                    if (nanos <= 0)
                        return null;
                    else
                        // 如果还没有超时,那么在available条件上进行等待nanos时间
                        nanos = available.awaitNanos(nanos);
                } else {
                    // 获取元素延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 延时到期
                    if (delay <= 0)
                        return q.poll(); // 返回出队元素
                    // 延时未到期,超时到期,返回null
                    if (nanos <= 0)
                        return null;
                    first = null; // don't retain ref while waiting
                    // 超时等待时间 < 延迟时间 或者 有其他线程再取数据
                    if (nanos < delay || leader != null)
                        // 在available条件上进行等待nanos时间
                        nanos = available.awaitNanos(nanos);
                    else {
                        // 超时等待时间 > 延迟时间 
                        // 并且没有其他线程在等待,那么当前元素成为leader,表示 leader线程最早 正在等待获取元素
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 等待 延迟时间 超时
                            long timeLeft = available.awaitNanos(delay);
                            // 还需要继续等待 nanos
                            nanos -= delay - timeLeft;
                        } finally {
                            // 清除 leader
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            // 唤醒阻塞在 available 的一个线程,表示可以取数据了
            if (leader == null && q.peek() != null)
                available.signal
            // 释放锁
            lock.unlock();
        }
    }
    

    来梳理一下这里的逻辑:

    1. 如果队列为空,如果超时时间未到,则进行等待,否则返回null
    2. 队列不为空,取出对头元素,如果延迟时间到来,则返回元素,否则如果超时时间到返回null
    3. 超时时间未到,并且超时时间 < 延迟时间 或者 有线程正在获取元素,那么进行等待
    4. 超时时间 > 延迟时间,那么肯定可以取到元素,设置 leader为当前线程,等待延迟时间到期。

    这里需要注意的时Condition 条件在阻塞时会释放锁,在被唤醒时会再次获取锁,获取成功才会返回。
    当进行超时等待时,阻塞在Condition 上后会释放锁,一旦释放了锁,那么其它线程就有可能参与竞争,某一个线程就可能会成为leader(参与竞争的时间早,并且能在等待时间内能获取到队头元素那么就可能成为leader)
    leader是用来减少不必要的竞争,如果leader不为空说明已经有线程在取了,设置当前线程等待即可。(leader 就是一个信号,告诉其它线程:你们不要再去获取元素了,它们延迟时间还没到期,我都还没有取到数据呢,你们要取数据,等我取了再说)

    下面用流程图来展示这一过程:


    延时队列.png

    take()

    获取并移除此队列的头部,在元素变得可用之前一直等待

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        // 获取可中断锁
        lock.lockInterruptibly();
        try {
            for (;;) {
                // 获取队头元素
                E first = q.peek();
                // 队头元素为空,则阻塞等待
                if (first == null)
                    available.await();
                else {
                    // 获取元素延迟时间
                    long delay = first.getDelay(NANOSECONDS);
                    // 延时到期
                    if (delay <= 0)
                        return q.poll(); // 返回出队元素
                    first = null; // don't retain ref while waiting
                    // 如果有其它线程在等待获取元素,则当前线程不用去竞争,直接等待
                    if (leader != null)
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;
                        try {
                            // 等待延迟时间到期
                            available.awaitNanos(delay);
                        } finally {
                            //清除 leader
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            //唤醒阻塞在available 的一个线程,表示可以取数据了
            if (leader == null && q.peek() != null)
                available.signal();
            // 释放锁
            lock.unlock();
        }
    }
    

    该方法就是相当于在前面的超时等待中,把超时时间设置为无限大,那么这样只要队列中有元素,要是元素延迟时间要求,那么就可以取出元素,否则就直接等待元素延迟时间到期,再取出元素,最先参与等待的线程会成为leader。

    peek()

    调用此方法,可以返回队头元素,但是元素并不出队。

    public E peek() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //返回队列头部元素,元素不出队
            return q.peek();
        } finally {
            lock.unlock();
        }
    }
    

    完整代码

    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E> {
        
        // 可重入锁
        private final transient ReentrantLock lock = new ReentrantLock();
        // 存储元素的优先级队列
        private final PriorityQueue<E> q = new PriorityQueue<E>();
    
        // 获取数据 等待线程标识    
        private Thread leader = null;
    
        // 条件控制,表示是否可以从队列中取数据
        private final Condition available = lock.newCondition();
    
        /**
         * 无参构造函数
         */
        public DelayQueue() {}
    
        /**
         * 通过集合初始化
         */
        public DelayQueue(Collection<? extends E> c) {
            this.addAll(c);
        }
    
        
        public boolean add(E e) {
            return offer(e);
        }
    
       
        public boolean offer(E e) {
            final ReentrantLock lock = this.lock;
            // 获取锁
            lock.lock();
            try {
                //通过PriorityQueue 来将元素入队
                q.offer(e);
                //peek 是获取的队头元素,唤醒阻塞在available 条件上的一个线程,表示可以从队列中取数据了
                if (q.peek() == e) {
                    leader = null;
                    // 唤醒通知
                    available.signal();
                }
                return true;
            } finally {
                // 解锁
                lock.unlock();
            }
        }
    
       
        public void put(E e) {
            offer(e);
        }
    
        public boolean offer(E e, long timeout, TimeUnit unit) {
            return offer(e);
        }
    
        public E poll() {
            final ReentrantLock lock = this.lock;
            // 获取同步锁
            lock.lock();
            try {
                // 获取队头元素
                E first = q.peek();
                // 如果对头为null 或者 延时还没有到,则返回 null
                if (first == null || first.getDelay(NANOSECONDS) > 0)
                    return null;
                else
                    return q.poll(); // 否则元素出队
            } finally {
                lock.unlock();
            }
        }
    
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            // 获取可中断锁
            lock.lockInterruptibly();
            try {
                for (;;) {
                    // 获取队头元素
                    E first = q.peek();
                    // 队头元素为空,则阻塞等待
                    if (first == null)
                        available.await();
                    else {
                        // 获取元素延迟时间
                        long delay = first.getDelay(NANOSECONDS);
                        // 延时到期
                        if (delay <= 0)
                            return q.poll(); // 返回出队元素
                        first = null; // don't retain ref while waiting
                        // 如果有其它线程在等待获取元素,则当前线程不用去竞争,直接等待
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                // 等待延迟时间到期
                                available.awaitNanos(delay);
                            } finally {
                                //清除 leader
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                //唤醒阻塞在available 的一个线程,表示可以取数据了
                if (leader == null && q.peek() != null)
                    available.signal();
                // 释放锁
                lock.unlock();
            }
        }
    
        
        public E poll(long timeout, TimeUnit unit) throws InterruptedException {
            // 超时等待时间
            long nanos = unit.toNanos(timeout);
            final ReentrantLock lock = this.lock;
            // 获取可中断锁
            lock.lockInterruptibly();
            try 
                // 无限循环
                for (;;) {
                    // 获取队头元素
                    E first = q.peek
                    // 队头为空,也就是队列为空
                    if (first == null) {
                        // 达到超时指定时间,返回null
                        if (nanos <= 0)
                            return null;
                        else
                            // 如果还没有超时,那么在available条件上进行等待nanos时间
                            nanos = available.awaitNanos(nanos);
                    } else {
                        // 获取元素延迟时间
                        long delay = first.getDelay(NANOSECONDS);
                        // 延时到期
                        if (delay <= 0)
                            return q.poll(); // 返回出队元素
                        // 延时未到期,超时到期,返回null
                        if (nanos <= 0)
                            return null;
                        first = null; // don't retain ref while waiting
                        // 超时等待时间 < 延迟时间 或者 有其他线程再取数据
                        if (nanos < delay || leader != null)
                            // 在available条件上进行等待nanos时间
                            nanos = available.awaitNanos(nanos);
                        else {
                            // 超时等待时间 > 延迟时间 
                            // 并且没有其他线程在等待,那么当前元素成为leader,表示 leader线程最早 正在等待获取元素
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                // 等待 延迟时间 超时
                                long timeLeft = available.awaitNanos(delay);
                                // 还需要继续等待 nanos
                                nanos -= delay - timeLeft;
                            } finally {
                                // 清除 leader
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                // 唤醒阻塞在 available 的一个线程,表示可以取数据了
                if (leader == null && q.peek() != null)
                    available.signal
                // 释放锁
                lock.unlock();
            }
        }
    
        public E peek() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                //返回队列头部元素,元素不出队
                return q.peek();
            } finally {
                lock.unlock();
            }
        }
    
        public int size() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return q.size();
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Returns first element only if it is expired.
         * Used only by drainTo.  Call only when holding lock.
         */
        private E peekExpired() {
            // assert lock.isHeldByCurrentThread();
            E first = q.peek();
            return (first == null || first.getDelay(NANOSECONDS) > 0) ?
                null : first;
        }
    
        /**
         * @throws UnsupportedOperationException {@inheritDoc}
         * @throws ClassCastException            {@inheritDoc}
         * @throws NullPointerException          {@inheritDoc}
         * @throws IllegalArgumentException      {@inheritDoc}
         */
        public int drainTo(Collection<? super E> c) {
            if (c == null)
                throw new NullPointerException();
            if (c == this)
                throw new IllegalArgumentException();
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int n = 0;
                for (E e; (e = peekExpired()) != null;) {
                    c.add(e);       // In this order, in case add() throws.
                    q.poll();
                    ++n;
                }
                return n;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * @throws UnsupportedOperationException {@inheritDoc}
         * @throws ClassCastException            {@inheritDoc}
         * @throws NullPointerException          {@inheritDoc}
         * @throws IllegalArgumentException      {@inheritDoc}
         */
        public int drainTo(Collection<? super E> c, int maxElements) {
            if (c == null)
                throw new NullPointerException();
            if (c == this)
                throw new IllegalArgumentException();
            if (maxElements <= 0)
                return 0;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int n = 0;
                for (E e; n < maxElements && (e = peekExpired()) != null;) {
                    c.add(e);       // In this order, in case add() throws.
                    q.poll();
                    ++n;
                }
                return n;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Atomically removes all of the elements from this delay queue.
         * The queue will be empty after this call returns.
         * Elements with an unexpired delay are not waited for; they are
         * simply discarded from the queue.
         */
        public void clear() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                q.clear();
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Always returns {@code Integer.MAX_VALUE} because
         * a {@code DelayQueue} is not capacity constrained.
         *
         * @return {@code Integer.MAX_VALUE}
         */
        public int remainingCapacity() {
            return Integer.MAX_VALUE;
        }
    
        /**
         * Returns an array containing all of the elements in this queue.
         * The returned array elements are in no particular order.
         *
         * <p>The returned array will be "safe" in that no references to it are
         * maintained by this queue.  (In other words, this method must allocate
         * a new array).  The caller is thus free to modify the returned array.
         *
         * <p>This method acts as bridge between array-based and collection-based
         * APIs.
         *
         * @return an array containing all of the elements in this queue
         */
        public Object[] toArray() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return q.toArray();
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Returns an array containing all of the elements in this queue; the
         * runtime type of the returned array is that of the specified array.
         * The returned array elements are in no particular order.
         * If the queue fits in the specified array, it is returned therein.
         * Otherwise, a new array is allocated with the runtime type of the
         * specified array and the size of this queue.
         *
         * <p>If this queue fits in the specified array with room to spare
         * (i.e., the array has more elements than this queue), the element in
         * the array immediately following the end of the queue is set to
         * {@code null}.
         *
         * <p>Like the {@link #toArray()} method, this method acts as bridge between
         * array-based and collection-based APIs.  Further, this method allows
         * precise control over the runtime type of the output array, and may,
         * under certain circumstances, be used to save allocation costs.
         *
         * <p>The following code can be used to dump a delay queue into a newly
         * allocated array of {@code Delayed}:
         *
         * <pre> {@code Delayed[] a = q.toArray(new Delayed[0]);}</pre>
         *
         * Note that {@code toArray(new Object[0])} is identical in function to
         * {@code toArray()}.
         *
         * @param a the array into which the elements of the queue are to
         *          be stored, if it is big enough; otherwise, a new array of the
         *          same runtime type is allocated for this purpose
         * @return an array containing all of the elements in this queue
         * @throws ArrayStoreException if the runtime type of the specified array
         *         is not a supertype of the runtime type of every element in
         *         this queue
         * @throws NullPointerException if the specified array is null
         */
        public <T> T[] toArray(T[] a) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return q.toArray(a);
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Removes a single instance of the specified element from this
         * queue, if it is present, whether or not it has expired.
         */
        public boolean remove(Object o) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                return q.remove(o);
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Identity-based version for use in Itr.remove
         */
        void removeEQ(Object o) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                for (Iterator<E> it = q.iterator(); it.hasNext(); ) {
                    if (o == it.next()) {
                        it.remove();
                        break;
                    }
                }
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Returns an iterator over all the elements (both expired and
         * unexpired) in this queue. The iterator does not return the
         * elements in any particular order.
         *
         * <p>The returned iterator is
         * <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
         *
         * @return an iterator over the elements in this queue
         */
        public Iterator<E> iterator() {
            return new Itr(toArray());
        }
    
        /**
         * Snapshot iterator that works off copy of underlying q array.
         */
        private class Itr implements Iterator<E> {
            final Object[] array; // Array of all elements
            int cursor;           // index of next element to return
            int lastRet;          // index of last element, or -1 if no such
    
            Itr(Object[] array) {
                lastRet = -1;
                this.array = array;
            }
    
            public boolean hasNext() {
                return cursor < array.length;
            }
    
            @SuppressWarnings("unchecked")
            public E next() {
                if (cursor >= array.length)
                    throw new NoSuchElementException();
                lastRet = cursor;
                return (E)array[cursor++];
            }
    
            public void remove() {
                if (lastRet < 0)
                    throw new IllegalStateException();
                removeEQ(array[lastRet]);
                lastRet = -1;
            }
        }
    
    }
    

    总结

    1. DelayQueue 内部通过组合PriorityQueue 来实现存储和维护元素顺序的;
    2. DelayQueue 存储元素必须实现Delayed 接口,通过实现Delayed 接口,可以获取到元素延迟时间,以及可以比较元素大小(Delayed 继承Comparable);
    3. DelayQueue 通过一个可重入锁来控制元素的入队出队行为;
    4. DelayQueue 中leader 标识 用于减少线程的竞争,表示当前有其它线程正在获取队头元素;
    5. PriorityQueue 只是负责存储数据以及维护元素的顺序,对于延迟时间取数据则是在DelayQueue 中进行判断控制的;
    6. DelayQueue 没有实现序列化接口。

    PS:以上代码提交在 Githubhttps://github.com/Niuh-Study/niuh-juc-final.git

    文章持续更新,可以公众号搜一搜「 一角钱技术 」第一时间阅读, 本文 GitHub org_hejianhui/JavaStudy 已经收录,欢迎 Star。

    相关文章

      网友评论

          本文标题:阻塞队列 — DelayQueue源码分析

          本文链接:https://www.haomeiwen.com/subject/mkleiktx.html