一 PriorityBlockingQueue
- 与PriorityQueue相比就是访问时多了锁的并发控制
1.1 扩容
private void tryGrow(Object[] array, int oldCap) {
lock.unlock(); // 先释放锁
Object[] newArray = null;
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {//扩容时allocationSpinLock加锁
try {//小于64时翻倍,否则增减50%
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
allocationSpinLock = 0;
}
}
if (newArray == null) // 其他线程在执行扩容
Thread.yield();
lock.lock();//加锁后包括原数据到新存储数组中
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
1.2 入队
- 与LinkedBlockingQueue和ArrayBlockingQueue不同,可扩容,所以不会用空间不足需要等待的情况
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
//尝试扩容,其他线程在扩容则想让出cpu,等会再检查容量是否足够
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)//使用原生比较函数比较
siftUpComparable(n, e, array);
else//使用配置的比较函数比较大小
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
private static <T> void siftUpComparable(int k, T x, Object[] array) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {//从索引k开始,逐层向上比较大小,
int parent = (k - 1) >>> 1;
Object e = array[parent];
if (key.compareTo((T) e) >= 0)
break;
array[k] = e;
k = parent;
}
array[k] = key;
}
1.3 出队
- 存在队列为空时,需要等待的情况。入队不需要等待。所以只有一个条件对象notEmpty
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
private E dequeue() {
int n = size - 1;
if (n < 0)
return null;
else {
Object[] array = queue;
E result = (E) array[0];//出队索引0节点
E x = (E) array[n];//末尾索引节点放到索引0
array[n] = null;
Comparator<? super E> cmp = comparator;
//调整最小堆,使索引0节点到正确位置
if (cmp == null)//使用默认比较函数
siftDownComparable(0, x, array, n);
else//使用用户自定义比较函数
siftDownUsingComparator(0, x, array, n, cmp);
size = n;
return result;
}
}
private static <T> void siftDownComparable(int k, T x, Object[] array,
int n) {
if (n > 0) {
Comparable<? super T> key = (Comparable<? super T>)x;
int half = n >>> 1; // loop while a non-leaf
while (k < half) {//从索引k开始逐层向下和孩子节点比较,
int child = (k << 1) + 1; // assume left child is least
Object c = array[child];
int right = child + 1;
if (right < n &&
((Comparable<? super T>) c).compareTo((T) array[right]) > 0)//左右孩子比较,选出较小的
c = array[child = right];
if (key.compareTo((T) c) <= 0)//较小孩子和目标节点值比较
break;//比孩子小,则赋值给当前节点,
array[k] = c;//否则较小孩子赋值当前节点,继续向下比较
k = child;
}
array[k] = key;
}
}
二 DelayQueue
-
private final PriorityQueue<E> q = new PriorityQueue<E>();
优先级队列存储数据
2.1 入队
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
q.offer(e);//入队
if (q.peek() == e) {//本次入队的是优先级最高的
leader = null;//新入队可能优先级更高,设置lead为null
available.signal();//通知等待线程重新检查获取有数据可用
}
return true;
} finally {
lock.unlock();
}
}
2.2 出队
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();//获取优先级最高元素
if (first == null || first.getDelay(NANOSECONDS) > 0)//空队列,或元素的延迟时间大于0,则不返回
return null;
else
return q.poll();//放回优先级最高元素
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)//无数据,则休眠等待
available.await();
else {//有数据,检查延迟时间
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)//时间到期,则数据出队返回
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)//leader不为null,有线程在等待该节点则休眠等待
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {//休眠优先级最高节点的延迟时间,配合leader为优先级最高节点被等待的线程。
available.awaitNanos(delay);
} finally {//休眠完成,进入循环,节点出队,返回数据
if (leader == thisThread)
leader = null;//设置lead为null
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();//还有数据通知其他线程唤醒处理
lock.unlock();
}
}
三 差别
区别 |
PriorityBlockingQueue |
DelayQueue |
存储 |
数组存储,内部实现最小堆功能 |
PriorityQueue存储,使用类接口 |
扩容 |
支持 |
支持 |
并发锁 |
支持 |
支持 |
优先级 |
队列中位置 |
队列中位置,同时表示节点数据需要等待多久之后才可被访问 |
条件对象 |
取数据,队列为空是等待 |
取数据队列为空时等待,有数据但延迟时间未到时超时等待。 |
网友评论