美文网首页
LinkedBlockingQueue

LinkedBlockingQueue

作者: Pillar_Zhong | 来源:发表于2019-06-18 18:47 被阅读0次

    put

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        // 获取putLock,可中断
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            // notFull代表,链表未满
            // 这里如果链表已满,当然该条件不满足就要等待
            while (count.get() == capacity) {
                notFull.await();
            }
            // 到这里,说明链表还有空位,加到尾部
            enqueue(node);
            c = count.getAndIncrement();
            // 如果当前链表还没满,那么条件满足,所以要唤醒,无需再等待。
            // 为什么是signal,而不是signalall呢,本来就是一个一个添加的,全部唤醒只是浪费CPU而已。
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            // 释放putLock
            putLock.unlock();
        }
        // notEmpty条件代表,链表非空
        // 如果c==0,说明链表中至少有一个node,那么条件满足,所以要唤醒,无需继续等待
        if (c == 0)
            signalNotEmpty();
    }
    

    offer

    // 跟put类似,唯一不同是这里如果链表已满,那么会立即返回false
    // 不会一直等到链表不满为止
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        // 当链表已满,那么直接返回false
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            // 当链表未满,那么继续执行入列的操作
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }
    

    poll

    public E poll() {
        final AtomicInteger count = this.count;
        // 如果链表为空,直接返回null
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        // 获取takelock
        takeLock.lock();
        try {
            // 如果链表不为空,那么继续
            if (count.get() > 0) {
                // head出栈
                x = dequeue();
                c = count.getAndDecrement();
                // 如果当前链表不为空,那么notEmpty条件满足,唤醒
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        // 如果当前链表移除一个的情况下,至少有一个空位,notfull条件满足,有必要唤醒
        if (c == capacity)
            signalNotFull();
        return x;
    }
    

    take

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            // 跟poll不同的是,这里如果链表为空的话,会一直等待
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }
    

    peek

    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            // 返回头节点,但并不移除它
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }
    

    remove

    public boolean remove(Object o) {
        if (o == null) return false;
        // 拿锁,putlock+takelock
        fullyLock();
        try {
            // 从head往后遍历
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                // 如果能找到该对象,移除该节点
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            // 解锁,takelock+putlock
            fullyUnlock();
        }
    }
    
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }
    
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }
    
    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        // 跳过也就是孤立p节点
        trail.next = p.next;
        // 如果p是尾节点
        if (last == p)
            // 那么设置trail为尾节点,也就是移除p节点
            last = trail;
        // unlink的结果,当然链表未满,那么需要唤醒notFull条件
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }
    

    相关文章

      网友评论

          本文标题:LinkedBlockingQueue

          本文链接:https://www.haomeiwen.com/subject/iteqqctx.html