美文网首页
Java并发编程(十) LinkedBlockingQueue

Java并发编程(十) LinkedBlockingQueue

作者: skyguard | 来源:发表于2018-11-16 17:06 被阅读0次

下面我们来说一下java中的BlockingQueue。先来看一下BlockingQueue都有哪些方法
offer(E e): 将给定的元素设置到队列中,如果设置成功返回true, 否则返回false. e的值不能为空,否则抛出空指针异常。
offer(E e, long timeout, TimeUnit unit): 将给定元素在给定的时间内设置到队列中,如果设置成功返回true, 否则返回false.
add(E e): 将给定元素设置到队列中,如果设置成功返回true, 否则抛出异常。如果是往限定了长度的队列中设置值,推荐使用offer()方法。
put(E e): 将元素设置到队列中,如果队列中没有多余的空间,该方法会一直阻塞,直到队列中有多余的空间。
take(): 从队列中获取值,如果队列中没有值,线程会一直阻塞,直到队列中有值,并且该方法取得了该值。
poll(long timeout, TimeUnit unit): 在给定的时间里,从队列中获取值,如果没有取到会抛出异常。
remove(Object o): 从队列中移除指定的值。
contains(Object o): 判断队列中是否拥有该值。
put和take方法是阻塞的。下面我们就来看一下LinkedBlockingQueue是怎么实现的。
LinkedBlockingQueue是基于链表实现的,我们来看一下LinkedBlockingQueue都有哪些属性

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
transient Node<E> head;

/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

有一个head,保存首节点的引用,一个last,保存尾节点的引用,还有一个tabkLock和Condition,用来在take操作的时候阻塞线程的,一个putLock和Condition,用来在put操作的时候阻塞线程的。
LinkedBlockingQueue有一个内部类Node,有一个属性item,代表元素,一个Node元素的next,代表下一个节点

static class Node<E> {
    E item;

    /**
     * One of:
     * - the real successor Node
     * - this Node, meaning the successor is head.next
     * - null, meaning there is no successor (this is the last node)
     */
    Node<E> next;

    Node(E x) { item = x; }
}

下面我们来看一下put方法

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    // Note: convention in all put/take/etc is to preset local var
    // holding count negative to indicate failure unless set.
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        /*
         * Note that count is used in wait guard even though it is
         * not protected by lock. This works because count can
         * only decrease at this point (all other puts are shut
         * out by lock), and we (or some other waiting put) are
         * signalled if it ever changes from capacity. Similarly
         * for all other uses of count in other wait guards.
         */
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

先将值放到Node节点中,然后上锁,如果当前队列的数量等于容量,则调用await方法阻塞等待,否则进行入队操作,然后如果队列的数量小于容量,则唤醒等待的线程继续执行,然后解锁,如果队列的数量为0,则唤醒take操作阻塞的线程
再来看一下take方法

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue();
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

先上锁,然后判断如果队列数量等于0,则阻塞当前线程,否则从队列中取出元素,如果队列的数量大于0,则唤醒被阻塞的线程继续执行,然后解锁,如果队列的数量等于容量-1,则唤醒put操作阻塞的线程
LinkedBlockingQueue就分析到这里了。

相关文章

网友评论

      本文标题:Java并发编程(十) LinkedBlockingQueue

      本文链接:https://www.haomeiwen.com/subject/gpbmfqtx.html