AbstractQueuedSynchronizer(以下简称AQS或AQS锁)是ReentrantLock的底层实现,它提供了自旋、FIFO线程等待队列和阻塞等功能。Java常见并发同步工具如Semaphore、CountDownLatch、ReentrantLock等都是基于AQS实现的。
AQS的实现要点总结如下:
1. 用一个原子int变量代表同步状态
AQS内部有一个原子int变量(命名为state),它是AQS的核心状态,也是唯一跟同步有关的变量。例如,ReentrantLock中state≠0表示锁已被占,state=0表示锁空闲。AQS的子类负责赋予state具体含义,通过覆写tryAcquire(), tryRelease(), tryAcquireShared(), tryReleaseShared(), isHeldExclusively()来通过以乐观锁方式对state进行操作。并且,state只允许通过getState(), setState(), compareAndSetState()对其操作以保证可见性和原子性。
2. 用一个CLH队列存放等待线程,每个线程一个结点,分独占模式和共享模式。
CLH队列(其实是CLH队列的变种)是用于实现自旋锁的队列数据结构,主体是一个链表,如下图所示。
CLH队列
在CLH队列中,每个线程的等待状态(waitState变量)保存在前一个结点中,取值可以是 1 (CANCELED,线程已经放弃等待),-1(SIGNAL,线程已经被阻塞、需要被唤醒),-2(CONDITION,线程在条件队列中等待,这种状态的结点不可能出现在CLH队列,只会出现在Condition条件队列上),-3(PROPAGATE,下一个在共享模式中等待的线程无条件唤醒,即唤醒动作可以继续往队列后面接力),0(就绪,这个状态代表线程没有被阻塞,并且准备好请求锁)。
3. 通过循环+CAS操作来对CLH队列进行修改
因为AQS本身就是用于实现锁的,所以AQS中的CLH队列没有锁来保护,且必须支持并发修改。怎么办?通过循环结合CAS操作来实现CLH队列操作的线程安全性。例如enq(),
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if(compareAndSetTail(t, node)) { t.next = node; return t; } } }}
4. 加锁操作如何实现
与CLH队列操作很类似,加锁操作也是通过循环+CAS操作来实现,不过还使用到了让线程阻塞的方法LockSupport.park()。我们以不可中断的加锁操作为例,讲解其主要实现逻辑如下:在一个死循环中,首先调用乐观锁加锁操作 tryAcquire(),如果成功,则加锁操作直接返回;如果失败了,则判断当前线程结点的前一结点的waitState:如果等于SIGNAL,则直接进入阻塞(说明此时已经有前一个结点得到锁了),如果是CANCELED,则清理一次CLH队列(把已经取消等待的线程中队列中移除)再次执行循环,如果是0或PROPAGATION,则把状态改为SIGNAL,继续执行循环。如果前面的阻塞操作被其他线程唤醒了,再次执行循环。参考代码:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
加锁操作其实有计时版本、可中断版本,但大体逻辑就是上面这样,只不过在循环中再进行了一些时间判断、中断标志判断等,并体现在返回结果或抛出异常上。
总结一下,AQS的加锁操作就是在一个循环中,不断执行CAS加锁(成功则返回,失败则继续),然后不断阻塞和被唤醒(在每次被唤醒的时候,顺便执行一些CLH队列清理工作),再次执行CAS加锁。
5. 加锁操作优化
AQS中的加锁操作,即acquire(),进行了一种barge优化。意思就是,当锁被释放出来以后,此时正好有一个线程请求锁,而队列中的第一个线程也被唤醒并且请求锁,这两个线程谁将获得锁是不确定的。这种优化对AQS锁的并发性有提升,但是使得它变成不公平的锁(没有按照FIFO原则操作)。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
6. 解锁操作如何实现
解锁操作的实现比较简单,先调用一个在子类中实现的CAS解锁操作(tryRelease),如果成功,则把队列中的第一个结点移出队列,判断是否需要唤醒下一个线程,需要则唤醒。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
总结
以上基本把AQS的主要实现逻辑总结完了。AQS的主体逻辑是用循环+CAS操作CLH队列,保证并发访问CLH队列的线程安全性。AQS加锁(支持阻塞)和AQS解锁操作依赖在子类中实现的不支持阻塞的CAS加锁操作(tryAcquire)和CAS解锁操作(tryRelease)实现。加锁操作也是在一个循环中,不断调用tryAcquire实现CAS加锁操作、阻塞唤醒,直到tryAcquire成功或者线程主动取消等待。
网友评论