美文网首页JAVA
BlockingQueue

BlockingQueue

作者: YDDMAX_Y | 来源:发表于2018-09-06 10:46 被阅读0次

    接口分类

    image.png

    BlockingQueue

    问题:ArrayBlockingQueue和LinkedBlockingQueue到底该选哪个?
    答案参见:https://stackoverflow.com/questions/35967792/when-to-prefer-linkedblockingqueue-over-arrayblockingqueue
    array是有界的,linked既支持有界也支持无界。array的放和拿是加的同一把锁,linked是不同的锁。Executors使用的是linked。

    1. ArrayBlockingQueue;
    2. LinkedBlockingQueue;
    3. SynchronousQueue;
      不像ArrayBlockingQueue或LinkedListBlockingQueue,SynchronousQueue内部并没有数据缓存空间,你不能调用peek()方法来看队列中是否有数据元素,因为数据元素只有当你试着取走的时候才可能存在,不取走而只想偷窥一下是不行的,当然遍历这个队列的操作也是不允许的。队列头元素是第一个排队要插入数据的线程,而不是要交换的数据。数据是在配对的生产者和消费者线程之间直接传递的,并不会将数据缓冲数据到队列中。可以这样来理解:生产者和消费者互相等待对方,握手,然后一起离开。由于不维护Queue实体,所以没有插入、删除操作,效率很高。
      HSF的线程池用的就是SynchronousQueue,来一个请求开启一个线程。这样生产者和消费者都能够保持忙碌的状态。
    4. PriorityBlockingQueue
      数组实现的极小堆,元素必须实现Comparable。
     /*
         * The implementation uses an array-based binary heap, with public
         * operations protected with a single lock. However, allocation
         * during resizing uses a simple spinlock (used only while not
         * holding main lock) in order to allow takes to operate
         * concurrently with allocation.  This avoids repeated
         * postponement of waiting consumers and consequent element
         * build-up. The need to back away from lock during allocation
         * makes it impossible to simply wrap delegated
         * java.util.PriorityQueue operations within a lock, as was done
         * in a previous version of this class. To maintain
         * interoperability, a plain PriorityQueue is still used during
         * serialization, which maintains compatibility at the expense of
         * transiently doubling overhead.
         */
    
    1. DelayQueue
      元素必须实现Delayed。
    public interface Delayed extends Comparable<Delayed> {
    
        /**
         * Returns the remaining delay associated with this object, in the
         * given time unit.
         *
         * @param unit the time unit
         * @return the remaining delay; zero or negative values indicate
         * that the delay has already elapsed
         */
        long getDelay(TimeUnit unit);
    }
    
    

    本质是对PriorityQueue<Delayed>的封装。在poll时,还是先调PriorityQueue的poll(poll的第一个),然后再调getDelay判断是否expire了,如果是再返回。该queue的顺序是Comparable决定的(极小堆),getDelay只是决定能否poll出去。

    ScheduledThreadPoolExecutor内部依赖的Queue是DelayedWorkQueue,其基于极小堆,实际上功能和DelayQueue是相同的,compareTo比较的是要执行的时间,getDelay返回的是还要多久执行。

    image.png

    take和poll

    poll的源码:

     public E poll() {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                E first = q.peek();
                if (first == null || first.getDelay(NANOSECONDS) > 0)
                    return null;
                else
                    return q.poll();
            } finally {
                lock.unlock();
            }
    

    take相对于poll是阻塞的,那么怎么delay到时之后通知正take的线程notify呢?源码里面是用leader线程做的notify,leader线程可能是正take的线程(await delay时间),也可能是别的线程(那就await无限时间)

       Delayed extends Comparable<Delayed> {
    
        /**
         * Returns the remaining delay associated with this object, in the
         * given time unit.
         *
         * @param unit the time unit
         * @return the remaining delay; zero or negative values indicate
         * that the delay has already elapsed
         */
        long getDelay(TimeUnit unit);
    }
    

    相关文章

      网友评论

        本文标题:BlockingQueue

        本文链接:https://www.haomeiwen.com/subject/bditgftx.html