对于一般的锁,如ReentrantLock,其一般都是“独占式”的,也即在同一时刻只有一个线程能够访问锁定的代码。如果对于共享资源的访问“读”多于“写”,那么独占锁将没有“读写锁”有效。所谓“读写锁”指的是对共享资源的访问提供一个读锁和一个写锁,当访问方式是读取操作时,使用读锁即可,当访问方式是修改操作时,则使用写锁。读锁和写锁相互之间存在一定的制约条件:
- 当有读锁锁定资源时,其余的读锁可以共享式的访问资源,但是会阻塞写锁对资源的获取;
- 当有写锁锁定资源时,将阻塞其余所有的读锁(除了该线程本身)和写锁的获取;
在Java中,提供了一个通用的接口来表示读写锁,即ReadWriteLock,该类提供了两个方法:readLock()和writeLock()用于对读锁和写锁的获取,具体的声明如下:
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
除此之外,Java还提供了该接口的两个实现:ReadWriteLockView和ReentrantReadWriteLock。第一个类是StampedLock类的内部类,第二个则是我们正常可以使用的类,这里我们主要对ReentrantReadWriteLock进行讲解。
ReentrantReadWriteLock是一个复合词,其拆分开来为reentrant read write lock。reentrant是entrant加上re前缀,表示可重入的,read write lock则表示其是一个读写锁,整体理解也即是“可重入的读写锁”。可以看出,ReentrantReadWriteLock不仅实现了ReadWriteLock接口的规范,并且还提供了可重入的特性。如下是使用ReentrantReadWriteLock实现对缓存数据的访问的一个简单示例:
public class MapCache<T> {
private ReadWriteLock lock = new ReentrantReadWriteLock();
private Map<String, T> cache = new HashMap<>();
public void put(String key, T value) {
lock.writeLock().lock();
try {
cache.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
public T get(String key) {
lock.readLock().lock();
try {
T result = cache.get(key);
return result;
} finally {
lock.readLock().unlock();
}
}
}
可以看到,在对缓存数据进行读取时使用读锁,而进行修改时则使用写锁。下面为了展示读写锁的是如何工作的,我们编写了如下示例:
public class ReadWriteLockTest {
public static void main(String[] args) {
RWLCache cache = new RWLCache();
Random random = new Random();
for (int i = 0; i < 10; i++) {
boolean flag = random.nextBoolean();
Thread thread = new Thread(generateTask(flag, cache), generateThreadName(flag, i));
thread.start();
}
}
private static Runnable generateTask(boolean flag, RWLCache cache) {
Runnable putTask = () -> cache.write();
Runnable getTask = () -> cache.read();
return flag ? putTask : getTask;
}
private static String generateThreadName(boolean flag, int index) {
return flag ? "write-" + index : "read-" + index;
}
}
其中RWLCache的具体实现如下:
public class RWLCache {
private MyReentrantReadWriteLock lock = new MyReentrantReadWriteLock();
public void write() {
lock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + " acquires write lock, queued threads is " + getThreadNames(lock.getQueuedThreads()));
sleep(1);
} finally {
System.out.println(Thread.currentThread().getName() + " releases write lock");
lock.writeLock().unlock();
}
}
public void read() {
lock.readLock().lock();
try {
sleep(1);
System.out.println(Thread.currentThread().getName() + " acquires read lock, queued threads is " + getThreadNames(lock.getQueuedThreads()));
} finally {
System.out.println(Thread.currentThread().getName() + " releases read lock");
lock.readLock().unlock();
}
}
private static final class MyReentrantReadWriteLock extends ReentrantReadWriteLock {
@Override
protected Collection<Thread> getQueuedThreads() {
List<Thread> threads = new ArrayList<>(super.getQueuedThreads());
Collections.reverse(threads);
return threads;
}
}
private void sleep(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private String getThreadNames(Collection<Thread> threads) {
StringBuilder result = new StringBuilder("[");
Iterator<Thread> iterator = threads.iterator();
if (!threads.isEmpty()) {
Thread thread = iterator.next();
result.append(thread.getName());
}
while (iterator.hasNext()) {
Thread thread = iterator.next();
result.append(",").append(thread.getName());
}
return result.append("]").toString();
}
}
可以看到,在每次读写锁的获取和释放时,我们都打印了等待队列中的线程,如下是该程序的输出结果:
read-0 acquires read lock, queued threads is [write-2,read-3,write-4,write-5,read-6,read-7,write-8,read-9]
read-1 acquires read lock, queued threads is [write-2,read-3,write-4,write-5,read-6,read-7,write-8,read-9]
read-1 releases read lock
read-0 releases read lock
write-2 acquires write lock, queued threads is [read-3,write-4,write-5,read-6,read-7,write-8,read-9]
write-2 releases write lock
read-3 acquires read lock, queued threads is [write-4,write-5,read-6,read-7,write-8,read-9]
read-3 releases read lock
write-4 acquires write lock, queued threads is [write-5,read-6,read-7,write-8,read-9]
write-4 releases write lock
write-5 acquires write lock, queued threads is [read-6,read-7,write-8,read-9]
write-5 releases write lock
read-7 acquires read lock, queued threads is [write-8,read-9]
read-7 releases read lock
read-6 acquires read lock, queued threads is [write-8,read-9]
read-6 releases read lock
write-8 acquires write lock, queued threads is [read-9]
write-8 releases write lock
read-9 acquires read lock, queued threads is []
read-9 releases read lock
从输出结果看出,read-0和read-0两个线程在还未释放读锁的情况下同时获取到了读锁,而后续的写锁则被阻塞在队列中的;从写线程对锁的获取和释放可以看出,其在执行过程中是会阻塞其余的读锁和写锁的;另外,仔细观察read-6和read-7这两个线程,其在等待队列中的顺序是read-6在前,而read-7在后,但是在后续输出的时候是read-7在前,而read-6在后,这是因为在read-6获取到读锁之后,由于读锁是共享的,其会唤醒后续紧接着的尝试获取读锁的线程,因而read-6和read-7几乎是同时获取到读锁的,但是read-7首先获取到了cpu的执行权限,因而其先打印了其执行代码,而read-6则在获取到cpu的执行权限后输出。
从上面的示例代码可以看出,ReentrantReadWriteLock主要有四个方法:ReadLock.lock(),ReadLock.unlock(),WriteLock.lock()和WriteLock.unlock()。这里ReadLock.lock()和ReadLock.unlock()是尝试对读锁的获取和释放的,WriteLock.lock()和WriteLock.unlock()是尝试对写锁的获取和释放的。下面我们会对这四个方法分别进行介绍,首先我们先来看看ReentrantReadWriteLock的类结构图:
从图中可以看出,ReadLock和WriteLock都是ReentrantReadWriteLock的内部类,其主要是提供对写锁的锁的获取和释放相关的接口的。首先我们看看几个辅助的作用:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false;
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
FairSync和NonfairSync两个类用于表征当前的读写锁是公平的还是非公平的。非公平表示线程获取读锁和写锁的概率是随机的,也即谁先争取到执行权限就先执行锁定的代码;公平锁表示线程所有线程获取读锁和写锁的概率都是一样的,当线程尝试获取锁的权限时,其会被加入到一个等待队列中,每次都是队列头部的线程最开始执行锁定代码。一般的,公平锁虽不能保证每个线程都有相同的机会执行代码,但是其能够提高系统的吞吐率,但是其会提高造成死锁的概率;公平锁则能够保证每个线程都有均等的机会执行代码,其也能降低造成死锁的概率,但是其吞吐率没有非公平锁的高。
static final class HoldCounter {
int count = 0;
final long tid = getThreadId(Thread.currentThread());
}
HoldCounter是一个计数辅助类,其count字段记录了每个线程重入读锁或写锁的次数,tid字段则记录了当前是记录的哪个线程。
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
ThreadLocalHoldCounter继承自ThreadLocal,并且重写了initialValue()方法,也就是说每个线程使用该类时都有一个默认的HoldCounter计数类。这里ThreadLocalHoldCounter主要是为每个线程保存其计数器对象的。
ReentrantReadWriteLock内部内部也是基于AbstractQueuedSynchronizer实现的,这里Sync也就是该类的子类。AbstractQueuedSynchronizer内部是通过一个volatile类型的整型变量state来控制锁的获取的,这里volatile能够保证多线程环境下该变量对多个线程都可见,而整型int占32个字节,无论是32位还是64位处理器,其都能够保证每次处理的最小字节单元是32位,也就是说对该变量的更新操作都是原子的。AbstractQueuedSynchronizer主要提供了如下几个方法:
public final void acquire(int arg);
public final void acquireShared(int arg);
protected int tryAcquire(int arg);
protected int tryAcquireShared(int arg);
public final boolean release(int arg);
public final boolean releaseShared(int arg);
protected boolean tryRelease(int arg);
protected boolean tryReleaseShared(int arg);
可以看到,这些方法都是成对呈现的,AbstractQueuedSynchronizer使用了模板方法模式来实现对state属性的控制的。这里acquire(int)和acquireShared(int)方法是对外的接口,分别表示以非共享或者共享的方式获取锁的执行权限。tryAcquire(int)和tryAcquireShared(int)方法则是提供的钩子方法,子类按照需要实现这两个方法,这两个方法分别是acquire(int)和acquireShared(int)方法调用的,用于控制当前线程是否获取到了锁的执行权限,如果获取到了,则当前线程继续执行后续代码,如果没有则均通过CAS算法获取锁的权限。同理,release(int)和releaseShared(int)方法也是对外提供的接口,分别表示以非共享和共享的方式释放锁的权限。tryRelease(int)和tryReleaseShared(int)方法则也是钩子方法,由子类实现,其分别是由release(int)和releaseShared(int)方法调用,而其作用则分别为以非共享或者共享的方式释放当前锁的执行权限。首先我们看看ReentrantReadWriteLock内部Sync的tryAcquire(int)方法的具体实现:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); // 获取当前正在占用写锁的次数
if (c != 0) { // c不等于0表示当前肯定有读锁或者写锁被占用
if (w == 0 || current != getExclusiveOwnerThread()) // 当前没有写锁占用或者占用写锁的不是当前线程
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 占用写锁的次数超过最大限制次数
throw new Error("Maximum lock count exceeded");
setState(c + acquires); // 走到这一步说明当前线程是正在占用写锁的线程,那么成功获取锁的权限
return true;
}
// 这里说明c==0,也即没有线程占用锁,那么判断写锁是否应该被阻塞,并且判断能否通过CAS算法设置state属性值
// 这里writerShouldBlock()也即前面讲的FairSync和NoneFairSync中重写的方法
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current); // 设置当前独占锁的线程为当前线程
return true;
}
这里需要说明的是,由于Sync通过state属性控制两个锁,这就需要两种状态,因而Sync将state属性分为两部分:高16位和低16位。高16位的值指定了读锁的占用次数,低16位的值则指定了写锁的占用次数,并且这里还可以推断出,如果state不为0,而低16位等于0,那么说明高16位一定不为零,也就是说当前有线程正在占用读锁。为了对读写状态的获取方便,Sync中声明了几个变量和相关的方法:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
可以看出,sharedCount(int)方法用于获取高16位的数值,exclusiveCount(int)用于获取低16位的数值,并且这里MAX_COUNT表示读锁和写锁最多获取的次数。
我们再回过头来看tryAcquire(int)方法,该方法表示一个线程能否以独占的方式获取锁的执行权限,其首先判断c是否不为0,然后在该if条件中判断是否有线程占用读锁,或者是占用写锁的线程不是当前线程,如果成立则表示当前线程获取写锁失败,这也就是为什么获取了写锁的线程将会阻塞其余的读线程和写线程的原因;如果第一个条件不成立,则说明当前线程是已经占用写锁的线程,那么就判断已经占用的次数(也即重入的次数)加上此次占用的次数是否会超过最大占用次数,如果不成立,则表示当前线程可以成功(再次)获取写锁的权限,那么就设置state属性的值为新的值,并返回true。如果c为0,则说明当前没有线程占用读锁和写锁,那么就判断写锁是否应该被阻塞,或者能否通过CAS算法成功设置state为新的属性值。如果写锁不应该被阻塞,并且成功更新了state的值,那么表示当前线程获取写锁成功,则设置独占锁的拥有者为当前线程。这里需要说明的是,writerShouldBlock()方法是Sync的子类FairSync和NoneFairSync分别重写了的方法。由前面的代码可以看出,FairSync中该方法是判断当前阻塞队列中是否有等待的线程,如果有等待的线程,则返回true,也就表示当前线程应该被阻塞,因为根据公平锁的协议,线程获取锁的顺序是到达的先后顺序;在NoneFairSync中,writerShouldBlock()方法始终返回false,表示在非公平锁中,只要一个线程尝试竞争锁的执行权限,那么其就会不会被阻塞。这也就是公平锁和非公平锁的区别。
类似于tryAcquire(int)方法,tryAcquireShared(int)方法则表示当前线程能否以共享的方式获取锁的执行权限。如下是该方法的代码:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 判断是否有写锁存在,或者是有写锁存在的情况下是否占用写锁的是当前线程
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 判断读锁是否应该被阻塞,其次判断占用读锁的次数是否超过最大限制,
// 并且尝试更新CAS算法设置state属性的值为新的值
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current; // 记录第一个获取读锁的线程
firstReaderHoldCount = 1; // 记录第一个获取读锁的线程重入次数
} else if (firstReader == current) {
firstReaderHoldCount++; // 更新第一个获取读锁的线程的重入次数
} else {
HoldCounter rh = cachedHoldCounter; // 获取缓存的线程的计数器
// 如果缓存的计数器不为当前线程的计数器不为当前线程的计数器,则更新其为当前线程的计数器
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0) // 如果当前线程计数器计数为0,则为其设置一个计数器
readHolds.set(rh);
rh.count++; // 更新计数器的值,即当前线程的重入次数
}
return 1;
}
return fullTryAcquireShared(current);
}
从tryAcquireShared(int)方法可以看出,其首先判断是否有线程占用写锁,并且如果有线程占用写锁,则判断占用写锁的线程是否不为当前线程,如果都不成立,则说明有其他的线程占用写锁,那么当前线程就会被阻塞。这也就是读写锁中锁降级的原理,即一个锁获取读锁之后,在还没有释放锁的情况下获取写锁,然后释放读锁,这样该线程就只占用一个读锁了,即从写锁降级为读锁。这里的判断当前占用写锁的线程是否为其他线程就保证了当前线程如果占用了写锁,那么其可以同时尝试获取读锁。在接下来的if条件中,首先判断readerShouldBlock(),即当前读线程是否应该被阻塞,该方法也是FairSync和NoneFairSync中重写的方法,在FairSync中,其会判断阻塞队列中是否有阻塞的线程,如果有则当前线程也应该被阻塞,在NoneFairSync中,其会判断当前阻塞队列中最近的被阻塞的节点是否为尝试获取写锁的线程,如果是,则当前线程应该被阻塞。如果读锁不应该被阻塞,并且当前线程不会造成获取读锁次数超限,那么就会尝试通过CAS算法更新state的值为新的值。在成功通过CAS算法设置state属性值之后,说明当前线程成功获取到了读锁,那么其就会更新记录其重入次数的计数器。如果上述条件不成立,那么当前线程就会通过无线for循环尝试获取锁的权限,具体的实现在fullTryAcquireShared(Thread)中:
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) { // 判断是否有写锁正在被占用
if (getExclusiveOwnerThread() != current) // 判断占用写锁的是否为当前线程
return -1;
} else if (readerShouldBlock()) { // 判断读锁是否应该被阻塞
if (firstReader == current) {
} else { // 走到这一步说明没有写锁占用,或者是占用写锁的线程为当前线程,并且读锁不应该被阻塞,
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0) // 如果重入次数为0,说明其当前是没有占用读锁的状态
readHolds.remove();
}
}
if (rh.count == 0) // 当前线程没有占用读锁,因为处于readerShouldBlock()块中,因而应该被阻塞
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) { // 使用CAS算法更新state属性的值
if (sharedCount(c) == 0) {
firstReader = current; // 记录第一个读线程
firstReaderHoldCount = 1; // 记录第一个读线程的重入次数
} else if (firstReader == current) {
firstReaderHoldCount++; // 更新第一个读线程的重入次数
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++; // 更新当前线程的重入次数
cachedHoldCounter = rh; // 缓存当前读线程的计数器
}
return 1;
}
}
}
可以看出tryAcquireShared(int)和fullTryAcquireShared(Thread)方法的区别在于tryAcquireShared(int)方法只会对当前线程尝试一次是否能够获取到读锁的权限,其不一定能够保证当前线程获取到读锁,而fullTryAcquireShared(Thread)方法则在一个无限for循环中尝试获取读锁,其能够保证在没有写锁或者写锁不为当前锁的情况下当前线程一定能够获取到读锁。这里也可以看出,在不存在写锁的情况下,如果是多个线程尝试竞争读锁,那么每个线程都会进入无限for循环中,并且保证一定能够获取到读锁,此时CPU是一直在运转的,相较于AbstractQueuedSynchronizer.doAcquireShared(int)方法,如果当前线程没有竞争到读锁,那么其也会进入无限for循环,但是在循环一次之后其就会被操作系统“搁置”起来,从而释放资源。ReentrantReadWriteLock的这种竞争读锁的机制能够保证线程以最快的方式获取到读锁,而不必被“搁置”,从而造成不必要的线程环境切换。
上面讲解的tryAcquire(int)和tryAcquireShared(int)方法分别被获取写锁的线程和获取读锁的线程调用,分别表示能否成功以独占的方式和以共享的方式获取写锁和读锁的权限。接下来我们来看看tryRelease(int)和tryReleaseShared(int)方法,这两个方法分别表示能否成功以独占的方式和以共享的方式释放写锁和读锁的权限。如下是tryRelease(int)方法的源码:
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively()) // 判断当前线程是否为以独占方式占用写锁权限的线程
throw new IllegalMonitorStateException();
int nextc = getState() - releases; // 记录释放写锁后state属性的值
boolean free = exclusiveCount(nextc) == 0; // 判断释放写锁后是否还存在写锁
if (free)
setExclusiveOwnerThread(null); // 如果不存在写锁,则清楚独占锁的线程状态
setState(nextc); // 设置state属性的值
return free;
}
可以看到,在tryRelease(int)方法中,其首先判断当前线程是否为以独占方式占用(写)锁的线程,如果不是则其没有权限释放锁。接着主要是对state属性的值进行相应的更新。这里需要注意的是,在free变量的判断中,使用的是exclusiveCount(nextc)是否为0,其实可以理解,当前以独占方式占用写锁的线程,如果其没有同时占用读锁,那么其高位部分肯定都是0,因而只需要判断nextc是否为0即可,这里去除高位部分,只判断低位部分是否为0的原因就是当前线程可能既同时占用写锁又占用了读锁,因而不能简简单单的判断nextc是否为0。接下来我们看看tryReleaseShared(int)方法,以下是该方法的实现:
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) { // 如果当前线程为第一个更新state属性的线程,则维护相关的变量值
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) // 获取当前线程的计数器
rh = readHolds.get();
int count = rh.count;
if (count <= 1) { // 如果计数器的计数值小于等于1,说明释放后其会至少变为0,因而清除计数器相关信息
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count; // 更新计数器的值
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) // 以CAS算法更新state属性的值
return nextc == 0;
}
}
可以看到,tryReleaseShared(int)方法首先判断当前线程是否为将state属性从0更新为1的线程,如果是,则更新相关的属性值,否则对当前线程的计数器信息进行更新。更新完相关属性值之后,当前线程会进入一个无线for循环,并且以CAS算法更新state属性的值,这样可以保证一定能够以原子的形式将state更新成功。这里返回值为nextc是否为0,这是因为当前线程可能重入当前锁多次,也就是说如果当前线程还没有完全释放当前锁的时候,其是不会执行后续唤醒后续线程中的代码的。
网友评论