1. 简介
ReentrantLock 都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
在没有读写锁之前,只能通过 Java 的等待通知机制,就是当写操作开始时,所有晚于写操作的读操作均会进入等待状态,只有写操作完成并进行通知之后,所有等待的读操作才能继续执行(写操作之间依靠 synchronized 关键进行同步),这样做的目的是使读操作能读取到正确的数据,不会出现脏读。改用读写锁实现上述功能,只需要在读操作时获取读锁,写操作时获取写锁。当写锁被获取到时,后续(非当前写操作线程)的读写操作都会被阻塞,写锁释放之后,所有操作继续执行。Java 并发包提供读写锁接口 ReadWriteLock 是基于 ReentrantReadWriteLock
实现的,其特性包括:
- 公平性选择:支持非公平和公平的获取锁方式,但非公平的吞吐量大于公平的。
- 重入锁:支持重入锁,即获取读锁之后还能再次获取读锁,获取写锁后还能再次获取读锁或写锁。
- 锁降级:允许获取写锁,获取读锁,释放写锁的次序,即写锁可以降级为读锁。
- 锁获取中断:读取锁和写入锁都支持获取锁期间被中断。
-
支持条件变量:写入锁提供了条件变量
Condition
的支持,这个和独占锁一致,但是读取锁却不允许获取条件变量,将得到一个 UnsupportedOperationException 异常。
2. 实现原理
ReadWriteLock 基于 ReentrantReadWriteLock 实现的,而 ReentrantReadWriteLock 是基于 AQS(AbstractQueuedSynchronizer)实现的。读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。以 ReentrantLock 中的自定义同步器的实现为例,同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,使得该状态得涉及成为读写锁实现得关键。
在一个整型变量中维护多个状态,需要使用 “按位切割方法”,在读写锁中,我们需要将一个整型变量表示为读和写两种状态,所以会将整型变量分割为两部分,高16位标识读状态,低16位为写状态。
通过位运算便可以确定读写状态。假设当前同步状态值为S,写状态等于 S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0由移16位)。当写状态增加1时,等于S+(1<<16),也就是S+0x00010000。
2.1 读锁的获取
读锁是一个支持重入的共享锁,它能够被多个线程同时持有,在没有其他写线程访问时,读锁总是会被成功获取。ReadLock 提供了 lock() 方法来获取读锁,最终会调用 Sync 类的 tryAcquireShared(int) 方法来获取读锁。
protected final int tryAcquireShared(int unused) {
// 获取当前线程对象
Thread current = Thread.currentThread();
// 获取同步状态
int c = getState();
// 如果存在写锁,且持有写锁的线程不是当前对象,返回-1,表示获取读锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取读锁状态
int r = sharedCount(c);
// 如果读锁不需要等待且读锁获取次数没超过限制,同时同步状态修改成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// 读锁第一次被线程获取
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 读锁被第一次获取读锁的线程重复获取
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 获取读锁失败,调用fullTryAcquireShared(Thread)方法,放到循环里重试
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
// 这里对应锁降级,若当前线程已持有写锁,则允许当前线程继续获取读锁,否则直接返回
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// 如果读线程需要阻塞
// 当前线程是第一次获取读锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
// 当前线程不是第一次获取读锁的线程
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
// 如果读锁达到了最大值,抛出异常
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 同步状态修改成功
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 下面的处理与tryAcquireShared(int)类似
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
fullTryAcquireShared(Thread) 会判断是否要锁降级,然后根据 “是否需要阻塞等待”,“读锁状态是否超过限制” 等进行处理。如果不需要阻塞等待,并且读锁没有超过限制,则通过 CAS 尝试获取锁,并返回1。
2.2 读锁的释放
与写锁一样,读锁的释放也通过 AQS 的模板方法完成,最终会调用在Sync重写的 tryReleaseShared(int)方法。
rotected final boolean tryReleaseShared(int unused) {
// 获取当前线程对象
Thread current = Thread.currentThread();
// 如果想要释放锁的线程为第一个获取锁的线程
if (firstReader == current) {
// 当前线程仅获取了一次读锁,则需要将firstReader设置null,否则firstReaderHoldCount - 1
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 通过CAS操作更新同步状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
2.3 HoldCounter
读锁的获取与释放过程中都用到了 HoldCounter
,它保存了线程持有共享锁(读锁)的数量、重入的数量,主要起着计数器的作用,对读锁的获取与释放操作会更新对应的计数值。若线程获取读锁,则该计数器+1,释放读锁,该计数器-1。只有当线程获取读锁后才能对读锁进行释放、重入操作。
2.4 写锁的获取
写锁是一个支持重进入的排他锁。如果当前线程已经获取了写锁,再次获取锁则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。源码如下:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// 取同步状态state的低16位,写同步状态
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 存在读锁或当前线程不是已获取写锁的线程,返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 判断同一线程获取写锁是否超过最大次数,支持可重入
if (w + exclusiveCount(acquires) > MAX_COUNT) //
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
// 此时c=0,读锁和写锁都没有被获取
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 设置占用排它锁的线程是当前线程
setExclusiveOwnerThread(current);
return true;
}
从源代码可以看出,获取写锁的步骤如下:
- 判断同步状态 state 是否为0。如果 state!=0,则表示当前已经有其他线程获取了读锁或写锁,执行步骤2;否则执行步骤5。
- 判断同步状态 state 的低16位(w)是否为0。如果 w==0,说明其他线程获取了读锁,返回 false;如果 w!=0,说明其他线程获取了写锁,执行步骤3。
- 判断获取了写锁的线程是否是当前线程,若不是返回 false,否则执行步骤4。
- 判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程已获取写锁,更新为独占锁),返回 true。
- 此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),如果不需要阻塞,则 CAS 更新同步状态,若 CAS 成功则返回 true,否则返回 false。如果需要阻塞则返回 false。
2.5 写锁的释放
WriteLock 类提供了 unlock() 方法释放写锁,获取写锁的调用过程就不再具体介绍了,写锁的释放最终会调用Sync类的 tryRelease(int) 方法:
protected final boolean tryRelease(int releases) {
// 当前线程不是写锁的持有线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 计算新的同步状态
int nextc = getState() - releases;
// 判断写锁是否完全释放,若是,则将写锁持有线程设置为null
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
// 更新状态
setState(nextc);
return free;
}
写锁的释放与ReentrantLock的释放过程基本类似,每次释放均是减少写状态,当写状态为0时表示写锁已经完全释放了,从而等待的读写线程能够继续访问读写锁,获取同步状态,同时此次写线程的修改对后续的写线程可见。
2.6 重入锁
可重入锁,就是说一个线程在获取某个锁后,还可以继续获取该锁,即允许一个线程多次获取同一个锁。一个线程获取多少次锁,就必须释放多少次锁。
2.7 锁降级
ReentrantReadWriteLock 支持锁降级,当一个线程获取了写锁之后,也可以获取读锁。但不支持锁升级,线程从读锁升级成写锁是不允许的。
3. 使用示例
public class ReadWriteLockDemo {
private static Map<String, Object> map = new HashMap<>();
private static ExecutorService executorService = Executors.newFixedThreadPool(20);
private static ReadWriteLock lock = new ReentrantReadWriteLock();
public static void main(String[] args) throws InterruptedException {
for (int i = 1; i <= 20; i++) {
executorService.execute(new MyThread());
Thread.sleep(1000);
}
}
static class MyThread implements Runnable {
@Override
public void run() {
int random = new Random().nextInt();
try {
lock.writeLock().lock();
map.put("name" + random, "boo" + random);
lock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + ":writeLock");
lock.readLock().lock();
lock.readLock().lock();
map.get("name" + random);
} finally {
System.out.println(Thread.currentThread().getName() + ":readLock");
lock.writeLock().unlock();
lock.writeLock().unlock();
System.out.println(Thread.currentThread().getName() + ":writeUnlock");
lock.readLock().unlock();
lock.readLock().unlock();
System.out.println(Thread.currentThread().getName() + ":readUnlock");
}
}
}
}
网友评论