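LinkedBlockingQueue is the linked-list implementation of BlockingQueue. Its blocking behavior shows up in the put and take methods. The sections below walk through the source code to show how LinkedBlockingQueue implements a blocking queue.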
1. ReentrantLock+Condition
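ReentrantLock and Condition, both built on AQS (AbstractQueuedSynchronizer), provide the locking and thread suspend/wake-up model behind put and take. The queue keeps two separate locks, one per end, each with its own condition: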
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
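To make the lock/condition pairing concrete, here is a minimal sketch of the same await/signal pattern outside the queue. The class name ConditionDemo, the field names, and the run() helper are made up for illustration; only ReentrantLock and Condition come from the JDK.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not part of LinkedBlockingQueue.
class ConditionDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition ready = lock.newCondition();
    private boolean flag = false; // guarded by lock

    /** Blocks until flag is set; await() releases the lock while the thread is parked. */
    void awaitFlag() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (!flag) {      // loop guards against spurious wake-ups
                ready.await();
            }
        } finally {
            lock.unlock();
        }
    }

    /** Sets the flag and wakes one waiter; signal() must be called with the lock held. */
    void setFlag() {
        lock.lock();
        try {
            flag = true;
            ready.signal();
        } finally {
            lock.unlock();
        }
    }

    /** Returns true if the waiting thread finished after the flag was set. */
    static boolean run() {
        ConditionDemo demo = new ConditionDemo();
        Thread waiter = new Thread(() -> {
            try {
                demo.awaitFlag();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        demo.setFlag();
        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The while loop around await() is the same shape the queue uses: the waiter rechecks its condition after every wake-up rather than trusting the signal alone.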
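2. Blocking in put
1) First, acquire the put lock.
2) When the queue is full (count.get() == capacity), call notFull.await(); to suspend the current thread and release the lock, analogous to Object.wait.
3) When the element count goes from 0 to 1 (if c == 0), call signalNotEmpty(); to wake one thread suspended in take, analogous to Object.notify.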
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly(); // acquire the put lock
    try {
        while (count.get() == capacity) {
            notFull.await(); // queue is full: release the lock, suspend the thread, and join the condition wait queue
        }
        enqueue(node);
        c = count.getAndIncrement(); // c is the count before this insert
        if (c + 1 < capacity)
            notFull.signal(); // still room: wake another waiting put
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty(); // count went from 0 to 1: wake one thread suspended in take
}
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal(); // wake one thread suspended in take
    } finally {
        takeLock.unlock();
    }
}
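The blocking behavior described above can be observed from the outside. The following is a small sketch, assuming a queue of capacity 1; the class name PutBlocksDemo and the run() helper are hypothetical, and polling Thread.getState() is used here only as a convenient way to see that the producer is parked.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class: shows a put parking on a full queue until a take frees a slot.
class PutBlocksDemo {
    /** Returns true if the second put was seen waiting and completed after take(). */
    static boolean run() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        try {
            queue.put("a"); // the queue is now full
            Thread producer = new Thread(() -> {
                try {
                    queue.put("b"); // expected to wait on notFull
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();

            // Wait (up to 5 s) until the producer is parked.
            long deadline = System.nanoTime() + 5_000_000_000L;
            while (producer.getState() != Thread.State.WAITING
                    && System.nanoTime() < deadline) {
                Thread.sleep(1);
            }
            boolean wasBlocked = producer.getState() == Thread.State.WAITING
                    && queue.size() == 1;

            String first = queue.take(); // frees the slot and signals notFull
            producer.join(5000);
            return wasBlocked
                    && "a".equals(first)
                    && "b".equals(queue.peek())
                    && !producer.isAlive();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```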
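3. Blocking in take
1) First, acquire the take lock.
2) When the queue is empty (count.get() == 0), call notEmpty.await(); to suspend the current thread and release the lock, analogous to Object.wait.
3) When the element count drops from full capacity by one (if c == capacity), call signalNotFull(); to wake one thread suspended in put, analogous to Object.notify.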
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly(); // acquire the take lock
    try {
        while (count.get() == 0) {
            notEmpty.await(); // queue is empty: release the lock, suspend the thread, and join the condition wait queue
        }
        x = dequeue();
        c = count.getAndDecrement(); // c is the count before this removal
        if (c > 1)
            notEmpty.signal(); // elements remain: wake another waiting take
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull(); // count dropped from full capacity by one: wake one thread suspended in put
    return x;
}
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal(); // wake one thread suspended in put
    } finally {
        putLock.unlock();
    }
}
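Putting the put and take listings together, the two-lock design can be condensed into a stripped-down sketch. This is not the JDK class: TwoLockQueue, its signal() helper, and transferSum() are hypothetical names, and features such as iterators, offer/poll, and garbage-collection help are left out. It assumes the same layout as the listings above: a dummy head node, an AtomicInteger count shared by both ends, and one lock per end.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified illustration of the two-lock queue; not the JDK implementation.
class TwoLockQueue<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head = new Node<>(null); // dummy node; guarded by takeLock
    private Node<E> last = head;             // guarded by putLock

    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    TwoLockQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<>(e);
        int c;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) notFull.await();
            last = last.next = node;          // link at the tail
            c = count.getAndIncrement();
            if (c + 1 < capacity) notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0) signal(takeLock, notEmpty); // queue was empty: wake a taker
    }

    E take() throws InterruptedException {
        E x;
        int c;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) notEmpty.await();
            Node<E> first = head.next;        // first real element
            head = first;                     // it becomes the new dummy node
            x = first.item;
            first.item = null;
            c = count.getAndDecrement();
            if (c > 1) notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity) signal(putLock, notFull); // queue was full: wake a putter
        return x;
    }

    private static void signal(ReentrantLock lock, Condition cond) {
        lock.lock();
        try { cond.signal(); } finally { lock.unlock(); }
    }

    /** Pushes 1..n through a queue from a producer thread and returns the sum taken. */
    static int transferSum(int n, int capacity) {
        TwoLockQueue<Integer> q = new TwoLockQueue<>(capacity);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= n; i++) q.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < n; i++) sum += q.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(transferSum(100, 2));
    }
}
```

Because put only touches last and take only touches head, a producer and a consumer never contend for the same lock. The shared AtomicInteger is what tells each side when it has to cross over and signal the other side's condition.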
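4. Going further: how offer(timeout) and poll(timeout) work
The implementation is essentially the same as put/take. The difference is that the timed methods wait with notFull.awaitNanos / notEmpty.awaitNanos instead of await, so underneath the thread is parked with LockSupport.parkNanos rather than LockSupport.park. In both cases a signalled waiter is woken with LockSupport.unpark; the timed variant also returns on its own once the timeout elapses.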
- await (untimed wait)
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this); // plain park, no timeout
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
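The park/unpark primitive that await relies on can be tried in isolation. Below is a minimal sketch; ParkDemo and run() are hypothetical names, and the AtomicBoolean stands in for the "is the node on the sync queue yet" check in the listing above.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class for LockSupport.park / LockSupport.unpark.
class ParkDemo {
    /** Returns true if the parked thread finished after being unparked. */
    static boolean run() {
        AtomicBoolean released = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            // park() may return without a matching unpark, so recheck the condition.
            while (!released.get()) {
                LockSupport.park(released);
            }
        });
        waiter.start();

        released.set(true);
        LockSupport.unpark(waiter); // if the waiter has not parked yet, its next park returns at once

        try {
            waiter.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !waiter.isAlive();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```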
- awaitNanos (timed wait)
public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    final long deadline = System.nanoTime() + nanosTimeout;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        if (nanosTimeout >= spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanosTimeout); // park with a timeout
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        nanosTimeout = deadline - System.nanoTime();
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return deadline - System.nanoTime();
}
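From the caller's side, the timed wait shows up as offer and poll giving up after the timeout instead of blocking forever. A short sketch, assuming a capacity-1 queue and a 50 ms timeout chosen arbitrarily; TimedOpsDemo and run() are hypothetical names.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for the timed offer/poll methods.
class TimedOpsDemo {
    /** Returns true if the timed operations behaved as described in the comments. */
    static boolean run() {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(1);
        try {
            // Empty queue: poll waits on notEmpty, then returns null after the timeout.
            Integer none = queue.poll(50, TimeUnit.MILLISECONDS);

            // Free slot: offer succeeds without waiting.
            boolean first = queue.offer(1, 50, TimeUnit.MILLISECONDS);

            // Full queue: offer waits on notFull, then returns false after the timeout.
            boolean second = queue.offer(2, 50, TimeUnit.MILLISECONDS);

            // An element is available: poll returns it without waiting.
            Integer got = queue.poll(50, TimeUnit.MILLISECONDS);

            return none == null && first && !second && got != null && got == 1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```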