为什么要有读写锁
ReentrantReadWriteLock 适用于读多写少的场景,标识同一时间,可以有多个线程并发读,但是不可以多个线程并发写。由于互斥锁,如:ReentrantLock,上锁后,无论你是读操作还是写操作,它们之间都是互斥的来保证线程安全,但是这样做的话,效率比较低。
使用示例
public class ReentrantReadWriteLockTest {
public static void main(String[] args) throws InterruptedException {
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
ReentrantReadWriteLock.ReadLock readLock = reentrantReadWriteLock.readLock();
ReentrantReadWriteLock.WriteLock writeLock = reentrantReadWriteLock.writeLock();
Thread r1 = new Thread(() -> {
// 上读锁
readLock.lock();
try {
System.out.println("r1 开始读啦!!");
} finally {
// 释放读锁
readLock.unlock();
}
});
Thread r2 = new Thread(() -> {
// 上读锁
readLock.lock();
try {
System.out.println("r2 开始读啦!!");
} finally {
readLock.unlock();
}
});
Thread w1 = new Thread(() -> {
// 上读锁
writeLock.lock();
try {
System.out.println("w1 开始写啦!!");
} finally {
// 释放写锁
writeLock.unlock();
}
});
r1.start();
r2.start();
w1.start();
}
}
核心思想
- 基于AQS实现,用state来标识是否获取到锁,获取不到锁,线程交由AQS同步队列进行管理
- 用32位的state来同时表示读锁和写锁,低16位表示写锁,高16位表示读锁
- 公平与非公平
- 写写互斥,读写互斥,读读共享
- 锁降级
源码分析
构造方法
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** Performs all synchronization mechanics */
final Sync sync;
/**
* Creates a new {@code ReentrantReadWriteLock} with
* default (nonfair) ordering properties.
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
}
从构造方法上看,默认是使用非公平的方式获取锁
获取写锁(互斥锁)
AQS.aquire(int arg) 方法
/**
* 获取互斥锁的模版方法
* 流程:调用子类实现的tryAcquire方法尝试获取锁,若获取锁成功,则终止,若获取不成功,则调用AQS的addWaiter方法,新增节点到AQS的双端同步队列里,
* 然后调用acquireQueued方法再次进行判断获取锁资源,若还是获取不到锁,则将线程阻塞
* @param arg
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
ReentrantReadWriteLock. tryAcquire(int arg) 方法
/**
* 获取写锁(互斥锁)
* @param acquires
* @return true:获取锁成功;false:获取锁失败
*/
protected final boolean tryAcquire(int acquires) {
// 获取当前线程对象
Thread current = Thread.currentThread();
// 获取当前锁状态(AQS 的 state)
int c = getState();
// 获取写锁(互斥锁)数量
int w = exclusiveCount(c);
if (c != 0) { // 当前有锁状态(读锁或写锁,或两者都有)
// (Note: if c != 0 and w == 0 then shared count != 0)
/**
* 重要:
* w==0 : 意味着目前没有线程持有写锁。则:(c != 0 && w == 0) 意味着当前有线程持有读锁,但是没有任何线程持有写锁
* current != getExclusiveOwnerThread(): 意味着当前线程没有持有写锁。
* 所以:return false 有两种情况:1、当前有线程持有读锁(理论:不能够进行锁升级,读写互斥) 2、目前有其他线程持有写锁(理论:写写互斥)
* 这两种情况,满足其一,都会返回false --> 当前线程进去AQS队列进行等待调度
*/
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 判断持写锁数量是否超过最大值,超过则报错
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
/**
* 走到这里意味着:(w != 0 && current == getExclusiveOwnerThread() && (w + exclusiveCount(acquires) < MAX_COUNT))= true
* 即:当前线程本身就持有了写锁,并且写锁数量小于最大值。这里相当于写锁的重入(锁重入)
*/
setState(c + acquires);
return true;
}
/**
* 走到这里意味着:c==0 即 当前属于无锁状态。
* writerShouldBlock():判断是否需要阻塞当前线程,分公平和非公平两种情况。
* 公平:看在我前面是否有线程正在阻塞(AQS队列中,是否有节点正在等待唤醒),有:则writerShouldBlock() == true。若无:则为 false,表示当前线程不需要阻塞。
* 总结:公平的获取写锁的机制为:查看前面是否有线程持有锁(无论是读锁还是写锁)或有线程正在等待锁,若有,直接进入队列等待
* 非公平:writerShouldBlock()永远返回的是false,即不需要看前面是否有节点正在阻塞等待唤醒,直接上来就cas抢锁
* 总结:非公平的获取写锁的机制为:
*
*/
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires)) // CAS抢锁
return false;
// 走到这里意味着:线程抢锁(写锁)成功,将exclusiveOwnerThread属性设置为自己,标识我已经获得了写锁,你们其他线程进队列等待去吧
setExclusiveOwnerThread(current);
return true;
}
ReentrantReadWriteLock. addWaiter() 方法
/**
* 向队列添加新的节点,从尾巴插入
* @param mode 新增的节点对象
* @return 返回新的尾节点
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* 向队列添加新的节点,从尾巴插入
* @param node 新增的节点对象
* @return 返回新的尾节点
*/
private Node enq(final Node node) {
for (;;) { // 自旋,向队列尾部新增节点,直到成功为止
Node t = tail;
if (t == null) { // Must initialize
/**
* 若当前的尾节点为空,说明现在还没有任何节点插入到队列内,此时我们对头节点和尾节点进行初始化new Node()。
* 因此得出一个结论:头节点在后续的逻辑中,永远不可能为空
*/
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // 将当前节点设置为新的尾节点
t.next = node;
return t;
}
}
}
}
我们可以看到,在调用enq()方法前,程序会先进行cas做节点插入逻辑,但是enq()方法也有同样的插入逻辑,这样做会不会感觉逻辑重复了,是不是多余了?
实际上这是一个优化,大部分情况下是不用走到enq方法里去自旋插入节点的,为了提高性能,避免进入for循环,这里将插入操作提前了。
AQS.acquireQueued(final Node node, int arg) 方法
/**
* 获取锁资源,获取不到则对线程进行阻塞
* @param node 已经在队列里的节点
* @param arg
* @return
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 是否被中断标识
boolean interrupted = false;
for (;;) { // 自旋
// 获取前驱节点
final Node p = node.predecessor();
/**
* 若前驱节点为头节点,那么当前线程此时立刻尝试获取锁
*
* 问:为什么要在这里获取锁?
* 答:因为前驱节点如果是头节点的话,会存在两种情况:
* 1. 该头结点是初始化的虚节点,那么当前线程对应的node节点就应该是队列里的第一个"线程节点",那么此时应该直接尝试获取锁,并且将头结点更新为当前节点
* 2. 该头节点不是虚节点,是正常的"线程节点",那么此时很有可能头结点释放了锁,那么此时去尝试获取锁就有比较大的概率获取到,并且将头结点更新为当前节点
*/
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 抢锁成功,return false
return interrupted;
}
// 判断线程是否可以阻塞,可以的话则进行阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果上面的代码,出现异常,那么这里进行兜底处理,会判断failed标识符,若为true,则调用cancel方法将节点失效掉
if (failed)
cancelAcquire(node);
}
}
/**
* 判断当前线程是否可以"安心"的阻塞
* @param pred
* @param node
* @return
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取当前线程的前驱节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 若前驱节点的状态为SIGNAL(-1),则直接返回true,表示可以安心的进行阻塞了
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
/**
* 若前驱节点的状态>0,即为CANCELLED(1)状态,则遍历往前寻找到一个CANCELLED状态节点为止,并将寻找到的节点与本节点进行关联。
* 关联完后,返回false,由上游程序自旋重新调用本方法进行判断
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
/**
* 若前驱节点的状态不为SIGNAL(-1),CANCELLED(1)。状态可能为:CONDITION(-2),PROPAGATE(-3)或 初始化(0)状态,
* 则将前驱节点的状态变成SIGNAL后,由上游程序自选重新调用本方法进行判断
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
* 阻塞当前线程
* @return 被中断,则返回true,否则 返回false
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
AQS.cancelAcquire方法
/**
* 取消节点。
* 流程:将当前节点Thread置为空-->获取到当前节点往前的第一个有效节点(非CANCELLED状态)-->将当前节点的状态置为CANCELLED(失效)-->将当前节点从队列内清除出去-->唤醒后继节点
* 虽然在这个方法里,会将当前节点从队列内清除出去,但是在并发情况下,其他线程有可能获取到当前节点的状态为CANCEELED
* @param node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
// 日常判空
if (node == null)
return;
node.thread = null;
// 获取前驱节点
Node pred = node.prev;
// 从后往前遍历获取到不为CANCELLED状态的前驱节点为止
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获取前驱节点的下一个节点
Node predNext = pred.next;
// 将当前线程节点的状态变为CANCELLED状态
node.waitStatus = Node.CANCELLED;
/**
* 若当前线程节点为队列内的尾节点,则将上面找到的有效的前驱节点作为新的尾节点(CAS操作,将tail变量赋值为pred),成功后tail==pred
*/
if (node == tail && compareAndSetTail(node, pred)) {
/**
* 重点:
* 由于是双向练表,所以这里需要将新的尾节点(pred)的next节点cas为null。此时的状态为:pred->null pred<-node,当前节点的前驱指针还是指向pred的。
* 由于此时next-null,所以整条链路的next是不完整的,所以后续遍历查找节点应该从后往前利用pred指针找
*/
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
/**
* 进到这里意味着:当前的节点不是尾节点 或 上面cas更新尾节点失败(并发引起)
*/
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
// 将上面找到的前驱节点与当前线程的后继节点相连(CAS next指针)
compareAndSetNext(pred, predNext, next);
} else {
// 唤醒后继节点
unparkSuccessor(node);
}
// 当前节点的后继节点指向自己,方便GC
node.next = node; // help GC
}
}
释放写锁(互斥锁)
AQS.release(int arg) 和unparkSuccessor(Node node)方法
// 释放互斥锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
// 唤醒后继节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 当前释放锁的节点的状态
int ws = node.waitStatus;
if (ws < 0)
// 若当前节点状态不是CANCELLED状态(即:是有效节点),则直接cas,将当前节点状态修改为0(初始化状态)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
/**
* 若当前节点的后继节点为空 或者 后继节点的状态为CANCELLED状态,则从后往前找到第一个状态不为CANCELLED的节点
* 问:为什么要从后往前找?
* 答:这是由于在取消节点的cancelAcquire()方法里,我们取消节点时,会有短暂的时刻导致next指针不完整,但是pre指针对整条链路来说是完整的,所以需要从后往前找。详细看cancelAcquire方法
*/
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒上面找到的当前线程的后继有效节点
LockSupport.unpark(s.thread);
}
获取读锁(共享锁)
AQS.acquireShared(int arg) 方法
/**
* 获取共享锁,若获取不成功,则将线程放到队列里面去等待。
* 流程:调用子类的tryAcquireShared方法尝试获取锁,若获取锁不成功,则调用doAcquireShared方法,入队等待
* @param arg
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
ReentrantReadWriteLock.tryAcquireShared(int arg) 方法
/**
* 获取读锁(共享锁)
* @param unused
* @return
*/
protected final int tryAcquireShared(int unused) {
// 获取当前线程对象
Thread current = Thread.currentThread();
// 获取当前锁状态
int c = getState();
/**
* exclusiveCount(c) != 0 意味着 当前有线程持有写锁(互斥锁)
* getExclusiveOwnerThread() != current 意味着 当前持有写锁(互斥锁)的线程不是本线程
*/
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current) // 这个判断意味着,同个线程,获取完写锁后,也可以直接获取读锁,即同个线程同时可以持有写锁和读锁(支持锁降级)
// 走到这里表示:当前有其他线程持有写锁,那么本线程获取读锁失败,直接返回-1,交由AQS将本线程放入队列内等待唤醒调度
return -1;
// 获取读锁数量
int r = sharedCount(c);
/**
* readerShouldBlock() 判断是否需要阻塞当前线程,分公平和非公平两种情况。
* 公平: 看在我前面是否有线程正在阻塞(AQS队列中,是否有节点正在等待唤醒),有:readerShouldBlock() == true。若无:则为 false,表示当前线程不需要阻塞。
* 非公平:看我前面是否有线程正在等待着获取写锁(即互斥节点),若有,则readerShouldBlock() == true。若无:则为 false,表示不需要阻塞,可以直接进行cas抢锁
* 问:非公平方式获取读锁,为什么需要判断队列里面是否有互斥节点(等待获取写锁的线程)?
* 答:防止写线程饥饿。如果获取读锁不需要判断队列内是否有线程在等待获取写锁的话,那么如果有大量的线程在争抢读锁的情况的话,
* 写线程将会很久拿不到写锁,将会长久阻塞,不利于写线程的执行。这就是线程饥饿,所以才需要在获取读线程先判断是否有线程正在等待获取写线程。
* 若有,则读线程也需要进入队列排队等待,若没有,则读线程直接上去CAS抢锁
*/
if (!readerShouldBlock() &&
r < MAX_COUNT && // 判断读锁数量是否小于最大值
compareAndSetState(c, c + SHARED_UNIT)) {
// 走到这里意味着:目前队列中没有等待的线程(公平) 或 没有等待获取写锁的线程(非公平),并且读锁数量在最大值范围内,且cas获取读锁成功
if (r == 0) { // 读锁数量为0,即:我是第一个获取到读锁的线程
/**
* 将当前线程标识为第一个获取读锁的线程。思考:为什么要这样?有什么好处?
* 答:提升性能,因为维护ThreadLocal成本比较高
*/
firstReader = current;
// 第一个获取读锁的线程对应的锁重入次数
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 进到这里来意味着:程序允许到现在,只有一个线程在一直持有锁,就是当前线程,所以将读锁重入次数++
firstReaderHoldCount++;
} else {
/**
* 进到这里意味着:已经有其他线程持有读锁了,那么我需要在ThreadLocal里维护自己的读锁重入次数
* cachedHoldCounter:为HoldCounter类型对象,始终保存最近(最后)获取锁的线程及其锁重入次数
*/
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
/**
* 进到这里意味着:
* 情况1:cachedHoldCounter == null ,那么此时需要进行更新,更新最后获取锁的线程引用
* 情况2:当前标识的最后获取锁的线程对象不是自己本身,那么也需要对其进行更新
*/
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
/**
* 进到这里意味着:当前标识的最后获取锁的线程是自己本身,并且锁数量被释放完了(rh.count == 0)
* 问:为什么需要readHolds.set(rh)?
* 答:怎么样才能让程序进入到这里,就是最后一个线程在获取锁后,又将锁给释放掉,然后又重新获取锁。因为释放锁的同时,
* 会将count--,当减到0时,会将当前线程在ThreadLocal里的引用给remove掉,所以才需要这里的set操作。
* new Thread(()->{
* readLock.lock();
* readLock.unlock();
* readLock.lock();
* }).start();
*/
readHolds.set(rh);
rh.count++;
}
// 返回抢锁成功
return 1;
}
/**
* 走到这里意味着:
* 1、readerShouldBlock() == true 即:判断到当前线程需要进行阻塞。
* 公平:表明前面有线程等待获取锁(无论是读锁还是写锁)。
* 非公平:表明前面有写线程正在等待获取写锁(队列内有互斥节点)
* 2、r < MAX_COUNT 读锁数量超过最大限制
* 3、CAS抢锁失败
*/
return fullTryAcquireShared(current);
}
/**
* 一个完整的获取读锁(共享锁)的方法。当调用tryAcquireShared()获取锁没成功,就会调用此方法来获取读锁。
* 这里解释一下:tryAcquireShared()方法上面讲了获取锁的流程,其实是作者为了性能做的一个优化,一般情况下是不用走到fullTryAcquireShared()这个方法的,
* 除非在tryAcquireShared()方法中获取锁不成功就会走到这个方法来死循环重试拿锁。
*/
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
// 自旋抢锁
for (;;) {
// 获取当前锁状态
int c = getState();
// 判断当前有没有线程持有写锁(互斥锁),0表示没有,>0表示有
if (exclusiveCount(c) != 0) {
// 判断当前持有写锁的线程是不是当前线程本身,若不是,则return -1 ,交由AQS管理,让本线程进入队列等待
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) { // 判断当前读线程是否需要被阻塞。
// Make sure we're not acquiring read lock reentrantly
/**
* 进到这里意味着:
* 1、公平:判断到队列内有线程正在等待获取锁
* 2、非公平:判断到队列内有写线程在等待获取锁
*/
if (firstReader == current) {
// 若当前线程是已经获取过读锁的第一个线程,那么啥这里啥也不做,交由最下面的代码进行CAS操作
// assert firstReaderHoldCount > 0;
} else {
// 判断rh==null,为什么需要判断是否为空,是因为有可能同一个线程,在下面CAS抢锁失败后,重新循环抢锁,这时,rh就可能不为空
if (rh == null) {
// cachedHoldCounter:为HoldCounter类型对象,始终保存最近(最后)获取锁的线程及其锁重入次数。也是一种优化手段,避免维护ThreadLocal。
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
/**
* 走到这里意味着:线程在下面CAS失败后,重新循环从头开始走逻辑,但是此时,当前线程的锁已经被释放了。
* 此时直接返回-1,让当前线程进入队列等待
*/
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
/**
* 走到这里意味着:
* 情况1:当前持有互斥锁的线程是当前线程本身
* 情况2:当前线程判断到不需要阻塞(公平:前面没有任何线程在等待获取锁,那么直接CAS获取锁。非公平:前面没有任何线程在等待获取锁 或 前面没有写线程在队列里等待)
* 情况3:当前线程是firstReader线程
*/
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
// 返回抢锁成功
return 1;
}
}
}
AQS.doAcquireShared(int arg) 方法
/**
* 获取共享锁,不成功则将线程进行阻塞
* @param arg
*/
private void doAcquireShared(int arg) {
// 向队列新增一个共享节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {// 自旋
// 获取前驱节点
final Node p = node.predecessor();
if (p == head) {
// 若前驱节点为头节点,则尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
// 若获取锁成功,则设置新的头节点,并结束程序
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 若获取锁失败,或前驱节点不是头节点,阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
// 如果上面的代码,出现异常,那么这里进行兜底处理,会判断failed标识符,若为true,则调用cancel方法将节点失效掉
cancelAcquire(node);
}
}
释放共享锁
AQS.releaseShared(int arg)
/**
* 流程:调用子类tryReleaseShared方法尝试释放锁,释放锁成功后,则调用doReleaseShared方法,唤醒下一个节点进行抢锁
* @param arg
* @return
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
ReentrantReadWriteLock.tryReleaseShared(int unused)
/**
* 释放共享锁(读锁)
* @param unused
* @return
*/
protected final boolean tryReleaseShared(int unused) {
// 获取当前线程对象
Thread current = Thread.currentThread();
// 判断当前线程是否是firstReader线程
if (firstReader == current) {
if (firstReaderHoldCount == 1)
// 若本线程的读锁重入次数此时判断已经是为1了,那么本次释放就会将这个线程的读锁全部释放完,那么也需要释放本线程,所以将firstReader引用指向空
firstReader = null;
else
// 读锁重入次数--
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
/**
* 进到这里意味着:
* 情况1:cachedHoldCounter == null ,注意,这里跟获取共享锁的逻辑不同,这里是释放锁,所以不用更新cachedHoldCounter引用
* 情况2:当前线程不是cachedHoldCounter线程(不是最后获取锁的线程),那么当前线程的持有锁的重入次数情况需从readHolds(ThreadLocal)里面获取
*/
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
// 锁重入次数<=1,那么本次释放后,需要将本个线程也释放掉,那么readHolds里面也不需要再维护本线程的持锁重入情况了,所以需要remove掉
readHolds.remove();
if (count <= 0) // 释放锁太多次,导致益处,则抛异常
throw unmatchedUnlockException();
}
--rh.count;
}
// 自旋 CAS释放锁,即CAS减少state值,直到CAS成功为止
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
// 释放锁成功,判断到锁重入次数为
return nextc == 0;
}
}
AQS.doReleaseShared()方法
/**
* 释放共享锁,唤醒后续等待的线程节点
*/
private void doReleaseShared() {
for (;;) {
// 获取当前的头节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 若节点状态为SIGNAL(-1),则cas将状态置为初始化状态(0)
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 上面cas成功,则唤醒下一个节点
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
锁降级
锁降级:表示同一个线程,获取了写锁后,在释放写锁前,又获取了读锁,我们称这个过程叫做锁降级。通过对tryAcquireShared源码的分析,ReentrantReadWriteLock是允许锁降级的。
小试牛刀
- 问题1:ReentrantReadWriteLock是如何解决线程饥饿的?
- 问题2:firstReader和firstReaderHoldCount的作用是什么?
- 问题3:所有读线程维护自身ThreadLocalHoldCounter,将所有的ThreadLocalHoldCounter里的count相加,是否与state的高16位的值一致?
- 问题4:什么是锁降级?
- 问题5:AQS的同步队列为什么是双端队列?
这里只对问题5进行一个回答,其他的问题通过上面的源码分析,都可以找到答案。
AQS的同步队列为什么是双端队列?
主要原因是:使用双端队列,可以提高性能,避免节点多线程操作冲突。一般情况下,会从头将节点摘除,然后从尾添加节点,这样可以减少节点的并发操作冲突。这个思想在ForkJoinPool里也是可以看到的。
网友评论