AbstractQueuedSynchronizer类(简称AQS)依赖于内部的FIFO队列,提供了一个可以实现阻塞锁和同步机制的框架。依赖于AQS实现的具有同步机制和锁功能的类,需要实现它定义的protect方法(tryAcquire,tryRelease等方法),通过改变state字段的状态来保证同步机制。
AQS的使用场景
1、ReentrantLock类提供的同步锁的功能也是基于AQS类来实现的,ReentrantLock提供的公平锁(sync)和非公平锁(NofairSync)两种锁机制,都是通过重写tryAcquire和tryRelease方法来实现的。
ReentrantLock重写的tryAcquire方法:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// 非公平的获取锁方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 提供了锁的可重入功能,当已经获取锁的线程再次请求获取锁时
// 将state的值加一。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
ReentrantLock重写的tryRelease方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 判断当前请求释放锁的线程与当前持有锁的线程是不是同一个线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 因为提供了锁重入的功能,这里会判断当前线程是否已经释放完所有持有的锁
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
Reentrant基于AQS加锁的执行过程

2、CyclicBarrier类
应用场景:创建一组任务,它们并行地执行工作,然后在进行下一个步骤之前等待,直至所有任务都完成,它使得所有并行任务都将在栅栏处列队,因此可以一致的向前移动。
CyclicBarrier类提供的功能也是基于AQS类来实现的,主要是利用AQS类中ConditionObject类提供的功能。
// 唤醒所有等待的线程
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
// 唤醒所有线程的方法,循环队列中等待的线程,调用transferForSignal方法唤醒在等待中的线程。
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
transferForSignal方法,会对当前节点状态校验,然后调用LockSupport.unpark方法唤醒指定的线程。
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
await方法:当CyclicBarrier类提供的计数器没有减到等于0时,提前进入的线程会调用AQS中ConditionObject类提供的await方法,将当前线程加入到等待队列中。
public final void await() throws InterruptedException {
// 如果当前线程被中断,会抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 将新节点添加到等待队列中去,节点的状态为 CONDITON。
Node node = addConditionWaiter();
// 释放当前线程持有的锁,如果失败则将当前节点的状态改为cancelled(表示当前节点失效)。
int savedState = fullyRelease(node);
int interruptMode = 0;
// 判断当前节点是否在CLH队列中,CONDITION状态的节点,只能在CONDITION队列中。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 出了while循环,说明线程被唤醒,并且已经将该node节点从CONDITION队列transfer到了CLH队列中,或者发生了中断。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 处理异常或者中断的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
下面来看一下CyclicBarrier类的执行过程

网友评论