上一张分析了ReentrantLock,ReentrantReadWriteLock和ReentrantLock一样基于AbstractQueuedSynchronizer实现
ReentrantLock在同一时间只允许一个线程访问,而ReentrantReadWriteLock允许多个读线程同时访问,但有一定的排他条件,不允许写线程和读线程、写线程和写线程同时访问,ReentrantReadWriteLock的state不像ReentrantLock0表示无锁,>=1表示有锁。读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。
首先我们先看ReentrantReadWriteLock 的Sync的一些关键属性:
static final int SHARED_SHIFT = 16;
//共享锁(读锁)状态单位值65536
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
//共享锁线程最大个数65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
//排它锁(写锁)掩码 二进制 15个1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//用来记录最后一个获取读锁的线程获取读锁的可重入次数
private transient HoldCounter cachedHoldCounter;
//用来记录第一个获取到读锁的线程
private transient Thread firstReader;
//用来记录第一个获取到读锁的线程获取读锁的可重入次数
private transient int firstReadHoldCount;
//用来存放除去第一个获取读锁线程外的其他线程获取读锁的可重入次数
private transient ThreadLocalHoldCounter readHolds = new ThreadLocalHoldCounter();
/** 返回读锁线程数 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回写锁可重入个数 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
ReentrantReadWriteLock有readerLock和writerLock两把锁,都是内部类。
/** Inner class providing readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** Inner class providing writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
写的源码实现
写锁的加锁
public void lock() {sync.acquire(1); }
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
//r如果已经有了读锁或者写锁
if (c != 0) {
// 如果已经有线程获取了读锁,或者写锁的线程不是自己就返回false
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//说明有线程获取到了写锁,重入次数+1
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
//当前没有读锁或写锁,则获得写锁。非公平锁中writerShouldBlock写锁方法默认返回false,公平锁中根据hasQueuedPredecessors 来进行了判断,队列中有前置节点就放弃获取锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
tryLock() 方法与lockInterruptibly()
1.tryLock如果已经有线程拿到了读锁或者写锁,就直接返回false。不会阻塞
2.writeLock.lockInterruptibly();方法跟lock()很类似,当其它的线程 调用此线程的interrupt() 方法中断了当前线程,当前线程会抛出InterruptedException InterruptedException,源码如下:
写锁的释放
public void unlock() { sync.release(1);}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//如果写锁拥有者不是当前线程就抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
//如果state-1=0 则表示释放,否则只是减少了重入次数
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
读锁的获取与释放
读锁的获取
public void lock() { sync.acquireShared(1);}
public final void acquireShared(int arg) {
//tryAcquireShared代码一
if (tryAcquireShared(arg) < 0)
//代码二
doAcquireShared(arg);
}
代码一解析
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//如果写锁被占用则加锁失败返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//获取读锁个数
int r = sharedCount(c);
//判断是否需要阻塞,不需要则尝试加锁,否则进入fullTryAcquireShared。
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果读锁的个数是0,则当前线程为firstReader,firstReader数量设置为1
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
//如果当前线程为firstReader。则firstReaderHoldCount+1
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
//记录最后一个获取读锁的线程或记录其它线程读锁的可重入数
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
fullTryAcquireShared 死循环获取读锁。包含锁降级策略。
/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
// // 如果存在写锁
if (exclusiveCount(c) != 0) {
// // 不是当前线程,获取锁失败,反之,如果持有写锁的是当前线程,那么就会进入下面的逻辑
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
读锁的释放
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//如果第一个获取读锁线程就是当前线程
if (firstReader == current) {
//如果firstReaderHoldCount次数为1
if (firstReaderHoldCount == 1)
firstReader = null;
else
//否则可重入次数减去1
firstReaderHoldCount--;
} else {
//当前线程如果不是最后一个获取读锁的线程,就从Threadlocal获取
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
//可以重入的次数减去一
--rh.count;
}
//代码1
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
代代码1:通过自旋到自己的读计数-1, 直到cas更新成功
网友评论