ReentrantReadWriteLock简介
ReentrantReadWriteLock
是读写锁。一般的排他性锁在一个线程执行某个方法获得到锁之后(暂未释放锁),其他线程再想执行同一方法便会阻塞,即使该方法执行的是读操作。而在很多场景下,读多写少,这时如果继续使用排他锁,性能便会大受影响。读写锁的出现正是为了解决此类问题,读写锁内部有读锁和写锁,读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的。读锁和写锁分离从而读写分离提升性能,其主要应用于读多写少的场景。下面来看看ReentrantReadWriteLock
的使用示例
public class ReadWriteLockSample {
private Map<String, Object> map = new HashMap<String, Object>();
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private Lock readLock = readWriteLock.readLock();
private Lock writeLock = readWriteLock.writeLock();
public Object get(String key){
readLock.lock();
try {
try {
Thread.sleep(new Random().nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return map.get(key);
}finally {
readLock.unlock();
System.out.println(Thread.currentThread().getName()+"读操作执行完毕");
}
}
public void put(String key, Object value){
writeLock.lock();
try {
try {
Thread.sleep(new Random().nextInt(3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key,value);
}finally {
writeLock.unlock();
System.out.println(Thread.currentThread().getName()+"写操作执行完毕");
}
}
public static void main(String[] args) {
final ReadWriteLockSample rwl = new ReadWriteLockSample();
rwl.put("key1", "value1");
new Thread(new Runnable() {
@Override
public void run() {
rwl.put("key1", "value1");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(rwl.get("key1"));
}
}).start();
}
}
读写锁允许同时执行多个读操作,但同一时间只能有一个线程在执行写操作。
ReentrantReadWriteLock实现原理
ReentrantReadWriteLock
内部维护一把读锁和写锁,读锁和写锁都是通过定义在其内部的AQS
子类Sync
来实现。和ReentrantLock
一样,读锁和写锁也支持公平锁和非公平锁,默认创建的都是非公平锁。
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
/** 读锁,ReadLock是自定义AQS的子类 */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 写锁,WriteLock是自定义AQS的子类*/
private final ReentrantReadWriteLock.WriteLock writerLock;
/** 自定义AQS的子类 */
final Sync sync;
......
}
读写状态
我们知道在AQS
内部有个volatile
修饰的int变量state来表示同步状态,在读写锁内部也使用了一个int类型的变量state来标识多个读锁和一个写锁的状态。如何用一个int变量来标识呢?ReentrantReadWriteLock
中将32位的int值分成两段,高16位用来标识读,低16位用来标识写。示意图如下
低16位的写状态之所以会大于1,是因为写锁是可重入的,它表示的是写锁重入的次数,而不是当前写锁的个数。
基于这种设计,读锁和写锁的状态获取和设置如下
- 读锁状态的获取:state>>16
- 读锁状态的写入:state+(1<<16),默认加1
- 写锁状态的获取:state & 0x0000FFFF
- 写锁状态的写入:state+1
借助上面的公式,我们再理解ReentrantReadWriteLock
中自定义AQS
维护的几个变量
abstract static class Sync extends AbstractQueuedSynchronizer {
// 按16位来分state变量
static final int SHARED_SHIFT = 16;
// 读锁增加的步长
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 读锁允许的最大数量、写锁可重入的最大次数 65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 写锁的掩码,用于获取state的低16位有效值,该值等于Ox0000FFFF
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
......
}
写锁的获取
在获取写锁的时候,要先判断读锁是否已经被获取以及该线程是否是已经获取写锁的线程,如果读锁已经被获取,或者当前线程不是已经获取写锁的线程,则当前线程进入等待状态。
写锁的获取会调用ReentrantReadWriteLock.WriteLock
类的lock
方法。
public void lock() {
sync.acquire(1);
}
acquire
是AQS
的模板方法,其内部会调用由子类重写的tryAcquire
方法,而该方法在Sync
内部类中重写了,代码如下。
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
// 获取写锁的重入次数
int w = exclusiveCount(c);
// c!=0 表示已经存在线程持有读锁或者是写锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 存在读锁 || 当前线程不是持有写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
// false 表示获取同步资源失败
return false;
//写锁重入的次数超过所允许的最大值(65535)
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 写锁能够成功重入,更新state值
setState(c + acquires);
return true;
}
/*
* writerShouldBlock表示获取写锁的线程是否要阻塞住,在子类NonfairSync直接返回false
* 在子类FairSync中,如果当前线程还有前驱节点则返回true,即阻塞。
*/
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
写锁的释放
释放也会调用AQS
的模板方法release(int arg)
,该模板方法会回调自定义AQS
子类Sync
重写的tryRelease
方法,代码实现如下
protected final boolean tryRelease(int releases) {
// 如果当前调用释放写锁的线程不是持有该写锁的线程,则抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
// 只有当写状态变为0时,才代表释放写锁成功,从而唤醒后续等待线程
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
// 设置state
setState(nextc);
return free;
}
读锁的获取
读锁的释放和获取会用到一个内部类HoldCounter
,定义在Sync
的内部。它在读锁使用中起到重要的作用,表示某个线程持有读锁的数量,它需要和线程绑定。
static final class HoldCounter {
int count = 0;
// 线程的ID
final long tid = Thread.currentThread().getId();
}
HoldCounter
只定义了一个计数器和线程ID,而通过线程ID是无法保证对象和线程之间的绑定关系的。它通过ThreadLocal
实现和线程之间的绑定,代码如下。
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
读锁的获取会调用ReentrantReadWriteLock.ReadLock
类的lock
方法。lock
方法又会调用ReentrantReadWriteLock.Sync
的acquireShared
方法,而acquireShared
是定义在AQS
中的模板方法,该方法会回调子类重写的tryAcquireShared
方法。tryAcquireShared
在内部类ReentrantReadWriteLock.Sync
中进行了重写,实现代码如下
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 有写锁,且获取写锁的线程不是当前线程
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 读锁的数量
int r = sharedCount(c);
//如果读不应该阻塞并且读锁的个数小于最大值65535,并且可以成功更新状态值,则成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果还没有线程持有读锁
if (r == 0) {
// 将当前线程置为firstReader,表示第一个获取读锁的线程
firstReader = current;
// firstReaderHoldCount表示第一个获取读锁的线程重入的次数
firstReaderHoldCount = 1;
// firstReader == current 表示第一个拥有读锁的线程重入
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
//当前读线程和第一个读线程不同,记录每一个线程获取读锁的次数
ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
// 从ThreadLocal中获取HoldCounter对象并赋值给rh
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 将rh设置到ThreadLocal中
readHolds.set(rh);
rh.count++;
}
return 1;
}
//否则,循环尝试
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
ReentrantReadWriteLock.Sync.HoldCounter rh = null;
for (;;) {
int c = getState();
//一旦有别的线程获得了写锁,返回-1,失败
// 这里对应锁降级,若当前线程已持有写锁,则允许当前线程继续获取读锁
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
//如果读线程需要阻塞
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else { //说明有别的读线程占有了锁
if (rh == null) {
// cachedHoldCounter为HoldCounter对象,开始时为空
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId()) {
// 从ThreadLocal中获取HoldCounter对象并赋值给rh
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
//读锁的数量超出最大范围
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//如果成功更改状态,成功返回
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 第一次获取读锁
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
读锁的释放
与写锁一样,读锁的释放也通过AQS
的模板方法完成,最终会调用在Sync
重写的tryReleaseShared
方法。
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {//获取rh对象,并更新当前线程获取读锁的数量
ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
//CAS更新同步状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
锁降级
锁降级指的是写锁降级为读锁。其过程为:在当前线程持有写锁的时候,再获取到读锁,然后将之前持有的写锁释放。
public class ProcessData {
private static final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private static final Lock readLock = rwl.readLock();
private static final Lock writeLock = rwl.writeLock();
private volatile boolean update = false;
public void processData() {
readLock.lock();
if (!update) {
// 必须先释放读锁
readLock.unlock();
// 锁降级从写锁获取到开始
writeLock.lock();
try {
if (!update) {
// 准备数据的流程(略)
update = true;
}
readLock.lock();
} finally {
writeLock.unlock();
}
// 锁降级完成,写锁降级为读锁
}
try {
// 使用数据的流程(略)
} finally {
readLock.unlock();
}
}
}
网友评论