主要成员
读写锁也是区分了公平和非公平,除此之外还区分了读锁和写锁
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
static final class NonfairSync extends Sync {
...
}
static final class FairSync extends Sync {
...
}
public static class ReadLock implements Lock, java.io.Serializable {
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
...
}
public static class WriteLock implements Lock, java.io.Serializable {
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
...
}
}
构造器
同样默认也是非公平的
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
AQS中的state
读写锁把AQS中的一个state字段当成了两个字段来使用
高16位记录当前共享锁数量,低16位记录当前排他锁数量
private volatile int state;
读写锁中的Sync相对于重入锁多了以下一些方法和常量字段
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 获取共享锁的数量
static int sharedCount(int c) {
return c >>> SHARED_SHIFT;
}
// 获取排他锁的数量
static int exclusiveCount(int c) {
return c & EXCLUSIVE_MASK;
}
...
}
lock
这里先提前剧透下,读写锁中的写锁几乎和ReentrantLock没有区别,感兴趣的可以直接看WriteLock中的实现,以及公平和非公平的区别也和ReentrantLock中没区别,感兴趣的可以直接看我上一篇ReentrantLock源码解析
所以这里直接看读锁的实现
public void lock() {
// 是直接委托AQS去实现的
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
// 当前有写并发,并且不是自己,走排队流程
return -1;
// 获取当前共享锁数量
int r = sharedCount(c);
// 刚开始的时候readerShouldBlock肯定是false,因为AQS队列还没初始化
// 返回true的话就是队列中第一个节点是写请求
if (!readerShouldBlock() &&
r < MAX_COUNT &&
// CAS对共享锁数量+1
compareAndSetState(c, c + SHARED_UNIT)) {
// 修改成功才会进入
if (r == 0) {
// 设置一些成员表明,自己是第一个读锁请求线程
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 这里是为了可重入做判断
firstReaderHoldCount++;
} else {
// 如果是后来的其他读锁请求
// 会通过cachedHoldCounter把读锁数量缓存起来,方便之后unlock的时候计算,当前的共享锁数量
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
// return 1就是获取锁成功了
return 1;
}
// 修改共享锁数量失败
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
// 这里也是用死循环来确保共享锁修改成功
for (;;) {
// 前半段和上面一个方法是一样的
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 判断下最大count,由于读写锁分离了高低16位,所以读并发或者读的可重入数量是不能超过65535
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 修改共享锁数量成功
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
排队流程
private void doAcquireShared(int arg) {
// 往AQS队列末尾添加一个共享节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 如果自己是队列的第一个
// 尝试去获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 修改成功,将会连锁唤醒下一个共享节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 和ReentrantLock一样
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 和ReentrantLock一样
if (failed)
cancelAcquire(node);
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
// 这里就是Propagate(传播)的含义了
// 会唤醒下一个共享节点,并且这个节点也会尝试唤醒下一个共享节点,以此类推
doReleaseShared();
}
}
unlock
读锁在解锁的时候略有不同
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// 当由第一个线程解锁的时候,会把这些成员重置
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
// 把缓存读锁数量逐步减1
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// 设置共享锁数量成功后,要判断是不是清0了,清0成功才会走唤醒流程
return nextc == 0;
}
}
我觉得和ReentrantLock没什么区别,最后都是调用LockSupport的unpark去唤醒队列中的第一个节点
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 这里要把head节点的状态设置为0,设置成功了才会去唤醒
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
关于读写锁的疑问
- 记录读锁相关的一些成员,其实这些成员以我拙见是没什么用的,jdk注释也说了,意思是相当于提供一个数据提供的入口,但是读锁的数量包括是否最终release还是以state的高16位来判断的
private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
总结
ReentrantReadWriteLock相较于之前的ReentrantLock来说,最大的特点就是读操作的几乎是可以同时进行的(除了修改共享锁数量时的CAS操作,因为CAS操作只可能有一个线程成功)。
- 第一个获取锁的线程是读操作,在他释放锁之前,中间时间的所有读操作都是可以同时进行的,写操作是需要在AQS中排队的。
- 第一个获取锁的线程是写操作,那么后续所有的线程都需要进AQS队列排队,但当这个写操作释放锁之后,如果又被读操作获取到了锁,那么在这个读操作之后排队的其他连续(以AQS链表顺序为准)读操作均会被唤醒进而可以读并发。
网友评论