java.util.concurrent
//由单向链表结构组成的有界阻塞队列
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
//内部维护的链表节点结构Node
static class Node<E> {
E item;//入队元素
Node<E> next;//后继节点
Node(E x) {
item = x;
}
}
//入队锁
private final ReentrantLock takeLock = new ReentrantLock();
//出队锁
private final ReentrantLock putLock = new ReentrantLock();
1、常用方法
构造方法
/**
* 在不指定时容量时为Integer.MAX_VALUE
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
父类AbstractQueue实现
/**
* 增加一个元索,如果队列已满,则抛出异常
*/
public boolean add(E e) {
if (offer(e))
return true;
else
throw new IllegalStateException("Queue full");
}
/**
* 移除并返回队列头部的元素,如果队列为空,则抛出异常
*/
public E remove() {
E x = poll();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
/**
* 返回队列头部的元素,如果队列为空,则抛出异常
*/
public E element() {
E x = peek();
if (x != null)
return x;
else
throw new NoSuchElementException();
}
非阻塞方法
/**
* 添加一个元素并返回true,如果队列已满,则返回false
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
//队列数据总数自增+1后返回,返回的是未+1的值
c = count.getAndIncrement();
if (c + 1 < capacity)
//唤醒非满等待队列上的线程
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
//队列中刚好有一个数据,唤醒非空等待队列
signalNotEmpty();
return c >= 0;
}
/**
* 移除并返问队列头部的元素,如果队列为空,则返回null
* */
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)//出队后队列还是非空
//唤醒非空等待队列上的线程
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
//唤醒非满等待队列上的线程
signalNotFull();
return x;
}
/**
* 返回队列头部的元素,如果队列为空,则返回null
*/
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
//返回head的next的item,因为head和tail的item是Null
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}
阻塞方法
/**
* 添加一个元素,如果队列满,则阻塞
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();//可被线程中断
try {
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)//队列未满
notFull.signal();//唤醒非满等待队列上的线程
} finally {
putLock.unlock();
}
if (c == 0)
//队列上只有一个元素,唤醒非空等待队列上的线程
signalNotEmpty();
}
/**
* 移除并返回队列头部的元素,如果队列为空,则阻塞
*/
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();//可被线程中断
try {
while (count.get() == 0) {
notEmpty.await();//休眠非空等待队列上的线程
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();//唤醒非空等待队列上的线程
} finally {
takeLock.unlock();
}
if (c == capacity)
//唤醒非满等待队列上的线程
signalNotFull();
return x;
}
其它
/**
* 没有遍历整个队列
*/
public int size() {
return count.get();
}
2、入队出队操作
当队列添加元素后:
*会调用notFull.signal(); 唤醒非满等待队列上的线程;
*如果队列之前刚好是空的,会调用signalNotEmpty(); 唤醒非空等待队列上的线程。
当队列删除元素后:
*如果出队后队列还是非空,会调用notEmpty.signal(); 唤醒非空等待队列上的线程;
*如果队列之前刚好是满的,会调用signalNotFull(); 唤醒非满等待队列上的线程 。
网友评论