AQS
by shihang.mai
AQS是实现ReentrantReadWriteLock的核心
ReentrantReadWriteLock
ReentrantReadWriteLock锁支持读读并发,但读写、写写都是互斥的
在我们研究ReentrantLock时,就知道AQS是用voliate int state去表示锁状态,int是4字节,32位的.在ReentrantReadWriteLock中需要用这个state表示读和写锁,就用高16位表示读锁,低16位表示写锁。因为无论读锁还是写锁都用了2字节表示,所以可重入次数为2^16-1
先看如下计算
0000000000000000 0000000000000000
----------------------------------------------
写入读锁,即在原来的基础上+65535
0000000000000001 0000000000000000
读取读锁数量,直接将1右边移16位,1>>>16
0000000000000000 0000000000000001
----------------------------------------------
写入写锁,即在原来基础上+1
0000000000000000 0000000000000001
读取写锁数量,原来的值与(高16位都为0,低16位都为1的值)做&运算
原值:0000000000000000 0000000000000001
&
辅值:0000000000000000 1111111111111111
结果:0000000000000000 0000000000000001
先来看一下构造方法
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
默认是非公平锁,并且new出了ReadLock和WriteLock,所以我们使用时如下就能获取到读锁和写锁
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
写锁lock
调用writeLock.lock()时
public static class WriteLock implements Lock, java.io.Serializable {
public void lock() {
sync.acquire(1);
}
}
继续调用,走到了AQS,这个很熟悉,在ReentranLock中也看到过
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里先执行tryAcquire()==false,那么才会继续走acquireQueued(),tryAcquire是ReentrantReadWriteLock 内部类Sync 中实现方法
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
}
- 先获取当前线程
- 获取volatile int state的值,然后函数exclusiveCount()根据state值获取到写锁的数量
-
注意此时调用的是写锁的lock
,当有锁,即进入条件if (c != 0)
- 当w==0,即有线程获取了读锁,那么直接返回false,无论当前线程.说明了
ReentrantReadWriteLock不支持读升级写冲入
- 当w!=0,即有线程获取了写锁,当前线程不是持有锁的线程,那么返回false,即写写互斥
- 当然加写锁前需要判断是否超过65535重入,如果第3的条件都不成立,即获取锁成功,设置state+1
- 当state==0,那么writerShouldBlock()==false(非公平锁),再CAS设置state+1,并设置占有锁线程为当前线程,然后true
当tryAcquire()返回false时,就将执行acquireQueued()这里不多说,在ReentranLock中已详细说明,下面为概括图
ypVnUI.png读锁lock
调用readLock.lock()时
public static class ReadLock implements Lock, java.io.Serializable {
public void lock() {
sync.acquireShared(1);
}
}
继续调用,走到了AQS的acquireShared(),意为获取共享锁
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
这里先调用tryAcquireShared(),如果返回值<0,那么执行doAcquireShared()
先来看看tryAcquireShared(),tryAcquire是ReentrantReadWriteLock 内部类Sync 中实现方法
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
}
- 获取当前线程
- 获取state值
- 如果有线程持有写锁,并且当前读线程不是持有写锁的线程,那么返回-1(读写互斥,并支持写降级读重入)
- 获取读锁的数量
- readerShouldBlock()
- 如果队列为空或者前面没有写锁在排队时,则返回false
- 如果头结点不为空,下一个结点也不为空并且是写锁时,返回true,表示要规规矩矩的排队
- 当readerShouldBlock()返回false时,继续判断读锁数量是否少于最大重入数65535,再CAS设置读锁+1,设置成功后
- 当还没设置读锁+1时,读锁为0,那么久不熬时当前线程就是第一个上读锁的线程
- 当当前线程==持有读锁线程,那么就是锁重入
- 当持有读锁线程!=当前读线程,这里涉及一个不同线程重入,用HoldCounter类,这个类只有线程id和重入次数两个字段,当线程重入的时候就会初始化这个类并保存在ThreadLocalHoldCounter类中,这个类就是继承ThreadLocl的,用来初始化HoldCounter对象并保存
- 读锁如果成功获取,那么返回1
- 当readerShouldBlock()返回true时,直接调用fullTryAcquireShared()
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
这个方法中代码和tryAcquireShared基本上一致,只是采用了自旋的方式,处理初次加锁中的漏网之鱼
如果tryAcquireShared返回-1,即加锁失败,那么走AQS中的doAcquireShared()
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 将当前线程封装成share Node放入CLH队列
- for无条件循环,判断当前节点的前继节点是否是头节点,如果是再一次尝试获取锁,获取锁成功,那么继续唤醒后面的所有share Node,到最后一个share Node节点(后面并没其他节点),将waitState=-3
- 前继节点不是头节点或者获取锁失败,那么就将前继节点waitState=-1,并挂起当前线程
读锁.unLock()
public static class ReadLock implements Lock, java.io.Serializable {
public void unlock() {
sync.releaseShared(1);
}
}
调到AQS
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
没什么好说的,就一直将读锁释放,全部释放后,唤醒写锁线程
写锁.unLock()
public static class WriteLock implements Lock, java.io.Serializable {
public void unlock() {
sync.release(1);
}
}
调到AQS
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
}
也没什么好说的,解锁将state变为0,重新唤醒CLH队列中的Node
网友评论