1 前言
锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能防止多个线程同时访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,如读写锁)。在以前,Java程序是靠synchronized来实现锁功能的,而在Java SE 5之后,并发包中新增了Lock接口(以及相关实现类)用来实现锁功能,他提供了与synchronized关键字类似的同步功能,只是在使用时需要显式的获取锁和释放锁,虽然它缺少了synchronized提供的隐式获取释放锁的便捷性,但是却拥有了锁获取和释放的可操作性、可中断的获取锁以及超时获取锁等多种synchronized关键字不具备的同步特性。很多锁都通过实现Lock接口来完成对锁的操作,比如可重入锁(ReentrantLock)、前一张讲的Redisson分布式锁等,而Lock接口的实现,基本是都是通过聚合了一个同步器的子类来完成线程访问控制的,而同步器,就是我们常说的AQS(AbstractQueuedSynchronizer),也是今天要记录的内容。
2 什么是AQS
AQS(队列同步器AbstractQueuedSynchronizer)是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
如上图所示,同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行修改,这时就需要使用同步器提供的3个方法来进行操作:
1、getState():获取当前同步状态
2、setState():设置当前同步状态
3、compareAndSetState(int expect, int update):通过CAS设置当前状态,该方法能保证状态设置的原子性
子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用。同步器既可以支持独占式获取同步状态(单个线程获取锁),也可以支持共享式获取同步状态(多个线程获取到锁),这样就可以方便实现不同类型的同步组件(如上图所示的可重入锁:ReentrantLock、可重入读写锁:ReentrantReadWriteLock、计数器:CountDownLatch等等)
3 同步器可重写的方法
以下代码为可重入锁继承同步器后重写的方法:
- protected boolean tryAcquire(int acquires):独占式获取同步锁状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态。
/**
* 非公平锁
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
// nonfairTryAcquire方法和下面的公平锁方法除了判断是否在队列首位之外没有不同
return nonfairTryAcquire(acquires);
}
}
/**
* 公平锁
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取当前同步状态
int c = getState();
// 判断是否符合预期
if (c == 0) {
// 判断是否在队列首位并且CAS设置当前状态成功
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
- protected boolean tryRelease(int acquires):独占式释放同步锁状态,等待获取同步状态的线程将有机会获取同步状态。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
- protected boolean isHeldExclusively():当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程****锁****独占。
protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}
以下代码为读写锁继承同步器后重写的方法:
- protected int tryAcquireShared(int arg):共享式获取同步状态,返回大于0的值,表示获取锁成功,反之获取锁失败。
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 获取当前同步状态
int c = getState();
// 如果有线程持有写锁,则返回-1
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取持有锁的线程数
int r = sharedCount(c);
// 判断是否阻塞,判断线程数量,判断CAS设置是否成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 当前线程是第一个获取到锁的线程
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 否则就++
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 死循环去获取锁
return fullTryAcquireShared(current);
}
- protected boolean tryReleaseShared(int arg):共享式释放同步状态。
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 对应上面获取锁来读就好了
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
上面例子,因为读写锁是共享锁,可重入锁是独占锁,而同步器对于共享锁和独占锁都提供了可重写的方法来获取锁或者释放锁,所以分了两个例子来写。
4 同步器提供的模版方法
- void acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功则返回,否则,将会进入同步队列等待。
public final void acquire(int arg) {
// 如果获取锁失败,则加入同步队列,如果加入同步队列成功则自旋阻塞唤醒来不断的尝试获取锁,直到线程被中断或获取到锁
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- void acquireInterruptibly(int arg):与上面acquire相似,但是当前方法如果在获取锁的过程中线程中断会抛出InterruptedException并返回。
public final void acquireInterruptibly(int arg) throws InterruptedException {
// 线程中断抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 如果没有获取到锁
if (!tryAcquire(arg))
// 不断自旋尝试获取锁
doAcquireInterruptibly(arg);
}
- boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly()方法的基础上增加了超时时间,如果在超时时间内获取到了锁,则返回true,否则返回false。
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
// 中断抛异常
if (Thread.interrupted())
throw new InterruptedException();
// 相应时间内不断获取锁,超时返回false
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
- void acquireShared(int arg):共享式的获取同步状态,如果当前线程未获取到同步状态,则会进入同步队列等待,与独占锁的主要区别是在同一时刻可以有多少个线程获取到同步状态。
public final void acquireShared(int arg) {
// 如果获取锁失败,则不断自旋尝试获取锁,tryAcquireShared方法在上面有讲
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
- void acquireSharedInterruptibly(int arg):与acquireShared方法相似,只是如果线程中断,当前方法会抛出异常。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
// 中断抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 如果获取锁失败,则不断自旋尝试获取锁
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
- boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly方法的基础上增加了超时时间。
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
// 中断抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 在超时时间内如果获取锁失败,则不断自旋尝试获取锁
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
- boolean release(int arg):独占式释放同步状态,该方法在释放同步状态之后,会将队列中的第一个节点包含的线程唤醒。
public final boolean release(int arg) {
// 如果获取锁成功
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒第一个节点的线程
unparkSuccessor(h);
return true;
}
return false;
}
- boolean releaseShared(int arg):共享式释放同步状态。
public final boolean releaseShared(int arg) {
// 如果释放锁成功
if (tryReleaseShared(arg)) {
// 唤醒线程
doReleaseShared();
return true;
}
return false;
}
- Collection getQueuedThreads():获取等待在同步队列上的线程集合。
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
同步器提供的模板方法基本是分为3类:
- 独占式获取与释放同步状态
- 共享式获取与释放同步状态
- 查询同步队列中的等待线程集合
5 根据同步器自定义同步组件
上面介绍了一些AQS提供的可重写方法和模板方法,接下来我们自定义一个独占锁(在同一时刻只有一个线程能获取锁,其他获取锁的线程只能处于同步队列中,当获取到锁的线程释放锁之后,后面的线程才能够获取锁)
public class ExclusiveLock implements Lock {
/**
* 自定义同步器
*/
private static class Sync extends AbstractQueuedSynchronizer{
// 判断是否处于占用状态
@Override
protected boolean isHeldExclusively(){
return getState() == 1;
}
// 加锁
@Override
protected boolean tryAcquire(int arg) {
// 通过CAS设置同步状态(设置成功返回true 设置失败返回false)
if (compareAndSetState(0, 1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁
@SneakyThrows
@Override
protected boolean tryRelease(int arg) {
// 如果同步状态为未获取锁,则抛出异常,没有线程获取到锁,不能释放锁
if (getState() == 0){
throw new IllegalAccessException();
}
// 释放锁
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 返回一个Condition,每个Condition都包含一个Condition队列
protected Condition newCondition() {
return new ConditionObject();
}
}
private final Sync sync = new Sync();
@Override
public void lock() {
// 独占式加锁
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
// 加锁线程中断抛出异常,否则自旋加锁
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
// 加锁成功返回true,否则设置占用排它锁的线程是当前线程
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// 加锁线程中断抛出异常,否则在有效时间内尝试自旋加锁
return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
// 释放锁
sync.release(1);
}
@Override
public Condition newCondition() {
// 返回Condition
return sync.newCondition();
}
}
如上代码,我们自定义了一个独占锁,它在同一时刻只允许一个线程占有锁。sync内部类继承了同步器并实现了独占式获取和释放同步状态。在tryAcquire(int arg)方法中,如果通过CAS设置成功,则代表获取了同步状态,而在tryRelease(int arg)方法中只是将同步状态重制为0。用户在使用ExclusiveLock时并不会直接和内部同步器打交道,而是调用ExclusiveLock提供的方法即可,如加锁调用lock()方法,如果获取锁失败则会被加入同步队列中,释放锁调用unlock()方法,如果没有线程获取锁的时候释放锁会抛出异常,还可以按指定时间尝试获取锁等等。
结尾
本来想把同步器实现原理也写一些的,结果看了一下篇幅好想有些许长,那就分两篇来写把,如果看完感觉有帮助的,请帮忙点个赞,谢谢各位,有缘下篇文章再见!
网友评论