前面我们学习了ReentrantLock,其底层就是用了AQS实现的,应该先讲这一章节,但是当时给忘了,现在给补上吧。
关于ReentrantLock的学习,可以参考:https://www.jianshu.com/p/edec5185196d
AbstractQueuedSynchronizer是阻塞式锁,以及同步器组件的实现框架。是JDK中实现并发编程的核心,它提供了一个基于FIFO队列,平时我们工作中经常用到的ReentrantLock,CountDownLatch等都是基于它来实现的。
一、初识AQS
首先我们还是从前面学习的ReentrantLock入手,看看其代码结构是什么样的:
ReentrantLockl类图从上图可以看到以下几点:
- ReentrantLock实现接口Lock(抽象接口)
- ReentrantLock有三个内部类,分别是FrairSync、NonfairSync、Sync,且FrairSync、NonfairSync继承自Sync。
- Sync继承AbstractQueuedSynchronizer
- AbstractQueuedSynchronizer有两个内部类,分别是Node、ConditionObject。
- AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer(抽象类,提供独占线程的get和set)。
AQS有如下的特点:
-
用 state 属性来表示资源的状态,包含独占状态和共享状态,对应公平锁和非公平锁。子类需要定义如何维护这个状态,控制如何获取锁和释放锁,如上面图中的关系,公平锁和非公平锁需要各自去维护这个state,达到获取和释放锁的目的。
- getState - 获取 state 状态
- setState - 设置 state 状态
- compareAndSetState - 使用CAS设置状态
- 独占模式:只有一个线程能够访问资源
- 共享模式:允许多个线程访问资源
-
提供了基于 FIFO 的等待队列,类似于前面讲Synchronized原理提到的 Monitor 的 EntryList
-
使用条件变量来实现等待队列、线程唤醒机制,同时支持多个条件变量,类似于前面讲Synchronized原理提到的 Monitor 的 WaitSet
-
【公平锁】与【非公平锁】:二者的区别主要在于获取锁是否和排队顺序有关。当锁呗一个线程持有,其他尝试获取锁的线程会被挂起,加到等待队列中,先被挂起的在队列的最前端。当锁被释放,需要通知队列中的线程。作为公平锁,会先唤醒队列最前端的线程;而非公平锁会唤醒所有线程,通过竞争去获取锁,后来的线程有可能获得锁。
二、 源码分析
下面我们通过源码剖析其本质是什么样的。
首先在脑海中有个印象,AQS维护了两个对个队列,一个是同步队列,一个是阻塞队列。
Node可以说是【同步队列】和【阻塞队列】的节点。
2.1 Node源码剖析
static final class Node {
// 模式,分为共享与独占
// 共享模式
static final Node SHARED = new Node();
// 独占模式
static final Node EXCLUSIVE = null;
// 结点状态
// CANCELLED,值为1,表示当前的线程被取消
// SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,也就是unpark
// CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
// PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 结点状态
volatile int waitStatus;
// 前驱结点
volatile Node prev;
// 后继结点
volatile Node next;
// 结点所对应的线程
volatile Thread thread;
// 下一个等待者
Node nextWaiter;
// 结点是否在共享模式下等待
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取前驱结点,若前驱结点为空,抛出异常
final Node predecessor() throws NullPointerException {
// 保存前驱结点
Node p = prev;
if (p == null) // 前驱结点为空,抛出异常
throw new NullPointerException();
else // 前驱结点不为空,返回
return p;
}
// 无参构造函数
Node() { // Used to establish initial head or SHARED marker
}
// 构造函数,被addWaiter使用
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
// 构造函数
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
2.2 ConditionObject源码剖析
实现了condition接口,关于condition的学习后面会介绍,在学习ReentrantLock时也介绍了其使用方式。
代码较多,直接从上向下看吧:
// 内部类
public class ConditionObject implements Condition, java.io.Serializable {
// 版本号
private static final long serialVersionUID = 1173984872572414699L;
// condition队列的头结点
private transient Node firstWaiter;
// condition队列的尾结点
private transient Node lastWaiter;
/**
* 构造函数
*/
public ConditionObject() { }
/**
* 添加新的waiter到wait队列
*/
private Node addConditionWaiter() {
// 定义尾结点是t
Node t = lastWaiter;
// 尾结点不为空,并且尾结点的状态不为CONDITION(默认是-2,表示当前节点在conditionObject等待队列中)
if (t != null && t.waitStatus != Node.CONDITION) {
// 清除状态不为CONDITION的结点,对firstWaiter和lastWaiter重新赋值
unlinkCancelledWaiters();
// 将最后一个结点重新赋值给t
t = lastWaiter;
}
// 新建一个结点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 尾结点为空
if (t == null)
// 设置condition队列的头结点
firstWaiter = node;
else
// 设置为节点的nextWaiter域为node结点
t.nextWaiter = node;
// 更新condition队列的尾结点
lastWaiter = node;
return node;
}
/**
* 移除或转移头结点到sync队列,直到没有取消的或者空为止
*/
private void doSignal(Node first) {
// 循环
do {
// 将下一个节点设为首节点,如果为空
if ( (firstWaiter = first.nextWaiter) == null)
// 设置尾结点为空
lastWaiter = null;
// 设置first结点的nextWaiter域
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null); // 将结点从condition队列转移到sync队列失败并且condition队列中的头结点不为空,一直循环
}
/**
* 转移所有等待队列的节点到同步队列
*/
private void doSignalAll(Node first) {
// condition队列的头结点尾结点都设置为空
lastWaiter = firstWaiter = null;
// 循环
do {
// 获取first结点的nextWaiter域结点
Node next = first.nextWaiter;
// 设置first结点的nextWaiter域为空
first.nextWaiter = null;
// 将first结点从condition队列转移到sync队列
transferForSignal(first);
// 重新设置first
first = next;
} while (first != null);
}
/**
* 过滤掉状态不为CONDITION的节点
* 对firstWaiter和lastWaiter重新赋值
**/
private void unlinkCancelledWaiters() {
// 获取condition队列头结点
Node t = firstWaiter;
// 获取一个尾结点是null
Node trail = null;
while (t != null) {
// 获取下一个结点
Node next = t.nextWaiter;
// 头结点的状态不为CONDTION状态
if (t.waitStatus != Node.CONDITION) {
// 设置t节点的下一个等待者为空
t.nextWaiter = null;
if (trail == null) // trail为空,首次进来一定为空
// 重新设置condition队列的头结点
firstWaiter = next;
else
// 设置trail结点的nextWaiter域为next结点
trail.nextWaiter = next;
if (next == null) // next结点为空
// 设置condition队列的尾结点
lastWaiter = trail;
}
else // t结点的状态为CONDTION状态
// 设置trail结点
trail = t;
// 设置t结点
t = next;
}
}
/**
* 实现Condition接口的signal方法
*/
public final void signal() {
if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
throw new IllegalMonitorStateException();
// 保存condition队列头结点
Node first = firstWaiter;
if (first != null) // 头结点不为空
// 唤醒一个等待线程,将头结点从阻塞队列移除,添加到同步队列
doSignal(first);
}
/**
* 实现Condition的signalAll方法,唤醒所有线程
*/
public final void signalAll() {
if (!isHeldExclusively()) // 不被当前线程独占,抛出异常
throw new IllegalMonitorStateException();
// 保存condition队列头结点
Node first = firstWaiter;
if (first != null) // 头结点不为空
// 唤醒所有等待线程,将头结点从阻塞队列移除,添加到同步队列
doSignalAll(first);
}
/**
* 与await()区别在于,使用await方法,调用interrupt()中断后会报错,而该方法不会报错。
*/
public final void awaitUninterruptibly() {
// 添加一个结点到等待队列
Node node = addConditionWaiter();
// 获取释放的状态
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) { //
// 阻塞当前线程
LockSupport.park(this);
if (Thread.interrupted()) // 当前线程被中断
// 设置interrupted状态
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted) //
selfInterrupt();
}
/**
* 等待,当前线程在接到信号或被中断之前一直处于等待状态
*/
public final void await() throws InterruptedException {
// 当前线程被中断,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
// 将当前线程包装成Node,尾插入到等待队列中
Node node = addConditionWaiter();
// 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 当前线程进入到等待状态
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检查结点等待时的中断类型
break;
}
// 自旋等待获取到同步状态(即获取到lock)
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 处理被中断的情况
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
/**
* 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态
*/
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime();
}
/**
* 等待,当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
*/
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() > abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
/**
* 等待,当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。此方法在行为上等
* 效于:awaitNanos(unit.toNanos(time)) > 0
*/
public final boolean await(long time, TimeUnit unit)
throws InterruptedException {
long nanosTimeout = unit.toNanos(time);
if (Thread.interrupted())
throw new InterruptedException();
// 1. 将当前线程包装成Node,尾插入到等待队列中
Node node = addConditionWaiter();
// 2. 释放当前线程所占用的lock,在释放的过程中会唤醒同步队列中的下一个节点
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
timedout = transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
2.3 锁的获取和释放
整个AQS的设计理念就是通过state字段来实现锁的获取和释放,锁有主要分为公平锁和非公平锁。
2.3.1 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
// 继承自AQS的方法,内部先调用tryAcquire获取锁,获取失败则添加下城到等待队列当中
acquire(1);
}
/**
* 公平锁版本的tryAcquire
*/
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取锁的状态
int c = getState();
// 0表示锁没有被持有
if (c == 0) {
// 判断当前等待队列是否有节点在等待,没有才去竞争
if (!hasQueuedPredecessors() &&
// 比较并替换状态
compareAndSetState(0, acquires)) {
// 设置当前线程为独占线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 此处表示锁重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
2.3.2 非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* 立即获取锁,失败会加入等待队列
*/
final void lock() {
// 通过CAS尝试获取锁,成功则设置当前线程独占
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 否则加入等待队列·····································································································································
acquire(1);
}
/**
* 非公平锁版本的tryAcquire
*/
protected final boolean tryAcquire(int acquires) {
// 走其父类Sync的默认nonfairTryAcquire
return nonfairTryAcquire(acquires);
}
}
2.3.3 Syc子类
这是公平锁和非公平锁的父类,提供统一的tryRelease方法释放锁
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* 提供非公平版本的快捷路径
*/
abstract void lock();
/**
* 非公平锁获取,默认就是非公平锁
*/
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 获取当前锁的状态
int c = getState();
// 0表示没有被占用
if (c == 0) {
// CAS占用,成功则设置当前线程为独占锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 当前线程是独占锁,表示锁重入
else if (current == getExclusiveOwnerThread()) {
// 状态 + 1
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置当前状态
setState(nextc);
return true;
}
return false;
}
/**
* 释放锁
*/
protected final boolean tryRelease(int releases) {
// 当前状态 减去 释放的数量
int c = getState() - releases;
// 如果当前线程不是占有锁的线程,抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 当全部释放后,状态为0,取消独占线程
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 设置状态为0,返回释放成功
setState(c);
return free;
}
protected final boolean isHeldExclusively() {
// 当前线程是否是锁持有者
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 获取当前持有者
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 获取持有数,只有是线程持有者才能获取
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
}
3.3.4 acquire 和 release
在AQS当中还有两个核心方法:
- acquire():获取锁,这是实际锁调用上锁的真正方法,前面的try开头的知识尝试获取锁,即使失败也不会添加到等待队列。
public final void acquire(int arg) {
// 尝试获取
if (!tryAcquire(arg) &&
// 尝试获取成功,以独占方式添加到等待队列,并不断地尝试占有锁知道成功
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- release():释放锁,这是实际释放锁的方法,会调用锁自定义的同步器实现的tryRelease方法:
/** * 尝试释放,成功后返回true */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
后面我们自定义不可重入锁,来看看同步器和锁的关系是什么样的,加深理解。
2.4 简单总结
到此为止,通过阅读前面的源码内容,我们可以有如下的总结:
- 锁的释放和获取都是围绕 【state】来做的,0表示未持有锁,1表示独占,大于一表示锁重入
- 获取锁的姿势如下:
// 如果获取锁失败 if (!tryAcquire(arg)) { // 入队, 可以选择阻塞当前线程 park unpark }
- 释放锁的姿势如下:
// 如果获取锁成功 if (!tryRelease(arg)) { // 让阻塞线程恢复运行 }
三、实践
了解了AQS的结构之后,我们不妨自己动手实践一番。加深理解。
定义一个不可重入锁,需要一个同步器,一个锁,一个测试类
自定义同步器:
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* @description: 实现一个不可重入锁 同步器,state最大只能是1
* @author:weirx
* @date:2022/1/13 13:49
* @version:3.0
*/
public class MyLockSynchronizer extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int acquires) {
int state = getState();
if (state == 0) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
}
return false;
}
@Override
protected boolean tryRelease(int acquires) {
int c = getState() - acquires;
if (c == 0) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
return false;
}
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
protected ConditionObject newCondition() {
return new ConditionObject();
}
}
自定义锁:
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @description: 自定义锁
* @author:weirx
* @date:2022/1/13 14:05
* @version:3.0
*/
public class MyLock implements Lock {
MyLockSynchronizer myLockSynchronizer = new MyLockSynchronizer();
@Override
public void lock() {
// 尝试获取锁,失败则加入等待队列
myLockSynchronizer.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
// 尝试获取锁,失败则加入等待队列, 可中断
myLockSynchronizer.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
// 尝试获取锁,不加入等待队列
return myLockSynchronizer.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
// 尝试获取锁,不加入等待队列,有实现
return myLockSynchronizer.tryAcquireNanos(1, unit.toNanos(time));
}
@Override
public void unlock() {
// 释放锁
myLockSynchronizer.release(1);
}
@Override
public Condition newCondition() {
// 条件变量
return myLockSynchronizer.newCondition();
}
}
测试锁的效果:
/**
* @description: 测试
* @author:weirx
* @date:2022/1/13 14:24
* @version:3.0
*/
public class TestMyLock {
public static void main(String[] args) {
MyLock myLock = new MyLock();
new Thread(() -> {
try {
myLock.lock();
System.out.println(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")
+ " " + Thread.currentThread().getName() + " :acquire lock success");
// 休眠一秒看效果
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
myLock.unlock();
System.out.println(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")
+ " " + Thread.currentThread().getName() + " :release lock success");
}
}, "t1").start();
new Thread(() -> {
try {
myLock.lock();
System.out.println(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")
+ " " + Thread.currentThread().getName() + " :acquire lock success");
} finally {
myLock.unlock();
System.out.println(DateUtil.format(new Date(), "yyyy-MM-dd HH:mm:ss")
+ " " + Thread.currentThread().getName() + " :release lock success");
}
}, "t2").start();
}
}
结果,ti一秒后释放锁,才会由t2获得锁:
2022-01-13 14:34:56 t1 :acquire lock success
2022-01-13 14:34:57 t2 :acquire lock success
2022-01-13 14:34:57 t1 :release lock success
2022-01-13 14:34:57 t2 :release lock success
测试下不可重入是否好使,只需要在上述测试代码的线程t1中,再次使用myLock.lock()获取一次锁,发现,整个程序被卡住了,只会打印一条信息:
2022-01-13 14:35:56 t1 :acquire lock success
四、关于Condition的补充
本篇没有介绍Condition的具体内容,但是在之前讲解ReentrantLock提到过【条件变量】,可以返回去看这篇文章了解其用法:https://www.jianshu.com/p/edec5185196d
源码学习真是难,看别人说的再多不如自己跟着走一遍,建议同学们参照本文自己跟踪一遍。
网友评论