package java.util.concurrent;
//无界阻塞优先级队列,每次出队都返回优先级最高的元素
//是二叉树最小堆的实现
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable
1、常用方法
构造方法
//默认队列容量
private static final int DEFAULT_INITIAL_CAPACITY = 11;
public PriorityBlockingQueue() {
this(DEFAULT_INITIAL_CAPACITY, null);
}
public PriorityBlockingQueue(int initialCapacity) {
this(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
//lock独占锁对象用来控制同时只能有一个线程可以进行入队出队操作
this.lock = new ReentrantLock();
//notEmpty条件变量用来实现take方法阻塞模式
//put操作是非阻塞的(无界队 )
this.notEmpty = lock.newCondition();
//比较器comparator用来比较元素大小
this.comparator = comparator;
//数组queue用来存放队列元素
this.queue = new Object[initialCapacity];
}
入队方法
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e); // never need to block
}
/**
* 增加一个元索
*/
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
//如果size大小已经达到队列的大小 则 扩容
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
/**
* 如果size大小已经达到队列的大小 则 扩容
*/
private void tryGrow(Object[] array, int oldCap) {
//出于性能考虑,队列此时的出队操作还可以获取锁
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
//allocationSpinLock用cas控制只有一个线程可以进行扩容
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
//oldGap<64则扩容新增oldcap+2,否者扩容50%
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
//其它扩容线程会从执行状态转为就绪状态,让出cpu
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
//这里是在获取锁后操作的目的是 确保获取的数组则是最新的(可能有元素出队)
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
出队方法
/**
* 移除并返问队列头部的元素,如果队列为空,则返回null
*/
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return dequeue();
} finally {
lock.unlock();
}
}
/**
* 移除并返回队列头部的元素,如果队列为空,则阻塞
*/
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
//如果队列为空,则阻塞
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
/**
* 出队
*/
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
//队头元素
E result = (E) array[0];
//对尾元素
E x = (E) array[n];
array[n] = null;
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftDownComparable(0, x, array, n);
else
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
其它
/**
* 返回队列头部的元素,如果队列为空,则返回null
*/
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (size == 0) ? null : (E) queue[0];
} finally {
lock.unlock();
}
}
/**
* 获取队列元素个数
*/
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
网友评论