前言
阅读AQS源码的过程很是艰辛,在陆陆续续读了三四次后,大概搞懂了,特此分享一些阅读时的技巧/特点,希望能帮助大家更轻松的读懂AQS.
设计要点
1.1 Node
Node是AQS的内部数据结构,双向链表,一个Node表示一个线程.当一个线程"设置状态"失败,会被构造成Node添加到阻塞队列尾部. 当一个"设置状态"成功的线程执行完成,"释放状态"时,会唤醒阻塞队列首部之后的Node对应的线程尝试"设置状态"
static final class Node {
//节点状态
volatile int waitStatus;
//前一个节点
volatile Node prev;
//下一个节点
volatile Node next;
//对应的线程
volatile Thread thread;
//Condition使用
Node nextWaiter;
...
}
1.2 state
"设置状态","释放状态"是AQS的一个抽象概念, 根据不同的用途,描述也不同,在锁上描述就是就是获取锁,释放锁. AQS提供了操作state的方法,由子类根据需要定义state的含义, 例如对于独占锁 0表示锁未被获取,1表示锁已被获取.
/**
* The synchronization state.
*/
private volatile int state;
1.3 LockSupport
由于Thread.sleep和Thread.resume存在死锁问题,AQS中阻塞和唤醒线程使用LockSupport的park和unpark实现.它基于一套"契约"机制. 契约有且只能有一个,unpark给与一个契约,park消耗一个契约.如果先unpark再park,线程不会阻塞
LockSupport.unpark(xxx);
//LockSupport.unpark(xxx);... 无论调用多少次也只会有一个契约
LockSupport.park(xxx);
1.4 Condition
AQS中仿照监视器的等待通知模式,使用Condtion接口提供了类似但更强大的功能
-
超时等待
-
每个Condition都对应一个等待队列,因此可以有多个等待队列
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. /
private transient Node firstWaiter;
/* Last node of condition queue. */
private transient Node lastWaiter;
...
}
1.5 阻塞队列和等待队列
阻塞队列指获取锁失败被阻塞等待唤醒的线程,等待队列指调用Condition.wait()放弃锁等待被Condition.signal()唤醒的线程,两者都使用Node实现.
两者还有一点关联:当等待队列中的节点被唤醒后,会移动到阻塞队列中,等待获取锁.
阅读技巧
2.1 &&和||操作符的短路应用
AQS中有大量&&和||的使用,充分利用了它们的短路特性来避免嵌套if/else.代码看起来很精巧,相应的阅读难度也提高了.以acquire为例
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//相同效果的if/else实现
public final void acquire(int arg){
if(!tryAcquire(arg)){
if(acquireQueued(addWaiter(Node.EXCLUSIVE), arg)){
selfInterrupt();
}
}
如果使用自然语言描述进入最内层代码的流程 —"当前线程尝试获取锁失败,加入到阻塞队列尾部,首次/被唤醒后尝试获取锁并检查是否被中断,如果被中断,就进行自我中断"
如果考虑所有情况,势必会有一堆if/else,不得不说是很精巧的设置,为了降低阅读难度,可以自行替换为if/else形式方便理解
2.2 线程的"隐式"唤醒
以acquireQueued(addWaiter(Node.EXCLUSIVE), arg)为例(源码为for(;;),这里替换为do while方便理解)
//替换为下面这种形式更容易理解
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
//尝试获取锁,如果获取失败阻塞当前线程
do {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
} while (true)
} finally {
if (failed)
cancelAcquire(node);
}
}
"死循环尝试获取锁,如果失败阻塞线程并设置中断状态"
如果第一次循环时获取锁失败,线程就会被阻塞,那么后续循环是如何触发的呢? 被其他线程唤醒
然而唤醒并不会在acquireQueued方法中体现出来,这里可能会造成迷惑,因此在AQS中如果看到死循环,请结合此情形思考.(PS. 一个相关的知识 线程虚假唤醒的Java演示)
抽象模板
AQS使用了抽象模板设计模式,使用时需要根据功能实现它定义的几个抽象方法
独占式
- boolean tryAcquire(int arg)
- boolean tryRelease(int arg)
- boolean isHeldExclusively();
共享式
- int tryAcquireShared(int arg)
- boolean tryReleaseShared(int arg)
AQS同时提供了几个方法供子类使用
- int getState()
- void setState(int newState)
- boolean compareAndSetState(int expect, int update)
实例 TwinLock
TwinLock允许同时最多两个线程拥有锁,是共享式锁对AQS的典型用法
public class TwinLock implements Lock {
private final Sync sync = new Sync(2);
private static class Sync extends AbstractQueuedSynchronizer {
public Sync(int count) {
setState(count);
}
@Override
protected int tryAcquireShared(int arg) {
for (;;) {
int current = getState();
int newState = current - arg;
if (newState < 0 || compareAndSetState(current, newState)) {
System.out.println(Thread.currentThread().getName()
+ " acquired, newState " + newState);
return newState;
}
}
}
@Override
protected boolean tryReleaseShared(int arg) {
for (;;) {
int current = getState();
int newState = current + arg;
if (compareAndSetState(current, newState)) {
System.out.println(Thread.currentThread().getName()
+ " released, newState " + newState);
return true;
}
}
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
@Override
public boolean tryLock() {
try {
return sync.tryAcquireSharedNanos(1, 0);
} catch (InterruptedException ignore) {
}
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
sync.releaseShared(1);
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public static void main(String[] args) throws InterruptedException {
}
private static void tryLockTest() {
TwinLock lock = new TwinLock();
Runnable r = () -> {
boolean success = lock.tryLock();
if (success) {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
} else {
System.out.println(
Thread.currentThread().getName() + " acquire failed");
}
};
for (int i = 1; i <= 8; i++) {
new Thread(r, "t-" + i).start();
}
}
private static void lockTest() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
TwinLock lock = new TwinLock();
Runnable r = () -> {
lock.lock();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
latch.countDown();
}
};
for (int i = 1; i <= 10; i++) {
new Thread(r, "t-" + i).start();
}
latch.await();
}
- state控制同时拥有锁的最大线程数
- tryAcquireShared的javadoc规定
- 返回值< 0 表示获取失败
- 返回值 = 0 表示获取成功,但是后续的线程在没有线程释放锁的线程下都会失败
- 返回值 >0 表示获取成功,并且后续的线程也能成功
- tryAcquireShared通过死循环确保cas替换失败时会重试获取.
网友评论