基本使用
@Test
public void testPriorityQueue() throws InterruptedException {
PriorityQueue priorityQueue = new PriorityQueue(Lists.newArrayList(5, 4, 2, 1, 3));
System.out.println(priorityQueue);
System.out.println(priorityQueue.poll());
System.out.println(priorityQueue.poll());
PriorityBlockingQueue<Integer> blockingQueue = new PriorityBlockingQueue<>();
blockingQueue.add(5);
System.out.println(blockingQueue.poll());
System.out.println(blockingQueue.take());
}
输出
[1, 3, 2, 4, 5]
1
2
5
(阻塞)
PriorityQueue
成员变量
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
transient Object[] queue; // non-private to simplify nested class access
/**
* The number of elements in the priority queue.
*/
private int size = 0;
/**
* The comparator, or null if priority queue uses elements'
* natural ordering.
*/
private final Comparator<? super E> comparator;
/**
* The number of times this priority queue has been
* <i>structurally modified</i>. See AbstractList for gory details.
*/
transient int modCount = 0; // non-private to simplify nested class access
通过数组实现一个堆,元素在queue数组中并不是完全有序的,仅堆顶元素最大或最小。
基本方法
public E poll() {
if (size == 0)
return null;
int s = --size;
modCount++;
E result = (E) queue[0];
E x = (E) queue[s];
queue[s] = null;
if (s != 0)
siftDown(0, x);
return result;
}
/**
* Inserts item x at position k, maintaining heap invariant by
* demoting x down the tree repeatedly until it is less than or
* equal to its children or is a leaf.
*
* @param k the position to fill
* @param x the item to insert
*/
private void siftDown(int k, E x) {
if (comparator != null)
siftDownUsingComparator(k, x);
else
siftDownComparable(k, x);
}
@SuppressWarnings("unchecked")
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>)x;
int half = size >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = queue[child];
int right = child + 1;
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
c = queue[child = right];
if (key.compareTo((E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = key;
}
以poll方法为例,实际上是获取堆顶元素,然后调整堆。
调整堆的方法(以大顶堆为例):
- 判断是否传入comparator,有则按照comparator排序,否则按照自然顺序排序
- 取节点左右孩子节点最大值,与父亲节点交换
扩容方法
/**
* Increases the capacity of the array.
*
* @param minCapacity the desired minimum capacity
*/
private void grow(int minCapacity) {
int oldCapacity = queue.length;
// Double size if small; else grow by 50%
int newCapacity = oldCapacity + ((oldCapacity < 64) ?
(oldCapacity + 2) :
(oldCapacity >> 1));
// overflow-conscious code
if (newCapacity - MAX_ARRAY_SIZE > 0)
newCapacity = hugeCapacity(minCapacity);
queue = Arrays.copyOf(queue, newCapacity);
}
private static int hugeCapacity(int minCapacity) {
if (minCapacity < 0) // overflow
throw new OutOfMemoryError();
return (minCapacity > MAX_ARRAY_SIZE) ?
Integer.MAX_VALUE :
MAX_ARRAY_SIZE;
}
- 小容量扩容1倍
- 大容量扩容0.5倍
- 快溢出时调整为Integer.MAX_VALUE - 8 或 Integer.MAX_VALUE
是否线程安全
非线程安全
PriorityBlockingQueue
其实现基本与PriorityQueue一致,不过PriorityBlockingQueue是线程安全的,并且实现了BlockingQueue接口,在队列为空时take会阻塞。
/**
* Priority queue represented as a balanced binary heap: the two
* children of queue[n] are queue[2*n+1] and queue[2*(n+1)]. The
* priority queue is ordered by comparator, or by the elements'
* natural ordering, if comparator is null: For each node n in the
* heap and each descendant d of n, n <= d. The element with the
* lowest value is in queue[0], assuming the queue is nonempty.
*/
private transient Object[] queue;
/**
* The number of elements in the priority queue.
*/
private transient int size;
/**
* The comparator, or null if priority queue uses elements'
* natural ordering.
*/
private transient Comparator<? super E> comparator;
/**
* Lock used for all public operations
*/
private final ReentrantLock lock;
/**
* Condition for blocking when empty
*/
private final Condition notEmpty;
/**
* Spinlock for allocation, acquired via CAS.
*/
private transient volatile int allocationSpinLock;
/**
* A plain PriorityQueue used only for serialization,
* to maintain compatibility with previous versions
* of this class. Non-null only during serialization/deserialization.
*/
private PriorityQueue<E> q;
和PriorityQueue的区别:增加了
- 重入锁ReentrantLock
- Condition,用于队列空情况下的阻塞
- allocationSpinLock,通过CAS手段对queue扩容
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // must release and then re-acquire main lock
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // back off if another thread is allocating
Thread.yield();
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
可以看到与PriorityQueue的扩容函数很像,不同点:
- 调用函数时必须持有锁
- 使用CAS方法进行扩容,在allocationSpinLock为0,并且CAS将其置为1时,线程才能够对数组进行扩容。如果多个线程并发扩容,其余线程会调用Thread.yield()方法。
为什么这样实现PriorityBlockingQueue扩容?
因为PriorityBlockingQueue内部使用的ReentrantLock重入锁,同一个线程多次调用add函数,可能恰好同时调用了tryGrow函数。此时通过重入锁是无法加锁的,仅能通过Synchronized或CAS方式控制并发。
allocationSpinLock是transient的,因为序列化时并不需要此参数;同时又是volatile的,因为可能有多个线程同时调用。
private transient volatile int allocationSpinLock;
UNSAFE.compareAndSwapInt
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long allocationSpinLockOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = PriorityBlockingQueue.class;
allocationSpinLockOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("allocationSpinLock"));
} catch (Exception e) {
throw new Error(e);
}
}
调用方法
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset, 0, 1)
allocationSpinLockOffset是allocationSpinLock变量在PriorityBlockingQueue类中的偏移量。
那么使用allocationSpinLockOffset有什么好处呢?它和直接修改allocationSpinLock变量有什么区别?
获取该字段在类中的内存偏移量,直接将内存中的值改为新值。直接修改allocationSpinLock并不是CAS。JDK 1.8代码如下:
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
在AtomicInteger类中的调用如下,getAndAddInt方法由具体类的实现方法,抽取到了UNSAFE类中:
public final int getAndDecrement() {
return unsafe.getAndAddInt(this, valueOffset, -1);
}
对比 PriorityQueue 和 PriorityBlockingQueue
- PriorityQueue是非线程安全的,PriorityBlockingQueue是线程安全的
- PriorityBlockingQueue使用重入锁,每一个操作都需要加锁
- PriorityBlockingQueue扩容时使用了CAS操作
- 两者都使用了堆,算法原理相同
- PriorityBlockingQueue可以在queue为空时阻塞take操作
JDK实现堆的方法
/**
* Establishes the heap invariant (described above) in the entire tree,
* assuming nothing about the order of the elements prior to the call.
*/
@SuppressWarnings("unchecked")
private void heapify() {
for (int i = (size >>> 1) - 1; i >= 0; i--)
siftDown(i, (E) queue[i]);
}
private void siftDown(int k, E x) {
if (comparator != null)
siftDownUsingComparator(k, x);
else
siftDownComparable(k, x);
}
@SuppressWarnings("unchecked")
private void siftDownComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>)x;
int half = size >>> 1; // loop while a non-leaf
while (k < half) {
int child = (k << 1) + 1; // assume left child is least
Object c = queue[child];
int right = child + 1;
if (right < size &&
((Comparable<? super E>) c).compareTo((E) queue[right]) > 0)
c = queue[child = right];
if (key.compareTo((E) c) <= 0)
break;
queue[k] = c;
k = child;
}
queue[k] = key;
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
modCount++;
int i = size;
if (i >= queue.length)
grow(i + 1);
size = i + 1;
if (i == 0)
queue[0] = e;
else
siftUp(i, e);
return true;
}
private void siftUp(int k, E x) {
if (comparator != null)
siftUpUsingComparator(k, x);
else
siftUpComparable(k, x);
}
@SuppressWarnings("unchecked")
private void siftUpComparable(int k, E x) {
Comparable<? super E> key = (Comparable<? super E>) x;
while (k > 0) {
int parent = (k - 1) >>> 1;
Object e = queue[parent];
if (key.compareTo((E) e) >= 0)
break;
queue[k] = e;
k = parent;
}
queue[k] = key;
}
网友评论