美文网首页
DelayQueue 基本原理

DelayQueue 基本原理

作者: jiangmo | 来源:发表于2017-11-30 20:33 被阅读153次

    阻塞队列

    阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。下面是 java 常见的阻塞队列。

    ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
    LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
    PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
    DelayQueue:一个使用优先级队列实现的无界阻塞队列。
    SynchronousQueue:一个不存储元素的阻塞队列。
    LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
    LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

    基本简介

    DelayQueue是一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部是延迟期满后保存时间最长的Delayed 元素。
    DelayQueue是一个用来延时处理的队列,所谓延时处理就是说可以为队列中元素设定一个过期时间,相关的操作受到这个设定时间的控制。

    使用场景

    a) 关闭空闲连接。服务器中,有很多客户端的连接,空闲一段时间之后需要关闭之。
    b) 缓存。缓存中的对象,超过了空闲时间,需要从缓存中移出。
    c) 任务超时处理。在网络协议滑动窗口请求应答式交互时,处理超时未响应的请求。

    如果不使用DelayQueue,那么常规的解决办法就是:使用一个后台线程,遍历所有对象,挨个检查。这种笨笨的办法简单好用,但是对象数量过多时,可能存在性能问题,检查间隔时间不好设置,间隔时间过大,影响精确度,过小则存在效率问题。而且做不到按超时的时间顺序处理。

    基本原理

    • 首先,这种队列中只能存放实现Delayed接口的对象,而此接口有两个需要实现的方法。最重要的就是getDelay,这个方法需要返回对象过期前的时间。简单说,队列在某些方法处理前,会调用此方法来判断对象有没有超时。
    • 其次,DelayQueue是一个BlockingQueue,其特化的参数是Delayed。(不了解BlockingQueue的同学,先去了解BlockingQueue再看本文)
    • Delayed扩展了Comparable接口,比较的基准为延时的时间值,Delayed接口的实现类getDelay的返回值应为固定值(final)。DelayQueue内部是使用PriorityQueue实现的。

    总结,DelayQueue的关键元素BlockingQueue、PriorityQueue、Delayed。可以这么说,DelayQueue是一个使用优先队列(PriorityQueue)实现的BlockingQueue,优先队列的比较基准值是时间。本质上即:
    DelayQueue = BlockingQueue +PriorityQueue + Delayed

    他们的基本定义如下

    public interface Comparable<T> {
        public int compareTo(T o);
    } 
    public interface Delayed extends Comparable<Delayed> {
        long getDelay(TimeUnit unit);
    } 
    public class DelayQueue<E extends Delayed> implements BlockingQueue<E> { 
        private final PriorityQueue<E> q = new PriorityQueue<E>();
    } 
    

    基本用法

    /**
     * 延迟队列示例
     */
    public class DelayQueueTester {
        private static DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
        static class DelayTask implements Delayed {
            // 延迟时间
            private final long delay;
            // 到期时间
            private final long expire;
            // 数据
            private final String msg;
            // 创建时间
            private final long now;
            /**
             * 初始化 DelayTask 对象
             *
             * @param delay 延迟时间 单位:微妙
             * @param msg   业务信息
             */
            DelayTask(long delay, String msg) {
                this.delay = delay; // 延迟时间
                this.msg = msg; // 业务信息
                this.now = Instant.now().toEpochMilli();
                this.expire = now + delay; // 到期时间 = 当前时间+延迟时间
            }
            /**
             * 获取延迟时间
             *
             * @param unit 单位对象
             * @return
             */
            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(expire - Instant.now().toEpochMilli(), TimeUnit.MILLISECONDS);
            }
            /**
             * 比较器
             * 比较规则:延迟时间越长的对象越靠后
             *
             * @param o
             * @return
             */
            @Override
            public int compareTo(Delayed o) {
                if (o == this) // compare zero ONLY if same object
                    return 0;
                return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
            }
            @Override
            public String toString() {
                return "DelayTask{" +
                        "delay=" + delay +
                        ", expire=" + expire +
                        ", msg='" + msg + '\'' +
                        ", now=" + now +
                        '}';
            }
        }
        /**
         * 生产者线程
         *
         * @param args
         */
        public static void main(String[] args) {
            initConsumer();
            try {
                // 等待消费者初始化完毕
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            delayQueue.add(new DelayTask(1000, "Task1"));
            delayQueue.add(new DelayTask(2000, "Task2"));
            delayQueue.add(new DelayTask(3000, "Task3"));
            delayQueue.add(new DelayTask(4000, "Task4"));
            delayQueue.add(new DelayTask(5000, "Task5"));
        }
        /**
         * 初始化消费者线程
         */
        private static void initConsumer() {
            Runnable task = () -> {
                while (true) {
                    try {
                        System.out.println("尝试获取延迟队列中的任务。" + LocalDateTime.now());
                        System.out.println(delayQueue.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
            Thread consumer = new Thread(task);
            consumer.start();
        }
    }
    ---
    尝试获取延迟队列中的任务。2017-04-05T18:28:03.282
    DelayTask{delay=1000, expire=1491388087234, msg='Task1', now=1491388086234}
    尝试获取延迟队列中的任务。2017-04-05T18:28:07.235
    DelayTask{delay=2000, expire=1491388088235, msg='Task2', now=1491388086235}
    尝试获取延迟队列中的任务。2017-04-05T18:28:08.237
    DelayTask{delay=3000, expire=1491388089235, msg='Task3', now=1491388086235}
    尝试获取延迟队列中的任务。2017-04-05T18:28:09.237
    DelayTask{delay=4000, expire=1491388090235, msg='Task4', now=1491388086235}
    尝试获取延迟队列中的任务。2017-04-05T18:28:10.240
    DelayTask{delay=5000, expire=1491388091235, msg='Task5', now=1491388086235}
    尝试获取延迟队列中的任务。2017-04-05T18:28:11.240
    

    DelayQueue 实现原理

    主要属性

    // 可以看看AbstractQueue ,实现了阻塞Queue接口
    public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
        implements BlockingQueue<E> {
        // 阻塞等待使用了可重入锁,只有一把
        private final transient ReentrantLock lock = new ReentrantLock();
        // 优先队列,用来对不同延迟任务的排序
        private final PriorityQueue<E> q = new PriorityQueue<E>();
        /**
         * Thread designated to wait for the element at the head of
         * the queue.  This variant of the Leader-Follower pattern
         * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
         * minimize unnecessary timed waiting.  When a thread becomes
         * the leader, it waits only for the next delay to elapse, but
         * other threads await indefinitely.  The leader thread must
         * signal some other thread before returning from take() or
         * poll(...), unless some other thread becomes leader in the
         * interim.  Whenever the head of the queue is replaced with
         * an element with an earlier expiration time, the leader
         * field is invalidated by being reset to null, and some
         * waiting thread, but not necessarily the current leader, is
         * signalled.  So waiting threads must be prepared to acquire
         * and lose leadership while waiting.
         */
    
        // 这个Leader 有意思,解决了队列头的数据和线程的关联
        // 同时解决了其他线程由谁唤醒
        private Thread leader = null;
    
        /**
         * Condition signalled when a newer element becomes available
         * at the head of the queue or a new thread may need to
         * become leader.
         */
        // 与Leader Thread配合 唤醒等待的Leader或者新Leader替换
        private final Condition available = lock.newCondition();
    
    

    DelayQueue的take方法,把优先队列q的first拿出来(peek),如果没有达到延时阀值,则进行await处理。
    如下:

        public E take() throws InterruptedException {
            // 获取锁。每个延迟队列内聚了一个重入锁。
            final ReentrantLock lock = this.lock;
            // 获取可中断的锁。
            lock.lockInterruptibly();
            try {
                for (;;) {
                    // 尝试从优先级队列中获取队列头部元素
                    E first = q.peek();
                    if (first == null)
                        // 无元素,当前线程节点加入等待队列,并阻塞当前线程
                        available.await();
                    else {
                        // 通过延迟任务的 getDelay 方法获取延迟时间
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            // 延迟时间到期,获取并删除头部元素。
                            return q.poll();
                        first = null; // don't retain ref while waiting
                        // 存在leader线程,则其他的线程进入时,直接进入等待 
                        if (leader != null)
                            available.await();
                        else {
                            // 获取当前线程 说明线程变了
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                // 线程节点进入等待队列 x 纳秒。
                                available.awaitNanos(delay);
                            } finally {
                                // 等待完了,该线程则设置为null
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                // 若还存在元素的话,则将等待队列头节点中的线程节点移动到同步队列中。
                if (leader == null && q.peek() != null)
                    available.signal();
                lock.unlock();
            }
        }
    

    Add

        public boolean add(E e) {
            return offer(e);
        }
        /**
         * Inserts the specified element into this delay queue.
         *
         * @param e the element to add
         * @return {@code true}
         * @throws NullPointerException if the specified element is null
         */
        public boolean offer(E e) {
            // 获取到重入锁
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                q.offer(e);
                if (q.peek() == e) {
                    //  刚添加的元素成为头节点
                    // 那之前的头结点就直接废掉
                    leader = null;
                    // 唤醒take等待的线程,重新走查一遍
                    available.signal();
                }
                return true;
            } finally {
                lock.unlock();
            }
        }
    

    Ref:
    http://blog.csdn.net/kobejayandy/article/details/46833623

    相关文章

      网友评论

          本文标题:DelayQueue 基本原理

          本文链接:https://www.haomeiwen.com/subject/txsbbxtx.html