美文网首页
LinkedBlockingQueue

LinkedBlockingQueue

作者: 风吹过山 | 来源:发表于2017-05-24 16:00 被阅读0次

    一、LinkedBlockingQueue是什么?
    LinkedBlockingQueue是一个线程安全的阻塞队列,具有先进先出等特性。主要实现BlockingQueue接口,BlockingQueue接口继承自java.util.Queue接口,并在这个接口的基础上增加了take和put方法。

    二、线程安全?
    在LinkedBlockingQueue的所有共享的全局变量 中,final声明的capacity在构造器生成实例时就成了不可变量了。而final声明的count由于是AtomicInteger类型的,所以能 够保证其操作的原子性。剩下的final的变量都是初始化成了不可变量,并且不包含可变属性,所以都是访问安全的。那么剩下的就是Node类型的head和 last两个可变量。所以要保证LinkedBlockingQueue是线程安全的就是要保证对head和last的访问是线程安全的 ,其中LinkedBloakingQueue是使用读和写两把锁来控制并发操作的。 如下:

        static class Node<E> {
            E item;
            Node<E> next;
            Node(E x) { item = x; }
        }
    
        /** The capacity bound, or Integer.MAX_VALUE if none */
        private final int capacity;
    
        /** Current number of elements */
        private final AtomicInteger count = new AtomicInteger();
    
        /**
         * Head of linked list.
         * Invariant: head.item == null
         */
        transient Node<E> head;
    
        /**
         * Tail of linked list.
         * Invariant: last.next == null
         */
        private transient Node<E> last;
    
        /** Lock held by take, poll, etc */
        private final ReentrantLock takeLock = new ReentrantLock();
    
        /** Wait queue for waiting takes */
        private final Condition notEmpty = takeLock.newCondition();
    
        /** Lock held by put, offer, etc */
        private final ReentrantLock putLock = new ReentrantLock();
    
        /** Wait queue for waiting puts */
        private final Condition notFull = putLock.newCondition();
    

    三、链表的入队和出队
    LInkedBlockingQueue中的链表,包含头指针和尾指针,其中:
    头指针用来管理元素出队,和 take(), poll(), peek() 三个操作关联
    尾指针用来管理元素入队,和 put(), offer() 两个操作关联
    入队:
    put()算法,为阻塞算法,直到队列有空余时,才能为队列加入新元素
    offer()算法为非阻塞算法,如果队列已满,立即返回或等待一会再返回,通过返回值ture或false,标记本次入队操作是否成功;
    出队:
    take()算法为阻塞算法,直到队列有非空时,才将允许调用线程取出数据
    poll()算法为非阻塞算法,如果队列为空,立即返回或等待一会再返回,通过返回值ture或false,标记本次出队操作是否成功
    peek()算法比较特殊,只返回队列中的第一个元素,既不出队,也不阻塞,如果没有元素,就返回null

    四、take和put

        //出队
        public E take() throws InterruptedException {
            E x;
            int c = -1;
            final AtomicInteger count = this.count;
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lockInterruptibly();//随时保证响应中断 
            try {
                while (count.get() == 0) {
                    notEmpty.await();//队列为null时,,此时线程将处于阻塞状态,直到有新的元素
                }
                x = dequeue();//取出元素
                c = count.getAndDecrement(); // 取出元素之后,将“节点数量”-1;并返回“原始的节点数量”。
                if (c > 1)
                    notEmpty.signal();//唤醒其他正在等待出队列的线程
            } finally {
                takeLock.unlock();
            }
            if (c == capacity)
                signalNotFull();
            return x;
        }
    
      //入队
        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            // Note: convention in all put/take/etc is to preset local var
            // holding count negative to indicate failure unless set.
            int c = -1;
            Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();//随时保证响应中断 
            try {
                /*
                 * Note that count is used in wait guard even though it is
                 * not protected by lock. This works because count can
                 * only decrease at this point (all other puts are shut
                 * out by lock), and we (or some other waiting put) are
                 * signalled if it ever changes from capacity. Similarly
                 * for all other uses of count in other wait guards.
                 */
                while (count.get() == capacity) {
                    notFull.await();//队列满时,,此时线程将处于阻塞状态,直到队列有空闲的位置才继续执行
                }
                enqueue(node);// //让元素进入队列的末尾
                c = count.getAndIncrement();//c是旧值,为了下面的c==0的判断。意思就是,之前没有元素,现在有了,其他线程赶紧来获取。
                if (c + 1 < capacity)
                    notFull.signal();//唤醒其他正在等待入队列的线程
            } finally {
                putLock.unlock();
            }
            if (c == 0)  /*当c=0时,当前队列为空队列,出队列的线程都处于等待状态,
            现在新添加了一个新的元素,即队列不再为空,因此它会唤醒正在等待获取元素的线程。
            */
                signalNotEmpty();
        }
    

    其中enqueue和dequeue
    enqueue()的源码如下:将元素E添加到队列的末尾,设置node为新的尾节点!

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }
    

    dequeue()的源码如下:删除队列的头节点,并将表头指向“原头节点的下一个节点”。

    private E dequeue() {
     // assert takeLock.isHeldByCurrentThread();
     // assert head.item == null; 
      Node<E> h = head; 
      Node<E> first = h.next; 
      h.next = h; // help GC 
      head = first;
       E x = first.item; 
      first.item = null; 
      return x;
    }
    

    问题:
    补充两个线程去生产和消费的图
    Lock的条件变量? c= count.getAndIncrement();?count原子性的理解

    相关文章

      网友评论

          本文标题:LinkedBlockingQueue

          本文链接:https://www.haomeiwen.com/subject/uqinxxtx.html