在java.util.concurrent
包中,大部分的同步器都是基于AbstractQueuedSynchronizer(AQS)
这个框架实现的。这个框架为同步状态提供原子性管理、线程的阻塞和解除阻塞以及排队提供了一种通用机制。
同步器一般包含2种方法,一种是acquire
,另一种是release
。acquire
操作阻塞线程,获取锁。release
通过某种方式改变让被acquire
阻塞的线程继续执行,释放锁。为了实现这2种操作,需要以下3个基本组件的相互协作:
- 同步状态的原子性管理
- 线程的阻塞和解除阻塞
- 队列管理
同步状态
/**
* The synchronization state.
*/
private volatile int state;
AQS
使用一个int
变量来保存同步状态,并暴露出getState
、setState
以及compareAndSet
来读取或更新这个状态。并且用了volatile
来修饰,保证了在多线程环境下的可见性。通过使用compare-and-swap(CAS)
指令来实现compareAndSetState
。
这里的同步状态用int
而非long
,主要是因为64位long
字段的原子性操作在很多平台上是使用内部锁的方式来模拟实现的,这会使得同步器的会有性能问题。绝对多数int
型的state
足够我们使用,但JDK
也提供了long
型state
的实现:java.util.concurrent.locks.AbstractQueuedLongSynchronizer
。
阻塞
JDK1.5
之前,阻塞线程和解除线程阻塞都是基于Java
自身的监控器。在AQS
中实现阻塞是用java.util.concurrent
包的LockSuport类。方法LockSupport.park
阻塞当前线程,直到有个LockSupport.unpark
方法被调用。
队列管理
AQS
框架关键就在于如何管理被阻塞的线程队列。提供了2个队列,分别是线程安全Sync Queue(CLH Queue)
、普通的Condition Queue
。
Sync Queue
Sync Queue
是基于FIFO
的队列,用于构建锁或者其他相关同步装置。CLH
锁可以更容易地去实现取消(cancellation)
和超时
功能,因此我们选择了CLH
锁作为实现的基础。
队列中的元素Node
是保存线程的引用和线程状态。Node
是AQS
的一个静态内部类:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
Node
类的成员变量如上所示,主要负责保存线程引用、队列的前继和后继节点,以及同步状态:
成员 | 描述 |
---|---|
waitStatus | 用来标记Node的状态: CANCELLED:1, 表示当前线程已经被取消 SIGNAL:-1,表示当前节点的后继节点等待运行 CONDITION:-2, 表示当前节点已被加入Condition Queue PROPAGATE:-3, 共享锁的最终状态是PROPAGATE |
thread | 当前获取lock的线程 |
SHARED | 表示节点是共享模式 |
EXCLUSIVE | 表示节点是独占模式 |
prev | 前继节点 |
next | 后继节点 |
nextWaiter | 存储Condition Queue中的后继节点 |
Node
元素是Sync Queue
构建的基础。当获取锁的时候,请求形成节点挂载在尾部。而锁资源的释放再获取的过程是从开始向后进行的。
acquire 获取锁
在AQS
自身仅定义了类似acquire
方法。在实现锁的时候,一般会实现一个继承AQS
的内部类Sync
。而在Sync
类中,我们根据需求来实现重写tryAcquire
方法和tryRelease
方法。独占锁acquire
方法如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- 通过
tryAcquire(由不同的实现类实现)
尝试获取锁,如果可以获取锁直接返回。获取不到锁,则调用addWaiter
方法;
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
-
addWaiter
方法作用是把当前线程封装成Node
节点,通过CAS
操作快速尝试挂载至队列尾部。- 如果
tail
节点t
已经有了:将t
节点更新为当前节点node
的前继节点node.prev
,将t.next
更新为当前节点node
; - 如果
tail
节点添加失败:- 如果
tail
节点为空,那么原子化的分配一个头节点,并将尾节点指向头节点,这一步是初始化; - 如果
tail
节点不为空,循环重复addWaiter
方法的工作直至当前节点入队为止。
- 如果
- 如果
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 节点加入
Sync Queue
之后,接下来就是要进行锁的获取,或者说是访问控制了,只有一个线程能够在同一时刻继续的运行,而其他的进入等待状态。- 获取当前节点的前继节点
- 当前继节点是头结点并且能够获取状态,代表该当前节点占有锁;如果满足上述条件,那么代表能够占有锁,根据节点对锁占有的含义,设置头结点为当前节点。
- 否则进入等待状态。
至此,可以总结一次acquire
的过程大致为:
release 释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- 首先通过
CAS
操作变更同步状态state
。 - 释放成功后,通过
LockSupport.unpark
方法来唤醒后继节点,后继节点继续获取锁。
Condition Queue
AQS
框架提供了一个ConditionObject
内部类,给维护独占同步的类以及实现Lock
接口的类使用。一个锁对象可以关联任意数目的条件对象,可以提供典型的Java
监视器风格的await
、signal
和signalAll
操作,包括带有超时的,以及一些检测、监控的方法。Condition Queue
是普通的队列并不要求是线程安全,原因是在线程在操作Condition
时,要求线程必须独占锁,不需要考虑并发的问题。
Condition Queue
也是以Node
为基础的队列。
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
await操作
-
Condition
在执行await
操作时,首先会调用addConditionWaiter()
方法将当前线程封装的Node
节点加入到wait queue
。
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
上述addConditionWaiter
的逻辑是:
- 首先清除
Condition Queue
队列中cancelled
状态的尾节点; - 如
Condition Queue
队列为空,封装当前线程的node
节点为Condition Queue
的firstWaiter
。如Condition Queue
队列不为空,则把该节点加至队列尾部。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
- 加入
Condition queue
之后,要释放当前线程获取的所有的锁; - 如果线程没有在
Sync Queue
中,将调用LockSupport.park
阻塞当前线程,直到signalled
或者interrupted
唤醒去获取锁。
single 操作
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
- 首先检查线程是否独占锁;
- 获取
Codition Queue
的firstWaiter
,将节点转移至Sync Queue
中去。
singleAll 操作
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
signalAll
唤醒Condition Queue
的所有等待线程,将所有的Condition Queue
中的node
元素转移至Sync Queue
中去。
其他API
这里只介绍了独占锁模式下,普通acquire
、release
方法的原理,AQS
还提供了很多可以供我们选择的API
:
- 如优先考虑中断、超时的:
acquireInterruptibly
、tryAcquireNanos
; - 如共享锁模式下
acquireShared
、releaseShared
;
等等...
这里暂时不详细分析,后面有时间的话可以再做了解。
参考:
网友评论