美文网首页
数据共享通道—BlockingQueue源码分析

数据共享通道—BlockingQueue源码分析

作者: 远o_O | 来源:发表于2017-07-20 17:56 被阅读15次

    为什么BlockingQueue适合作为进行线程间安全的数据共享的通道?而高性能队列例如:ConcurrentLinkedQueue不适合呢?

    • 关键在于Blocking上面,即阻塞:
    • 当队列为空:进行取操作的线程,会被阻塞(相比循环监测,更省资源),并且当队列中有元素后,线程会被自动唤醒。
    • 当队列满时:和上面类似。

    BlockingQueue作为接口,有一系列满足不同需求的实现类,下面以容量固定的ArrayBlockingQueue为例,进行源码分析,看看为什么这么神奇。

    • 主要体现在take()和put()方法上。

    • take():可以看到,当队列为空的时候,线程会进行等待,下面还有一个重要的dequeue()方法。

        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * Extracts element at current take position, advances, and signals.
         * Call only when holding lock.
         */
        private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            notFull.signal();
            return x;
        }
    
    

    在dequeue()方法中,一旦有元素取出,就会通知等待在notFull上面的线程,让他们继续工作。

    • put()方法类似,不再赘述。。。
        /**
         * Inserts the specified element at the tail of this queue, waiting
         * for space to become available if the queue is full.
         *
         * @throws InterruptedException {@inheritDoc}
         * @throws NullPointerException {@inheritDoc}
         */
        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    
    
        /**
         * Inserts element at current put position, advances, and signals.
         * Call only when holding lock.
         */
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            notEmpty.signal();
        }
    
    

    相关文章

      网友评论

          本文标题:数据共享通道—BlockingQueue源码分析

          本文链接:https://www.haomeiwen.com/subject/ubdjkxtx.html