1 简介
重入锁ReentrantLock是一个排他锁,所谓排他锁即在同一时刻仅有一个线程可以进行访问,但是在大多数场景下,大部分时间都是提供读服务,而写服务占有的时间较少。然而读服务不存在数据竞争问题,如果一个线程在读时禁止其他线程读势必会导致性能降低。针对这种读多写少的情况,java还提供了另外一个实现----ReentrantReadWriteLock
锁的机制
ReentrantReadWriteLock名称为"读写锁",所谓读写锁即多线程操作中读-读 " 不互斥," 读-写 " 互斥," 写-写 " 互斥。
主要特性
- 公平性:支持公平性和非公平性。所谓公平表示在获取锁时逻辑是否要考虑当前正在排队等待线程。按照大白话来说就时公平表示不能插入强占资源。
- 可重入:获取锁的线程可以在次获取锁。需要注意是如果一个线程多次获取锁,但只释放一次锁。那么此锁还被当前线程占用。
@Test
public void testReenter(){
ReentrantLock reentrantLock = new ReentrantLock();
reentrantLock.lock();
reentrantLock.lock();
reentrantLock.unlock();
//如果不执行当前代码,锁依旧没有被当前线程释放
reentrantLock.unlock();
}
}
- 锁降级:遵循获取写锁、获取读锁在释放写锁的次序,写锁能够降级成为读锁
2 如何实现一个读写锁
想要实现一个读写锁首先要使用两把锁来解决问题,一个读锁(readLock),一个写锁(writeLock)。同时需要设计两个变量来分别保存,读锁被多个线程重入次数总和。以及写锁的重入次数作为判断能否获取锁的依据。
如何获取读锁
按照上面读写锁的机制,想要获取读锁需要进行如下判断:
1 当前资源并没有被写锁占用。
2 读写锁可重入且支持锁降级,如果当前资源并没有被写锁占用,但占用锁的线程是当前线程则依然可以获取读锁。
如何获取写锁
按照上面读写锁的机制,想要获取写锁需要进行如下判断:
1 当前资源没有被读锁占用,
2 如果条件1成立还需要判断当前资源没有被写锁占用。
同步队列
当线程获取锁失败需要将线程信息保留在一个同步队列,方便锁释放的时候通知同步队列中等待的线程获取锁。
3 ReentrantReadWriteLock源码解析
ReentrantLock 使用AQS实现锁机制来实现读写锁。AQS是AbstractQueuedSynchronizer的缩写,翻译过来就是"同步器",,它实现了Java函数中锁同步(synchronized),锁等待(wait,notify)功能。AbstractQueuedSynchronizer是一个抽象类,我们可以编写自己类继承AQS选择重写独占式或共享式模板方法,从而定义如何获取同步状态和释放同步状态的逻辑。
3.1 AQS 实现原理
AQS核心是一个同步状态,两个队列。它们实现了Java函数中锁同步(synchronized),锁等待(wait,notify),并在其基础上实现了独占式同步,共享式同步2中方式锁的实现。
无论独占式还时共享式获取同步状态成功则直接返回,失败则进入CLH同步队列并阻塞当前线程。当获取同步状态线程释放同步状态,AQS会选择从CLH队列head头部节点的第一个节点释放阻塞,尝试重写竞争获取同步状态,如果成功则将当前节点出队。如果失败则继续阻塞。
获取同步状态的线程也可以使用condition对象释放同步状态进入等待队列。只有等待其他线程使用condition.signal或condition.signAll()唤醒被从阻塞状态中释放重新竞争获取同步状态成功后从原来指令位置继续运行。
3.1.1 同步状态
AQS实现了锁,必然存在一个竞争资源。AQS存在从一个int类型的成员变量state,我们把它称为同步状态,同步状态通常用做判断线程能否获取锁的依据
3.1.2 同步队列
AQS 实现了锁那么总需要一个队列将无法获取锁的线程保存起来,方便在锁释放时通知队列中线程去重新竞争锁。
实现原理
同步队列又被称为CLH同步队列,CLH队列是通过链式方式实现FIFO双向队列。当线程获取同步状态失败时,AQS则会将当前线程构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态被释放时,会把首节点后第一个节点的线程从阻塞状态下唤醒,唤醒的线程会尝试竞争同步状态,如果能获取同步状态成功,则从同步队列中出队。
3.1.3 Condition & 等待队列
-
Java 传统的监视器有如下函数 wait、notify、notifyAll。它们可以实现当一个线程获取锁时,它可以主动放弃锁进入一个条件队列中。只有其他线程通知时才从条件队列中出队,重新获取锁成功后继续执行之前的未完成代码逻辑。
-
AQS内部存在一个内部类实现了Condition接口,其内部维护着一条链式实现单向等待队列。我们可以使用AQS获取内部实现Condition接口对象,调用await(),signal(),signalAll()函数实现Java中wait、notify、notifyAll同样功能。
实现原理
- 当获取同步状态的线程调用condition.await(),则会阻塞,并进入一个等待队列,释放同步状态.
- 当其他线程调用了condition.signal()方法,会从等待队列firstWaiter开始选择第一个等待状态不是取消的节点.添加到同步队列尾部.
- 当其他线程调用了condition.signalAll()方法,会从等待队列firstWaiter开始选择所有等待状态不是取消的节点.添加到同步队列尾部.
这里取消节点表示当前节点的线程不在参与排队获取锁。
image3.1.4 独占式同步
从概念上来说独占式对应只存在一个资源,且只能被一个线程或者说竞争者占用.
3.1.5 共享式同步
从概念上来说共享式对应存在多个资源的是有多个线程或者竞争者能够获取占用.
3.2 模板方法
我们可以编写自己类继承AQS选择重写独占式或共享式模板方法,从而定义如何获取同步状态和释放同步状态的逻辑。
3.2.1 独占式
tryAcquire:尝试独占式获取同步状态,返回值为true则表示获取成功,否则获取失败。
tryRelease:
尝试独占式释放同步状态,返回值为true则表示获取成功,否则获取失败。
3.2.2 共享式
tryAcquireShared:尝试共享式获取同步状态,当返回值为大于等于0的时获得同步状态成功,否则获取失败。
tryReleaseShared:尝试共享式释放同步状态,返回值为true则表示获取成功,否则获取失败。
3.3 ReentrantReadWriteLock实现原理
3.3.1 类结构
ReentrantReadWriteLock内部存在有五个内部类 Sync、NonfairSync, FairSync ,ReadLock,WriteLock。
- Sync 继承实现 AbstractQueuedSynchronizer 抽象类。
- NonfairSync(非公平锁) 继承 Sync 抽象类。
- FairSync(公平锁) 继承 Sync 抽象类。
- ReadLock 读锁实现类,大部分功能使用Sync完成。
- WriteLock 写锁实现类,大部分功能使用Sync完成
源码
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
/** 内部类提供readlock */
private final ReentrantReadWriteLock.ReadLock readerLock;
/** 内部类提供writelock */
private final ReentrantReadWriteLock.WriteLock writerLock;
/** 内部定义同步器实现类 */
final Sync sync;
/**
* 默认实例化非公平的{@code ReentrantReadWriteLock}
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* 实例化{@code ReentrantReadWriteLock} 指定是否公平
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/** 获取内部写锁 **/
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
/** 获取内部读锁 **/
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
/**
* ReentrantReadWriteLock的同步器实现。
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/**
* 省略其余源代码
*/
}
/**
* 非公平Sync
* 获取读锁写锁要不用考虑等待队列是否存在等待线程,相当于可以插队,
* 特别需要注意的是如果获取读锁,要判断同步队列首节点是否是写等待节点,如果是获取读锁也失败,
* 这里可以看出写优先级大于读。
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false;
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
/**
* 公平Sync,
* 获取读锁写锁要考虑等待队列是否存在等待线程如果存在获取锁失败,相当于不能插队
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
/**
* 读锁实现类
*/
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryLock() {
return sync.tryReadLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.releaseShared(1);
}
public Condition newCondition() {
throw new UnsupportedOperationException();
}
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}
/**
* 写锁实现类
*/
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock( ) {
return sync.tryWriteLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}
3.3.2 Sync核心属性
3.3.2.1 同步状态
AQS 使用 state 保存同步状态,但按照上面的设计需要设计两个变量来分别保存,读锁被多个线程重入次数总和。以及写锁的重入次数作为判断能否获取锁的依据。因此这里我们可以使用state 这个int 类型(4 个字节) 高 16 位 用作读锁被多个线程重入次数总和,低 16 位 用作写锁的重入次数。
[图片上传失败...(image-d32f9c-1567001003194)]
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
/** 高16位为读锁,低16位为写锁 **/
static final int SHARED_SHIFT = 16;
/** 读锁操作每次变更单位 **/
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
/** 有读锁的线程重入总和最大限制次数 **/
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
/** 写锁的重入最大限制次数 **/
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** 返回同步状态的高16位,表示持有读锁的线程重入总和 */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** 返回同步状态的低16位,表示写锁的重入次数 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
3.3.2.2 绑定当前线程重入计数器。
static final class HoldCounter {
int count = 0;
final long tid = Thread.currentThread().getId();
}
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
3.3.2.3 最后一个获取到读锁线程重入计数器
每当有新的线程获取到读锁,这个变量都会更新。这个变量的目的是:当最后一个获取读锁的线程重复获取读锁,或者释放读锁,就会直接使用这个变量,速度更快,相当于缓存。
/**
* 用来记录最后一个获取到读锁线程重入计数器,
*/
private transient HoldCounter cachedHoldCounter;
3.3.2.4 第一个获取到读锁线程和重入数量
/**
* 表示获取读锁的第一个线程。如果只有一个线程获取读锁,很明显,使用这样一个变量速度更快。
*/
private transient Thread firstReader = null;
/** firstReader 的重入计数器 **/
private transient int firstReaderHoldCount;
4 Sync核心方法
4.1 获取独占锁
实现AQS模板方法
/**
* 获取独占同步状态
*/
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
/** 获取同步状态 **/
int c = getState();
/** 获取写锁被占用次数(可重入) **/
int w = exclusiveCount(c);
/** 如果c!=0表示存在线程获取共享锁会独占锁 **/
if (c != 0) {
/**
* 如果写锁被占用次数为0,说明存在线程获取共享锁,之后判断获取独占锁的线程是否是当前线程,不是返回false,获取独占锁失败
**/
if (w == 0 || current != getExclusiveOwnerThread())
return false;
/** 判断读写锁被占用的次数是否超过MAX_COUNT**/
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
/** 到这里说明是当前线程获取独占锁,并在做重入动作,设置同步状态+acquires**/
setState(c + acquires);
return true;
}
/** 到这里说明c=0,表示当前不存在线程获取共享锁或独占锁。 **/
/**
* writerShouldBlock()是模板方法,不同子类实现不同,分为公平锁,非公平锁
* 公平锁,判断同步队列中是否存在等待线程,存在返回false
* 非公平锁,不会判断同步队列中是否存在等待线程,直接获取锁返回true
* **/
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
4.2 释放独占锁
/**
* 释放独占同步状态
*/
protected final boolean tryRelease(int releases) {
/** 如果当前线程不是获取独占锁的线程,抛出异常 **/
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
/** 获取跟新的同步状态 **/
int nextc = getState() - releases;
/** 判断是否不存在获取读锁的线程**/
boolean free = exclusiveCount(nextc) == 0;
/** 如果不存在获取读锁线程,释放独占锁,返回true**/
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
4.3 获取共享锁
protected final int tryAcquireShared(int unused) {
/** 获取当前线程 **/
Thread current = Thread.currentThread();
/** 获取同步状态 **/
int c = getState();
/**
* 如果写锁的重入次数!=0 说明存在线程获取写锁,则判断当前线程是否是获取写锁的线程,这里说明可以锁降级
* 如果不是则获取读锁失败
* **/
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
/** 返回同步状态的高16位,表示持有读锁线程重入总数 **/
int r = sharedCount(c);
/**
* readerShouldBlock()是模板方法,不同子类实现不同,分为公平锁,非公平锁
* 如果使用公平锁判断同步队列是否存在等待线程,不存在获取读锁成功,之后重新设置同步状态。进入if语句
* 如果使用非公平锁判断同步同步队列中首节点是否是独占节点,如果不是获取读锁成功,之后重新设置同步状态。进入if语句
* **/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
/**
* 读锁线程重入总数为0,使用firstReader记录当前线程为第一个获取读锁的线程,同时使用firstReaderHoldCount记录初始重入次数为1
* **/
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
}
/**
* 如果当前线程是获取读锁的第一个线程设置firstReaderHoldCount重入次数+1 **/
else if (firstReader == current) {
firstReaderHoldCount++;
}
/**
* 判断当前线程是否是最后一个获取读锁的线程,
* 如果是直接跟新cachedHoldCounter重入次数记录器数量+1
* 如果不是获取当前线程ThreadLocal中重入次数记录器跟新cachedHoldCounter,同时将重入次数+1
* **/
else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
/** 获取读锁CAS失败,放到循环里重试。 **/
return fullTryAcquireShared(current);
}
4.4 释放共享锁
protected final boolean tryReleaseShared(int unused) {
/** 获取当前线程 **/
Thread current = Thread.currentThread();
/** 当前线程为第一个获取读锁线程**/
if (firstReader == current) { //
/** 读线程重入次数为1,设置firstReader指向null **/
if (firstReaderHoldCount == 1){
firstReader = null;
}
/** 重入次数>1 则重入次数-1**/
else {
firstReaderHoldCount--;
}
}
/** 当前线程不是第一个获取读锁线程**/
else {
/** 获取最后一个获取到读锁的重入计数器 **/
HoldCounter rh = cachedHoldCounter;
/** 计数器为空或者计数器的tid不为当前正在运行的线程的tid**/
if (rh == null || rh.tid != getThreadId(current)) //
/** 获取当前线程对应的计数器 **/
rh = readHolds.get();
/** 获取计数 **/
int count = rh.count;
/** 计数小于等于1**/
if (count <= 1) {
/** 移除 **/
readHolds.remove();
/** 计数小于等于0,抛出异常 **/
if (count <= 0)
throw unmatchedUnlockException();
}
/** 减少计数 **/
--rh.count;
}
/** 无限循环 **/
for (;;) {
/** 获取同步状态**/
int c = getState();
/** 更新同步状态**/
int nextc = c - SHARED_UNIT;
/** 使用CAS色湖之同步状态**/
if (compareAndSetState(c, nextc))
/** 只有所有共享锁被释放才会同步队列中等待线程取获取锁 **/
return nextc == 0;
}
}
网友评论