LinkedBlockingQueue是一个单向链表实现的阻塞队列。该队列按 FIFO(先进先出)排序元素,新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素
LinkedBlockingQueue采用了双锁读写分离的技术,可以让读写操作在不干扰对方的情况下,完成各自的功能,提高并发吞吐量。
结构:
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -6903933977591709194L;
// 链表的节点
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
// 队列的容量,默认是Integer.MAX_VALUE
private final int capacity;
// 队列元素的计数
private final AtomicInteger count = new AtomicInteger();
// 头节点,头节点的元素始终等于null
transient Node<E> head;
// 尾节点,尾节点的next始终等于null
private transient Node<E> last;
// 调用take, poll等方法的出队锁(读锁)
private final ReentrantLock takeLock = new ReentrantLock();
// 读锁的条件
private final Condition notEmpty = takeLock.newCondition();
// 调用put, offer等方法的入队锁(写锁)
private final ReentrantLock putLock = new ReentrantLock();
// 写锁的条件
private final Condition notFull = putLock.newCondition();
}
构造函数:
// 无参构造函数,指定最大容量
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
// 指定容量的构造函数
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
// 指定集合的构造函数
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
入队API:
put,将指定的元素插入到队尾,如果队列满了,则等待队列空间释放
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
// 拿入队锁,加一个可中断的锁
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 如果队列满了,则等待释放空间
while (count.get() == capacity) {
notFull.await();
}
// 新节点入队
enqueue(node);
// CAS自增计数
c = count.getAndIncrement();
// 如果队列还没满,则通知入队线程入队
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果c=0说明队列原来是空的(getAndIncrement会返回count原来的值),现在有元素了,所以要通知出队线程
if (c == 0)
signalNotEmpty();
}
// 新节点入队
private void enqueue(Node<E> node) {
// 把新节点加入到队列最后
last = last.next = node;
}
// 将出队线程从等待条件队列移入等待锁的同步队列,让出队线程参与锁竞争,拿到锁之后执行出队动作
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
offer,将指定的元素插入到队尾,如果队列满了,直接返回false,不会等待
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 队列满了,直接返回false
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 队列没满,入队
if (count.get() < capacity) {
// 添加到队尾
enqueue(node);
c = count.getAndIncrement();
// 还有空间,通知入队线程入队
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
// 如果c=0说明队列原来是空的(getAndIncrement会返回count原来的值),现在有元素了,所以要通知出队线程
signalNotEmpty();
return c >= 0;
}
出队API:
take,出队并删除头节点的下一个节点,如果队列空了则等待。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列空了,等待
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
c = count.getAndDecrement();
// 在出队之前的队列长度>1,说明,这次出队完还有元素
if (c > 1)
// 通知出队线程
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 在出队之前的队列是满的,现在出队了,不满了,通知入队元素,继续入队
if (c == capacity)
signalNotFull();
return x;
}
// 出队元素,把原头节点删除换成新头节点(原头节点的下一个元素)
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
poll,出队并删除头节点的下一个节点,如果队列空了直接返回null
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
peek ,获取头节点的下一个节点,如果队列空了直接返回null
public E peek() {
// 如果队列是空则返回null
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 获取头节点的下一个节点,返回节点的元素
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
网友评论