AQS锁的代表是AbstractQueuedSynchronizer, 为什么要创建一种与内置锁如此相似的新加锁机制呢? 在大多数情况下, 内置锁都能很好的工作, 但在功能上存在一些局限性, 例如, 无法中断一个正在等待获取锁的线程, 或者无法在请求获取一个锁时无限等待下去,
关于AQS锁, 打算从ArrayBlockingQueue的源码进行入手分析
// 内部持有的ReentrantLock同时又持有公平锁与非公平锁
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
}
一、ArrayBlockingQueue为入口
1.1 ArrayBlockingQueue.put生产者
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
// 1.当前线程通过这里先尝试获取锁;
lock.lockInterruptibly();
try {
// 2.获取锁成功, 判断队列元素是否已满, 如果满了;
while (count == items.length)
// 3.通过Condition.await挂起当前线程, 并释放锁;
notFull.await();
// 4.将元素入队;
enqueue(e);
} finally {
// 5.释放锁;
lock.unlock();
}
}
1.2 ArrayBlockingQueue.take消费者
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 1.获取锁;
lock.lockInterruptibly();
try {
while (count == 0)
// 2.队列为空, 获取锁的线程释放锁;
notEmpty.await();
// 3.元素出队操作;
return dequeue();
} finally {
// 4.操作完成后线程释放锁;
lock.unlock();
}
}
针对put和take操作中每次线程的挂起与唤醒, AQS是如何管理这些线程的?
1. put涉及到的方法:
(1) ReentrantLock. lockInterruptibly(); 模块<二>
(2) Condition.await(); 模块<三>
(3) Condition.signal(); 模块<三>
(4) ReentrantLock.unlock(); 模块<二>
2. take涉及到的方法:
(1) ReentrantLock. lockInterruptibly();
(2) Condition.await();
(3) Condition.signal();
(4) ReentrantLock.unlock();
二、ArrayBlockingQueue.put(ReentrantLock.lock与unlock)
1. put涉及到以下几个方法:
(1) ReentrantLock.lockInterruptibly();
(2) Condition.await();
(3) Condition.signal();
(4) ReentrantLock.unlock();
2.1 ReentrantLock.lockInterruptibly获取锁
// NonfairSync: 非公平锁;
// FairSync: 公平锁;
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lockInterruptibly() {
// 获取锁, 所以其实ReentrantLock锁的获取与释放操作其实是交给了Sync
sync.acquireInterruptibly(1);
}
2.1.1 NonfairSync.acquireInterruptibly
public final void acquireInterruptibly(int arg) throws InterruptedException {
// 如果当前线程已经被中断, 抛出异常;
if (Thread.interrupted())
throw new InterruptedException();
// 如果线程处于正常运行状态, 此时首先是尝试获取锁, 与公平锁的区别就在这里;
if (!tryAcquire(arg))
// 如果当前线程获取锁失败, 则会进入到这里被挂起, 直到其他线程显示唤醒;
doAcquireInterruptibly(arg);
}
公平锁与非公共平锁的区别就在这里了, 非公平锁是首先通过tryAcquire尝试获取锁.
2.1.2 NonfairSync.tryAcquire尝试获取锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// c代表的是锁被同一个线程获取的次数, 可以看成是锁重入的问题;
int c = getState();
// 如果当前锁还没有被线程所持有;
if (c == 0) {
// 到这里其实线程并没有持有锁, 所以使用cas, 防止在这个过程中其他线程获取了锁;
if (compareAndSetState(0, acquires)) {
// 将current也就是当前线程赋值给exclusiveOwnerThread, 如果成功, 可以认为
// exclusiveOwnerThread就是锁的持有线程;
setExclusiveOwnerThread(current);
return true;
}
}
// 执行到这里说明情况c != 0, c != 0其实是有两种情况:
// 1. 锁被其他线程持有;
// 2. 锁已经被当前线程持有;
// 所以如果锁是被当前线程持有, 根据锁重入的特点, 直接让c++即可;
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 执行到这里说明当前锁被其他线程持有;
return false;
}
2.1.3 NonfairSync.doAcquireInterruptibly获取锁失败的线程将被挂起
// 这里有几个关键点需要注意一下:
// 1. 线程入队时与Node绑定, 被挂起的线程对应的Node的状态值;
// 2. Head节点的作用;
// 这段代码稍微复杂一些, AQS的一个核心就是在这里, 对于获取锁失败的线程的处理, 所以打算分
// 三段进行分析: addWaiter、addWaiter~ shouldParkAfterFailedAcquire、
// shouldParkAfterFailedAcquire;
private void doAcquireInterruptibly(int arg) throws InterruptedException {
// 1. 这里先将获取锁失败的线程入队;
// 2. 返回的这个node其实就是传进来的node;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
// 获取当前线程对应的node(为了简化, 称为node(thread)) 的前置节点, ;
final Node p = node.predecessor();
// 通过对addWaiter的分析可知, node(thread)会被插入到当前Node队列的尾端, 而此时
// 如果node(thread).pre = node(head), 也就是说当前Node队列中只有node(thread)
// 这一个有效节点, 所以再次考虑获取锁, 如果获取锁成功;
if (p == head && tryAcquire(arg)) {
// 如果获取锁成功, 则将node(thread)置为node(head), 其实到这里可以明白了
// node(head)其实只是占位的作用;
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 1.如果p != head: 当前队列中有多个等待线程, 那么node(thread)进行挂起, 这个也是
// AQS锁的一个特点;
// 2.如果p == head 且 tryAcquire = false也就是说node(head)再次获取锁失败;
// 3. parkAndCheckInterrupt线程被挂起;
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这段代码稍微复杂一些, AQS的一个核心就是在这里, 对于获取锁失败的线程的处理, 所以打算分三段进行分析:
- 1、addWaiter: 线程入队操作.
- 2、addWaiter~ shouldParkAfterFailedAcquire: 线程出队操作.
- 3、shouldParkAfterFailedAcquire: 入队线程被挂起之前的准备工作.
2.1.3.1 NonfairSync.addWaiter获取锁失败的线程进行入队操作
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// pred != null表示当前Node队列已经有值了, 此时要做的就是将当前线程对应的Node插入
// 到当前Node队列的尾端即可;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果当前队列没有值, 此时一定要注意创建队列时head与tail节点;
enq(node);
return node;
}
private Node enq(final Node node) {
// 注意下面创建Node队列的过程, 会先创建一个Node(head)节点, 当前线程对应的Node
// 并没有插入到Node(head)的位置, 而只是插入到Node(head).next位置, 这里先不考虑原因,
// 只记住这个过程就好;
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
线程进入Node队列的结构如下所示:

2.1.3.2 Node出队操作
for (;;) {
// 获取当前节点的前置节点;
final Node p = node.predecessor();
// 1.这里需要考虑的一个问题是为何当前节点的前置节点是Node(Head)时才会再次尝试获取锁?
// 2.其实结合上面的图大致可以看出Node(Head仅仅是一个占位节点), 如果当前节点的前置节点
// 是Node(Head)也是可以看出当前节点前面已经没有等待线程了, 所以当前线程再次尝试获取锁,
// 这个算是线程被挂起之前的最后一次挣扎了.
if (p == head && tryAcquire(arg)) {
// 如果当前线程成功的获取了锁, 此时将当前线程至于Node队列的Head位置, 注意此时成功获取
// 锁的线程被放置在了Head位置
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
}
稍微总结一下, 获取锁失败的线程首先会进入到Node队列, 被添加在Node队列的tail位置, 然后在被挂起之前, 如果当前线程对应的Node位于Head.next位置, 则会再次尝试获取一次锁, 如果仍然失败, 那么此时就会尝试被挂起.
到这里, 其实发现是有自旋锁的影子的, 获取锁失败的线程再次尝试获取锁, 失败之后才会被挂起
2.1.3.3 NonfairSync.shouldParkAfterFailedAcquire线程挂起前的操作
// 1. 需要注意的是每个Node的waitStatus默认值都是0;
// 2.每一个node.waitStatus被设置为Node.SIGNAL其实是根据他的后置节点来触发的.
// 3.一定要注意该方法是在for(;;)中被调用, 所以其实最终会触发return true的执行.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// ws默认值为0;
int ws = pred.waitStatus;
// 1.首次进入这里直接跳过该if语句向下执行.
// 2.到目前为止还是没有发现哪里有对Node node.waitStatus进行赋值的地方.
if (ws == Node.SIGNAL)
return true;
// 在什么情况下ws才会>0????
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 这里是将Node pred.waitStatus设置为Node.SIGNAL.
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
对这个方法总结一下:
- 1、获取锁失败的线程在进入Node队列时, waitStatus = 0.
- 2、进入队列之后, 前置节点为Head的节点再次尝试获取锁, 如果此时获取锁失败, 则将其前置节点的waitStatus置为Node.SIGNAL.
- 3、设置完成以后, 才会进入真正的挂起状态.

2.1.4 NonfairSync.parkAndCheckInterrupt线程被挂起
private final boolean parkAndCheckInterrupt() {
// 线程被挂起;
LockSupport.park(this);
return Thread.interrupted();
}
2.2 ReentrantLock.unlock唤醒ReentrantLock.lock失败被挂起的线程
ReentrantLock.lock()被挂起的线程进入沉睡状态以后, 需要ReentrantLock.unlock()进行唤醒.
public void unlock() {
// 释放锁;
sync.release(1);
}
2.2.1 NonFairSync.release
public final boolean release(int arg) {
// 这里需要考虑锁重入的问题, 锁可以被同一个线程持有多次, 所以tryRelease会触发
// c--操作, 如果c == 0, 表示锁完成被释放, 此时才会考虑唤醒Node队列中等待线程;
if (tryRelease(arg)) {
Node h = head;
// 在前面的分析中也已经知道, 线程被挂起之前waitStatus会被设置为Node.SIGNAL = -1;
if (h != null && h.waitStatus != 0)
// 唤醒等待的线程, 这里传入的是head节点;
unparkSuccessor(h);
return true;
}
return false;
}
2.2.2 NonFairSync.unparkSuccessor
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
//1.唤醒线程时, 其实并没有改变head节点的waitStatus值.
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//unlock唤醒的其实是Head.next节点, 所以也对应前面lock中所说的, Head节点其实是一个占位节点;
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 仅仅只是唤醒线程, 并没有改变线程对应节点的waitStatus值.
LockSupport.unpark(s.thread);
}
总结:
- 1、unlock唤醒线程是从Head节点的后置节点开始进行唤醒, 因此head节点仅仅是占位的作用.
- 2、被唤醒的线程的前置节点如果刚好是head节点, 则会替换掉head节点.
- 3、被唤醒的线程的waitStatus值此时仍然为Node.SIGNAL, 这个值直到该节点的后置节点被唤醒时才会被修改为0.
三、ArrayBlockingQueue.put(Condition.await与signal)
分析之前需要考虑以下几个问题:
- 1、await之后线程被挂起, 如何唤醒lock时被挂起的其他线程?
- 2、await被挂起的线程是如何被管理的?
先列出总结:
- 1、线程触发Condition.await时, 首先会进入到Node(firstWaiter)队列中.
- 2、然后当前线程释放锁, 如果能够完全释放锁, 则唤醒ReentrantLock.lock时获取锁失败的线程.
- 3、唤醒Node(Head)队列中的线程之后, 当前线程进入挂起状态.
- 4、获取ReentrantLock.lock成功的线程通过Condition.signal又会唤醒Node(firstWaiter)挂起的线程, 唤醒的方式是修改Node.waitStatus = 0, 同时将该Node从Node(firstWaiter)中移除, 然后添加到Node(Head)的尾端.
3.1 Condition.await线程被挂起
public final void await() throws InterruptedException {
// 线程在被挂起时, 也会进入到一个Node队列中, 这个队列与通过lock方式的队列是两个队列.
Node node = addConditionWaiter();
// 执行锁的释放操作, 同时唤醒lock失败被挂起的线程.
long savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 线程被挂起时waitStatus为Node.CONDITION.
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 线程被唤醒之后, 执行到这里尝试获取锁.
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
3.1.1 Condition.addConditionWaiter线程进入Node(firstWaiter)队列
private Node addConditionWaiter() {
Node t = lastWaiter;
Node node = new Node(Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
这段代码执行完成以后, Node队列如下:

3.1.2 Condition.fullyRelease
final int fullyRelease(Node node) {
try {
int savedState = getState();
if (release(savedState))
return savedState;
throw new IllegalMonitorStateException();
} catch (Throwable t) {
node.waitStatus = Node.CANCELLED;
throw t;
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
释放锁, 如果锁被完全释放, 唤醒Node(Head)队列.
3.1.3 NonfairSync.isOnSyncQueue
final boolean isOnSyncQueue(Node node) {
// 默认情况下node.waitStatus = Node.CONDITION, 然后进入if内部被挂起.
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
3.2 Condition.signal唤醒Condition.await挂起的线程
public final void signal() {
Node first = firstWaiter;
if (first != null)
// 与unlock有点儿类似, 拿到firstWaiter节点进行唤醒.
doSignal(first);
}
3.2.1 Condition.doSignal
private void doSignal(Node first) {
do {
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//在执行while中的逻辑之前会先将Node从Node(firstWaiter)队列中移除.
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
3.2.2 NonFairSync.transferForSignal
final boolean transferForSignal(Node node) {
// 首先将waitStatus更改为Node.CONDITION.
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
// 将该Node添加到Node(Head)尾端, 而且此时Node.waitStatus被修改为了0.
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
网友评论