在J2SE 1.5的java.util.concurrent包(下称j.u.c包)中,大部分的同步器(例如锁,屏障等等)都是基于AbstractQueuedSynchronizer(下称AQS类)这个简单的框架来构建的;
那么J.U.C包下的同步器主要有以下几个功能:
- 内部同步状态的管理
- 同步状态的更新和检查操作
- 且至少有一个方法会导致调用线程在同步状态被获取时阻塞,以及在
其他线程改变这个同步状态时解除线程的阻塞
而AQS就实现了以上功能,供其他同步器使用。
所有同步器都有两个基本方法,acquire,release。acquire操作阻塞调用的线程,直到或除非同步状态允许其继续执行。而release操作则是通过某种方式改变同步状态,使得一或多个被acquire阻塞的线程继续执行。(不用同步器命名不同Lock.lock,Semaphore.acquire,CountDownLatch.await和FutureTask.get...)
之前提过Synchronized内置锁,JVM对其进行了许多优化,其性能已经比ReentrentLock更好,但是常规的JVM锁优化策略并不适用于严重依赖于J.U.C包的典型多线程服务端应用。
大部分情况下,特别在同步器有竞争的情况下,稳定地保证其效率才是J.U.C包的主要目标。
同步器的acquire与release
acquire
while(同步状态不允许acquire){
放入队列 if 没有进队;
依具体需求来决定是否阻塞当前线程;
}
出队 if 已入对;
release
更新同步状态;
if(状态允许被足阻塞线程acquire){
解除一个或多个队列里的阻塞线程;
}
要实现上述功能需要三个基本组建的相互协作:
- 同步状态的原子性管理
- 线程的阻塞与解除阻塞
- 队列的管理
同步状态
AQS用单个32位int值来保存同步状态
阻塞
LockSupport.park阻塞LockSupport.unpark解阻塞
LockSupport.park()
LockSupport.park(Object)
LockSupport.parkNanos(Object, long)
LockSupport.parkNanos(long)
LockSupport.parkUntil(Object, long)
LockSupport.parkUntil(long)
LockSupport.unpark(Thread)
队列
在AQS中采用CHL列表来解决有序的队列的问题。
一个新的节点node,通过原子操作入队:
do{
pred = tail;
} while(!tail.compareAndSet(pred, node);
每一个节点的释放状态都保存在前驱节点中:
while (pred.status != RELEASED);
自旋后的出队操作:
head = node;
优点在于其入队和出队操作是快速、无锁的,以及无障碍的(即使在竞争下,某个线程总会赢得一次插入机会而能继续执行);且探测是否有线程正在等待也很快(只要测试一下head是否与tail相等);同时,“释放”状态是分散的,避免了一些不必要的内存竞争。
AQS
public abstract class AbstractQueuedSynchronizer extends
AbstractOwnableSynchronizer implements java.io.Serializable {
//等待队列的头节点
private transient volatile Node head;
//等待队列的尾节点
private transient volatile Node tail;
//同步状态
private volatile int state;
......
static final class Node {
//表示共享模式,如CountDownLatch
static final Node SHARED = new Node();
//表示独占模式,如ReentrentLoack
static final Node EXCLUSIVE = null;
//节点操作因为超时或者对应的线程被interrupt。节点不应该留在此状态,
//一旦达到此状态将从CHL队列中踢出。
static final int CANCELLED = 1;
//等待触发,即节点为SIGNAL ,后继节点会挂起
static final int SIGNAL = -1;
//表明节点对应的线程因为不满足一个条件(Condition)而被阻塞。
static final int CONDITION = -2;
//状态需要向后传播
static final int PROPAGATE = -3;
// 初始状态为0,新生的非CONDITION节点都是此状态。
volatile int waitStatus;
//此节点的前一个节点。节点的waitStatus依赖于前一个节点的状态。
volatile Node prev;
//此节点的后一个节点。后一个节点是否被唤醒(uppark())依赖于当前节点是否被释放。
volatile Node next;
//节点绑定的线程
volatile Thread thread;
//标记当前节点的模式是共享还是独占
Node nextWaiter;
...
}
}
AQS.Node
实现原理
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//在节点加入队列过程中若线程被中断,则会调用该方法,底层调用interrupt()
selfInterrupt();
}
需要子类重写tryAcquire与tryRelease方法利用CAS来修改同步状态status;
多线程下各个线程都会尝试修改状态,如果可以修改则tryAcquire返回true,acquire直接返回;若不能修改,放进队列;
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter方法建一个当前线程的Node放入队列,值得注意的是利用了CAS来完成插入操作;
接下来总的逻辑是:放入队列后,会检查前一个节点的状态,前一个节点状态为SIGNAL则挂起当前线程通过LockSupport.park(this);
来看看acquireQueued的实现
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
for循环里检验前一个节点的状态,为啥要用无限循环呢?为防止其被意外唤醒。
当前一个节点为head时说明该轮到它了,再次尝试tryAcquire;
没轮到它的时候,会执行shouldParkAfterFailedAcquire,该方法只有在节点状态为SIGNAL返回true,CANCELLED则删除节点,其它情况就用CAS将状态改为SIGNAL;
来看看shouldParkAfterFailedAcquire方法实现:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire返回true,则会调用parkAndCheckInterrupt将线程挂起,被唤醒后根据线程中断标记来返回boolean值;
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted(); //会清除标记位
}
看代码,假设一种情况,A之前一直挂起,现在轮到A了,也就是前一个节点是head,head已经释放了同步状态,与是唤醒A,A在acquireQueued方法for循环中苏醒尝试tryAcquire,但是这时一个新的线程先一步执行acquire方法,先于A 执行tryAcquire,A又得挂起;由此可看出这是非公平的,有线程插队
Release
子类需要实现tryRelease;
重置同步状态位0,唤醒后继节点;在这里也有被插队的可能,因为同步状态已经归零。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//这里h.waitStatus == 0的唯一可能是head没有后继节点
//waitStatus默认为0,后继节点加入队列后将前一个节点waitStatus设为SIGNAL
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//唤醒后继节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
总结
举例说明变化情况:现在有一个线程N正在执行,阻塞队列为空,A线程tryAcquire失败,addWaiter创建了一个空的Node作为Head,它的next指向A线程的Node,随后进入acquireQueued,再次尝试tryAcquire因为线程可能已经执行完了,失败调用shouldParkAfterFailedAcquire,此时将头Head的同步状态由0变为SIGNAL返回false,回到acquireQueued里再次循环,还是尝试再次tryAcquire,失败调用shouldParkAfterFailedAcquire,由于Head的waitStatus为SIGNAL返回true,进入parkAndCheckInterrupt,将A线程阻塞;若又一线程B也失败,它将会将A 的waitStatus变为SIGNAL,排在A的后面阻塞着;
执行的线程完成了,release释放同步状态,唤醒阻塞队列里的线程;接着上面的逻辑,首先N线程tryRelease成功(存在被插队的可能),取出head节点执行unparkSuccessor,将head节点waitStatus重置为0,取出head.next也就是A,LockSupport.unpark唤醒A线程,逻辑回到了A阻塞的地方也就是acquireQueued的for循环里,再次尝试tryAcquire(在这里可能被插队),成功,将A设为head,将原先为空的head的next指针清除以便GC回收;
以ReentrantLock为例:
网友评论