前言
分析LinkedBlockingQueue的实现原理前,需要先了解ReentrantLock 和AtomicInteger 。
参考:
基于CAS操作的Java非阻塞同步机制
从源码分析ReentrantLock原理
初识LinkedBlockingQueue
public class LinkedBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
static class Node<E> {
E item;
Node<E> next;
Node(E x) { item = x; }
}
private final int capacity;
private final AtomicInteger count = new AtomicInteger();
transient Node<E> head;
private transient Node<E> last;
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();
}
public interface BlockingQueue<E> extends Queue<E> {
boolean add(E e);
boolean offer(E e);
void put(E e) throws InterruptedException;
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
E take() throws InterruptedException;
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
int remainingCapacity();
boolean remove(Object o);
public boolean contains(Object o);
int drainTo(Collection<? super E> c);
int drainTo(Collection<? super E> c, int maxElements);
}
可以看到LinkedBlockingQueue内部有两个可重入独占锁takeLock 和putLock ,从字面就可以判定这两个锁是控制插入和取出操作同步的。两个Condition变量分别是notEmpty和notFull,猜测一个是当队列为空时让取出操作阻塞,一个是当队列已满时让插入操作阻塞,就像一个支持多线程插入与取出的生产者消费者模型。
LinkedBlockingQueue内部还有一个Node类以及Node类型的head和last变量,可见它是以单向链表实现的。
AtomicInteger类型的cout用来对队列的Node数量同步计数。
阻塞插入操作 put
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
这里用了putLock来加锁,也就是插入操作是同步执行的,当队列已满通过notFull条件来挂起线程。如果未满就执行enqueue (node)插入队列尾部,然后将队列长度同步+1:
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
如果插入队列尾部后队列的长度依然小于上限就执行notFull的signal方法唤醒某个等待插入队列的线程。该线程是队列满了以后阻塞在notFull条件上的。
如果c==0发生,表示可能存在取出操作的线程阻塞在notEmpty的条件上,通过signalNotEmpty去唤醒一个挂起的线程:
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
阻塞取出 take
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
先通过takeLock加锁,表示该操作是同步执行的,如果当前队列长度为0则通过notEmpty条件挂起线程。
如果当前队列长度不为0,取出头部的节点,然后将队列长度同步-1:
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
如果队列不为空,通过notEmpty条件的signal方法唤醒某个等待取出节点的线程让它开始工作。该线程是队列为空时阻塞在notEmpty上的。
最后判断队列的长度刚好满足上限-1,表示可能有挂起的等待插入的线程,通过signalNotFull方法将其中一个唤醒:
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
阻塞插入取出小结
可以发现插入与插入操作之间是同步的,取出与取出操作之间也是同步的,而插入与取出是并发的,也就是在一个线程在执行插入到尾部时,另一个线程可能正在取头部。这样能提供LinkedBlockingQueue的吞吐量。
使用signal方法而不使用signalAll方法可以提高性能,LinkedBlockingQueue同时最多只能有一个线程执行插入操作,一个线程执行取操作,并且插入的节点数只能一个,所以即使竞争不充分也不会死锁。
非阻塞插入 offer
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
从代码看该方法首先判断了当前队列是否已满,若满了就直接返回false,而不是在同步快中挂起等待notFull条件,同样他也会在执行完同步插入后尝试将等待notFull条件的线程唤醒,并且尝试唤醒等待notEmpty条件的线程。
非阻塞取出 poll
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
该方法也是不阻塞的,当队列为空直接返回null。如果能取出节点,在取出节点后也会尝试唤醒等待notEmoty条件的线程,最后也会尝试唤醒等待notFull条件的线程。
阻塞不超时插入
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
同阻塞插入相比其实就是在notFull执行await方法时加入了超时时间,通过awaitNanos实现,这样超过timeout队列还是满的就返回false。
阻塞不超时取出
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
同阻塞取出相比,在notEmpty条件执行awaitNanos方法代替await方法,当超过timeout队列还是空的就返回false。
总结
LinkedBlockingQueue完全是利用ReentrantLock的lockInterruptibly、unlock、newCondition以及Condition的await和signal等接口实现的,因此要想对它有更好的理解,需要先理解ReentrantLock的内部机制。
网友评论