前言
JDK 1.5 的 java.util.concurrent.locks 包中都是锁,其中有一个抽象类 AbstractQueuedSynchronizer (抽象队列同步器),也就是 AQS, 我们今天就来看看该类。
1.结构
类结构我们看看该类的结构,该类被 CountDown,ThreadPoolExecutor,ReentrantLock,ReentrantReadWriteLock,Semaphore 的内部类所继承,而这些内部类都是这些锁的真正实现,不论是公平锁还是非公平锁。
也就是说,这些锁的真正实现都是该类来实现的。那么,我们就从这些锁开始看看是如何实现从锁到解锁的。
2. 重入锁的 lock 方法
我们先看看重入锁 ReentranLock 的 lock 方法。
public void lock() {
sync.lock();
}
该方法调用了内部类的 sync 抽象类的 lock 方法,该方法的实现有公平锁和非公平锁。我们看看公平锁是如何实现的:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
调用了 acquire 方法,该方法就是 AQS 的的方法,因为 sync 继承了 AQS,而公平锁继承了 Sync,等于间接继承了 AQS,我们看看该方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
该方法JDK注释 :
以独占模式获取对象,如果被中断则中止。通过先检查中断状态,然后至少调用一次 tryAcquire(int) 来实现此方法,并在成功时返回。否则在成功之前,或者线程被中断之前,一直调用 tryAcquire(int) 将线程加入队列,线程可能重复被阻塞或不被阻塞。可以使用此方法来实现 Lock.lockInterruptibly() 方法。
楼主来简单说一下该方法的作用:该方法会试图获取锁,如果获取不到,就会被加入等待队列等待被唤醒,这个其实和我们之前分析的 synchronized 是差不多的。
我们仔细看看该方法,首先是 tryAcquire 方法,也就是尝试获取锁,该方法是需要被写的,父类默认的方法是抛出异常。如何重写呢?抽象类定义一个标准:如果返回 true,表示获取锁成功,反之失败。
tryAcquire我们回到 acquire 方法,如果获取锁成功,就直接返回了,如果失败了,则继续后面的操作,也就是将线程放入等待队列中:
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
我们先看看 addWaiter(Node.EXCLUSIVE) 方法:
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
该方法注释:将当前线程放入到队列节点。参数呢?参数有2种,Node.EXCLUSIVE 是独占锁,Node.SHARED 是分享锁。
在 Node 类种定义了这两个常量:
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
独占锁是null,共享锁是空对象。
我们看看该方法的步骤:
- 创建一个当前线程的 Node 对象(nextWaiter 属性为 null, thread 属性为 当前线程)。
- 获取到末端节点,如果末端节点不为 null,则将末端节点设置为刚刚创建的节点的 prev 属性。
2.1. 通过 CAS 设置末端节点为新的节点。如果成功,将刚刚创建的节点设置为老末端节点的next节点。最后返回。 - 如果 tail 末端节点是null,则调用enq 方法。创建一个末端节点,然后,将刚刚创建的末端节点设置为新节点的 prev 属性(此时的末端节点就是 head 头节点)。最后返回刚刚创建的 node 节点。
我们看看 enq 方法的实现:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
该方法步骤如下:
- 死循环,获取到末端节点,如果是null,则使用CAS创建一个头节点(头节点此时也是null),并将头节点赋值末端节点。
- 由于刚刚CAS 成功,走else 逻辑,将末端节点赋值给新节点的 prev 属性,使用CAS设置新的末端节点为刚刚创建的 node对象。然后返回node 对象。
该方法主要就是初始化头节点和末端节点,并将新的节点追加到末端节点并更新末端节点。
我们会到 addWaiter 方法中,该方法主要作用就是根据当前线程创建一个 node 对象,并追加到队列的末端。
我们再回到 acquire 方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
addWaiter 方法会返回刚刚创建的node 对象,然后调用 acquireQueued 方法,我们进入该方法查看:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
该方法步骤如下:
- 死循环。先获取 node 对象 prev 节点,如果该节点和 head 相等,说明是他是第二个节点,那么此时就可以尝试获取锁了。
1.1 如果获取锁成功,就设置当前节点为 head 节点(同时设置当前node的线程为null,prev为null),并设置他的 prev 节点的 next 节点为 null(帮助GC回收)。最后,返回等待过程中是否中断的布尔值。 - 如果上面的两个条件不成立,则调用 shouldParkAfterFailedAcquire 方法和 parkAndCheckInterrupt 方法。这两个方法的目的就是将当前线程挂起。然后等待被唤醒或者被中断。稍后,我们仔细查看这两个方法。
- 如果挂起后被当前线程唤醒,则再度循环,判断是该节点的 prev 节点是否是 head,一般来讲,当你被唤醒,说明你别准许去拿锁了,也就是 head 节点完成了任务释放了锁。然后重复步骤 1。最后返回。
我们看看 shouldParkAfterFailedAcquire 方法:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
该方法步骤如下:
- 获取去上一个节点的等待状态,如果状态是 SIGNAL -1,就直接返回 true,表示可以挂起并休息。
- 如果 waitStatus 大于 0, 则循环检查 prev 节点的 prev 的waitStatus,知道遇到一个状态不大于0。该字段有4个状态,分别是 CANCELLED = 1,SIGNAL = -1, CONDITION = -2, PROPAGATE = -3,也就是说,如果大于 0,就是取消状态。那么,往上找到那个不大于0的节点后怎么办?将当前节点指向 那个节点的 next 节点,也就是说,那些大于0 状态的节点都失效这里,随时会被GC回收。
- 如果不大于0 也不是 -1,则将上一个节点的状态设置为有效, 也就是 -1.最后返回 false。注意,在acquireQueued 方法中,返回 false 后会继续循环,此时 pred 节点已经是 -1 了,因此最终会返回 true。
再看 parkAndCheckInterrupt 方法(挂起并检查是否中断):
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
该方法非常的简单,就是将当前线程挂起,等到有别的线程唤醒(通常是 head 节点中线程),然后返回当前线程是否是被中断了,注意,该方法会清除中断状态。
回到 acquireQueued 方法,总结一下该方法,该方法就是将刚刚创建的线程节点挂起,然后等待唤醒,如果被唤醒了,则将自己设置为 head 节点。最后,返回是否被中断。
再回到 acquire 方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
在该方法中,如果获取锁失败并被唤醒,且被中断了,那么就执行 selfInterrupt 方法:
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
将当前线程设置中断状态位。
好了,到这里,整个lock 方法,我们基本就分析完了,可以说,整个方法就是将线程放入到等待队列并挂起然后等待 head 节点唤醒。其中,tryAcquire 方法高频出现,该方法具体实现由子类实现,比如 重入锁,读写锁,线程池的 worker,其中 CountDown 和 Semaphore 实现的是共享模式的 tryAcquire 方法,但原理相同。AQS 如何定义的?就是返回 true 表示拿到锁了,返回 false 表示拿锁失败,具体如何实现AQS管不了。但他们都依赖一个极其重要的字段 ------- state。
楼主有必要说说这个字段,该字段定义了当前同步器的状态,如果大家知道 pv 原语的话,应该很好理解这个字段,该字段在 AQS 中是如何定义的:
/**
* The synchronization state.
*/
private volatile int state;
volatile。该字段可能会被多个线程修改,因此,需要设置为 volatile ,保证变量的可见性。
我们可以看看 重入锁中的公平锁是如何使用该字段的。
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
该方法重写了 tryAcquire 方法,步骤如下:
- 获取当前线程,获取锁(同步器)的状态。
- 如果同步器等于0,就 CAS 设置 state 为 1,表示同步器被占用了,并且设置同步器的持有线程为当前线程(为了判断重入)。最后返回拿锁成功 true。
- 如果不是0,并且当前线程就是同步器的持有线程,说明是重入。那么就将 state 加1,最后返回 true。所以说,当你重入一次,就需要解锁一次,否则下个线程永远拿不到锁。
- 如果都不是,返回 false ,表示拿锁失败。
从这里,我们可以看到, statei 字段非常的重要,判断锁是否被持有完全根据这个字段来的。这点一定要注意,而这个设计和操作系统的 pv 由异曲同工之妙。
那么看完了拿锁,再看看解锁,我们可以先猜想一下如何设计,首先肯定是要将 state 字段设置为 0,才能让下个线程拿锁,然后呢?唤醒等待队列中的下个线程。让他尝试拿锁。那到底 doug lea 是不是这么设计的呢?我们来看看。
3. 重入锁的 unlock 方法
该方法调用了AQS 的 release 方法:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先尝试释放,如果成功,则唤醒下一个线程。
我们先看看 tryRelease 方法 (需要重写):
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
该方法步骤如下:
- 计算同步器状态减去1后的值。
- 判断同步器线程和当前线程是否相同,如果不同,抛出监视器状态异常。
- 判断状态是否是 0,也就是说,如果是0,表示没有线程持有锁了,那么就是设置 free 为 true,并且设置同步器的 thread 属性为null,
- 最后设置 state 为 计算的值,这里需要考虑重入。最后返回。
可以看到,如果 state 不是 0 的话,就会返回 false ,后面的步骤就没有了,也就是说,重入锁解锁的时候不会唤醒下一个线程。
如果解锁成功,执行下面的步骤,如果 head 头节点不是 null 并且他的状态不是0,说明有线程可以唤醒,执行 unparkSuccessor 方法。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
该方法步骤如下:
- 获取到头节点的状态。
- 如果小于0,CAS 设置状态为0。
- 获取到头节点的next 节点,判断是否为null,或者 next 节点是否大于0,如果是null 或者大于0,则从末端节点开始向上查找,直到找到状态小于等于0 的节点。
- 最后唤醒该节点的线程。
这个时候,等待在 acquireQueued 方法中,准确的说是 parkAndCheckInterrupt 方法中的 线程被唤醒,开始继续循环,尝试拿锁(需要修改 state 变量),并设置自己为 head。
这里还有一个漏掉的地方,就是 waitStatus 变量,什么时候会大于等于0? 该变量默认是 0,大于 0 的状态是被取消的状态。什么时候会被取消呢? 在acquireQueued 方法中,如果方法没有正常结束,则会执行 finally 中的 cancelAcquire 方法,该方法会将状态变成 1,也就是取消状态。
4 总结
这次我们分析 AQS,也就是锁的的真正实现,只分析了 lock 方法和 unlock 方法,这两个方法是重入锁的基础。CountDown 和 Semaphore 是共享锁,但是基本原理相同,只是将 state 的数字加大便可以实现。而和重入锁等锁相关联的 Condition 则是通过 LockSupport 工具类直接挂起当前线程,并将当前线程添加到等待队列中,当调用 Condition 的 signal 方法时,则唤醒队列中的第一个线程。具体源码我们有机会再分析。
总之,java 重入锁的实现基于 AQS,而 AQS 主要基于 state 变量和队列来实现。实现原理和 pv原语 类似。
good luck!!!!!
网友评论