美文网首页
LinkedBlockingQueue

LinkedBlockingQueue

作者: Pillar_Zhong | 来源:发表于2019-06-18 18:47 被阅读0次

put

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    // 获取putLock,可中断
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        // notFull代表,链表未满
        // 这里如果链表已满,当然该条件不满足就要等待
        while (count.get() == capacity) {
            notFull.await();
        }
        // 到这里,说明链表还有空位,加到尾部
        enqueue(node);
        c = count.getAndIncrement();
        // 如果当前链表还没满,那么条件满足,所以要唤醒,无需再等待。
        // 为什么是signal,而不是signalall呢,本来就是一个一个添加的,全部唤醒只是浪费CPU而已。
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        // 释放putLock
        putLock.unlock();
    }
    // notEmpty条件代表,链表非空
    // 如果c==0,说明链表中至少有一个node,那么条件满足,所以要唤醒,无需继续等待
    if (c == 0)
        signalNotEmpty();
}

offer

// 跟put类似,唯一不同是这里如果链表已满,那么会立即返回false
// 不会一直等到链表不满为止
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // 当链表已满,那么直接返回false
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        // 当链表未满,那么继续执行入列的操作
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

poll

public E poll() {
    final AtomicInteger count = this.count;
    // 如果链表为空,直接返回null
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    // 获取takelock
    takeLock.lock();
    try {
        // 如果链表不为空,那么继续
        if (count.get() > 0) {
            // head出栈
            x = dequeue();
            c = count.getAndDecrement();
            // 如果当前链表不为空,那么notEmpty条件满足,唤醒
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    // 如果当前链表移除一个的情况下,至少有一个空位,notfull条件满足,有必要唤醒
    if (c == capacity)
        signalNotFull();
    return x;
}

take

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 跟poll不同的是,这里如果链表为空的话,会一直等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

peek

public E peek() {
    if (count.get() == 0)
        return null;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        // 返回头节点,但并不移除它
        Node<E> first = head.next;
        if (first == null)
            return null;
        else
            return first.item;
    } finally {
        takeLock.unlock();
    }
}

remove

public boolean remove(Object o) {
    if (o == null) return false;
    // 拿锁,putlock+takelock
    fullyLock();
    try {
        // 从head往后遍历
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            // 如果能找到该对象,移除该节点
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        // 解锁,takelock+putlock
        fullyUnlock();
    }
}

void fullyLock() {
    putLock.lock();
    takeLock.lock();
}

void fullyUnlock() {
    takeLock.unlock();
    putLock.unlock();
}
void unlink(Node<E> p, Node<E> trail) {
    // assert isFullyLocked();
    // p.next is not changed, to allow iterators that are
    // traversing p to maintain their weak-consistency guarantee.
    p.item = null;
    // 跳过也就是孤立p节点
    trail.next = p.next;
    // 如果p是尾节点
    if (last == p)
        // 那么设置trail为尾节点,也就是移除p节点
        last = trail;
    // unlink的结果,当然链表未满,那么需要唤醒notFull条件
    if (count.getAndDecrement() == capacity)
        notFull.signal();
}

相关文章

  • Java LinkedBlockingQueue

    本章介绍JUC包中的LinkedBlockingQueue。目录 LinkedBlockingQueue介绍 Li...

  • LinkedBlockingQueue

    一、LinkedBlockingQueue是什么?LinkedBlockingQueue是一个线程安全的阻塞队列,...

  • LinkedBlockingQueue

  • LinkedBlockingQueue

    LinkedBlockingQueue是基于链表的阻塞先进先出队列,可以指定一个最大的长度限制以防止过度扩展,未指...

  • LinkedBlockingQueue

    接ArrayBlockingQueue,这里我谈下我对LinkedBlockingQueue的理解。 由阻塞队列的...

  • LinkedBlockingQueue

    Linked Blocking Queue介绍 Linked Blocking Queue是一个单向链表实现的阻塞...

  • LinkedBlockingQueue

    简介 LinkedBlockingQueue 底层结构为单项链表,拥有两把锁 takeLock 和 putLock...

  • LinkedBlockingQueue

    put offer poll take peek remove

  • LinkedBlockingQueue

    LinkedBlockingDeque 与 LinkedBlockingQueue 对比 LinkedBlocki...

  • LinkedBlockingQueue

    方法说明解释add增加一个元索如果队列已满,则抛出一个IIIegaISlabEepeplian异常offer添加一...

网友评论

      本文标题:LinkedBlockingQueue

      本文链接:https://www.haomeiwen.com/subject/iteqqctx.html