美文网首页
LinkedBlockingQueue

LinkedBlockingQueue

作者: Qi0907 | 来源:发表于2017-05-23 18:03 被阅读0次

    LinkedBlockingQueue是基于链表的阻塞先进先出队列,可以指定一个最大的长度限制以防止过度扩展,未指定情况下其大小为Integer.MAX_VALUE,LinkedBlockingQueue使用了两个锁takeLock(用于删除元素),putLock(用于添加元素),这样添加和删除操作就可以分开做,提高队列的并发性能
    链表节点的定义如下:

        static class Node<E> {
            E item;//数据
    
            /**
             * One of:
             * - the real successor Node
             * - this Node, meaning the successor is head.next
             * - null, meaning there is no successor (this is the last node)
             */
            Node<E> next;//下一个节点的指针
    
            Node(E x) { item = x; }
        }
    

    再来看其中几个关键元素:

        /** The capacity bound, or Integer.MAX_VALUE if none */
    //阻塞队列所能存储的最大容量,默认值Integer.MAX_VALUE.
        private final int capacity;
    
        /** Current number of elements */
        //当前阻塞队列中的元素数量,他定义为一个原子操作的Integer
        //因为LinkedBlockingQueue的入队和出队使用的是两个不同的锁
        //隐藏使用原子操作类来解决对同一个变量进行并发修改的线程安全问题
        private final AtomicInteger count = new AtomicInteger();
    
        /**
         * Head of linked list.
         * Invariant: head.item == null
         */
         //头部元素的数据总是null,head.item == null
        transient Node<E> head;
    
        /**
         * Tail of linked list.
         * Invariant: last.next == null
         */
         //尾部的下一个节点指针总是null,last.next == null
        private transient Node<E> last;
    

    之后看一下LinkedBlockingQueue的构造方法

         //当指定了队列大小时,设置头和尾的指针指向同一个节点,且数据内容为null
        public LinkedBlockingQueue(int capacity) {
            if (capacity <= 0) throw new IllegalArgumentException();
            this.capacity = capacity;
            last = head = new Node<E>(null);
        }
    
         //也可以把一个集合里的所有元素加入队列
        public LinkedBlockingQueue(Collection<? extends E> c) {
            this(Integer.MAX_VALUE);
            final ReentrantLock putLock = this.putLock;
            //获取入队锁
            putLock.lock(); // Never contended, but necessary for visibility
            try {
                int n = 0;
                //循环集合里的每个元素,入队,并更新队列元素个数
                for (E e : c) {
                    if (e == null)
                        throw new NullPointerException();
                    if (n == capacity)
                        throw new IllegalStateException("Queue full");
                    enqueue(new Node<E>(e));
                    ++n;
                }
                count.set(n);
            } finally {
            //释放入队锁
                putLock.unlock();
            }
        }
    
        private void enqueue(Node<E> node) {
            // assert putLock.isHeldByCurrentThread();
            // assert last.next == null;
            //让新入队列的元素成为原来的last的next,这个元素称为last
            //即添加元素到队尾
            last = last.next = node;
        }
    

    之后再看一下入队操作put(E e)

        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
            //这个就是上面note里说的定义一个本地变量,为什么?
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            /*
                执行可中断的锁获取操作,即意味着如果线程由于获取
                锁而处于Blocked状态时,线程是可以被中断而不再继
                续等待,这也是一种避免死锁的一种方式,不会因为
                发现到死锁之后而由于无法中断线程最终只能重启应用。
            */
            putLock.lockInterruptibly();//加入队锁
            try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                 /*
                当队列的容量到底最大容量时,此时线程将处于等待状
                态,直到队列有空闲的位置才继续执行。
                */
                while (count.get() == capacity) {//队列满了
                    notFull.await();//加入等待队列,直到队列不满-->阻塞操作
                }
                enqueue(node);//入队
                c = count.getAndIncrement();
                /*c+1得到的结果是新元素入队列之后队列元素的总和。
                当前队列中的总元素个数小于最大容量时,此时唤醒其他执行入队列的线程
                让它们可以放入元素
                */
                if (c + 1 < capacity)
                    notFull.signal();
            } finally {
                putLock.unlock();//释放入队锁
            }
            /*当c=0时,即意味着之前的队列是空队列,出队列的线程都处于等待状态,
            现在新添加了一个新的元素,即队列不再为空,因此它会唤醒正在等待获取元素的线程。
            */
            if (c == 0)
                signalNotEmpty();
        }
    

    还有一个offer(E e)的操作,同样是入队,与put(E e)有一些差异:offer(E e是如果队列没满,立即返回true; 如果队列满了,立即返回false-->不阻塞,而put是如果队列满了,一直阻塞,直到队列不满了或者线程被中断-->阻塞(还有一个offer(E e, long timeout, TimeUnit unit),会阻塞线程到TimeUnit时间到达,时间到了队列还是满的,无法插入元素,就返回false)

        public boolean offer(E e) {
            if (e == null) throw new NullPointerException();
            final AtomicInteger count = this.count;
            if (count.get() == capacity)
                return false;
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            putLock.lock();
            try {
                    /*
                当获取到锁时,需要进行二次的检查,因为可能当队列的大小为capacity-1时,
                两个线程同时去抢占锁,而只有一个线程抢占成功,那么此时
                当线程将元素入队列后,释放锁,后面的线程抢占锁之后,此时队列
                大小已经达到capacity,所以将它无法让元素入队列。
                */
                if (count.get() < capacity) {
                    enqueue(node);
                    c = count.getAndIncrement();
                    if (c + 1 < capacity)
                        notFull.signal();
                }
            } finally {
                putLock.unlock();
            }
            if (c == 0)
                signalNotEmpty();
            return c >= 0;
        }
    

    看完入队操作,再看一下出队操作take()

        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();//获取出队锁,并且支持线程中断
            try {
                while (count.get() == 0) {//队列中没元素
                    notEmpty.await();//等待直到有元素加入-->阻塞
                }
                x = dequeue();//取出元素
                c = count.getAndDecrement();
                /*
                  当一个元素出队列之后,队列的大小依旧大于1时
                  当前线程会唤醒其他执行元素出队列的线程,让它们也
                  可以执行元素的获取
                */
                if (c > 1)
                    notEmpty.signal();
            } finally {
                takeLock.unlock();//释放出队锁
            }
            /*
                当c==capaitcy时,即在获取当前元素之前,
                队列已经满了,而此时获取元素之后,队列就会
                空出一个位置,故当前线程会唤醒执行插入操作的线
                程通知其他中的一个可以进行插入操作。
            */
            if (c == capacity)
                signalNotFull();
            return x;
        }
    
         /**
         * 让头部元素出队列的过程
         * 其最终的目的是让原来的head被GC回收,让其的next成为head
         * 并且新的head的item为null.
         * 因为LinkedBlockingQueue的头部具有一致性:即元素为null。
         */
        private E dequeue() {
            // assert takeLock.isHeldByCurrentThread();
            // assert head.item == null;
            Node<E> h = head;
            Node<E> first = h.next;
            h.next = h; // help GC
            head = first;
            E x = first.item;
            first.item = null;
            return x;
        }
    

    还有一个poll()的操作,同样是出队,与take()有一些差异:poll()是如果没有元素,直接返回null;如果有元素,出队-->不阻塞,而take()是如果队列空了,一直阻塞,直到队列不为空或者线程被中断-->阻塞(还有一个poll(long timeout, TimeUnit unit),会阻塞线程到TimeUnit时间到达,时间到了队列还是空的,无法取出元素,就返回false)

        public E poll() {
            final AtomicInteger count = this.count;
            if (count.get() == 0)//没有元素
                return null;//返回null
            E x = null;
            int c = -1;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();//获取出队锁
            try {
                if (count.get() > 0) {//有元素
                    x = dequeue();//出队
                    c = count.getAndDecrement();
                    if (c > 1)
                        notEmpty.signal();
                }
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }
    

    相关文章

      网友评论

          本文标题:LinkedBlockingQueue

          本文链接:https://www.haomeiwen.com/subject/updgxxtx.html