AQS的功能可以分为两类:独占与共享;如ReentrantLock利用了其独占功能,CountDownLatch,Semaphore利用了其共享功能。
AQS的静态内部类Node里有两个变量,独占锁与共享锁在创建自己的节点时(addWaiter方法)用于表明身份,它们会被赋值给Node的nextWaiter变量。
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
独占锁
独占锁就是每次只允许一个线程执行,当前线程执行完会release将同步状态归零,再唤醒后继节点,这里通过自定义tryAcquire来实现公平与非公平(即是否允许插队);
acquire & release
//成功代表同步状态的变更,排斥其他线程;否则加入等待队列
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//归零同步状态,唤醒后继节点
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
这里通过同步状态来实现独占功能。
共享锁
如Semaphore,CountDownLatch,它们调用的是AQS里的acquireSharedInterruptibly与releaseShared;实现自己的tryAcquireShared与tryReleaseShared,这里便体现了独占与共享的不同,独占锁的tryAcquire,tryRelease返回boolean代表同步状态更改的成功与否;tryAcquireShared返回int值,tryAcquireShared返回0代表当前线程能够执行,但之后的将会进入等待队列中;返回正数直接执行,之后的线程可能也可以直接执行;
以Semphore的非公平锁为例,如果当前正在执行的线程数小于限制值,就CAS更改同步状态值,线程直接执行。返回负数代表正在执行的线程数达到允许值。
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
Semphore限制同时执行的线程数,当一个线程acquire成功
acquireShared
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//这里对中断的处理也不同,共享式的线程被唤醒后发现中断标记后,会直接抛中断异常
//而独占锁采用的中断策略是恢复中断标记,也就是交给调用方去处理中断
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
releaseShared
tryReleaseShared由子类实现,返回boolean,以Semphore为例
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
当一个线程执行完会增加我们的同步状态值,返回true,之后doReleaseShared唤醒head.next节点里的线程。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
doReleaseShared在之前介绍Semaphore里说过;若有后继节点就唤醒它;若没有(即ws == 0,因为waitStatus的值是后继节点赋予的)则将head节点waitStatus改为PROPAGATE,之后进入的节点会将此head的状态改为SIGNAL,具体实现在shouldParkAfterFailedAcquire里。
总结
独占与共享最大不同就在各自的tryacquire里,对于独占来说只有true或false,只有一个线程得以执行任务;而对于共享锁的tryAcquireShared来说,线程数没达到限制都可以直接执行。
但本质上都是对AQS同步状态的修改,一个是0与1之间,另一个允许更多而已。
网友评论