前言
线程并发系列文章:
Java 线程基础
Java 线程状态
Java “优雅”地中断线程-实践篇
Java “优雅”地中断线程-原理篇
真正理解Java Volatile的妙用
Java ThreadLocal你之前了解的可能有误
Java Unsafe/CAS/LockSupport 应用与原理
Java 并发"锁"的本质(一步步实现锁)
Java Synchronized实现互斥之应用与源码初探
Java 对象头分析与使用(Synchronized相关)
Java Synchronized 偏向锁/轻量级锁/重量级锁的演变过程
Java Synchronized 重量级锁原理深入剖析上(互斥篇)
Java Synchronized 重量级锁原理深入剖析下(同步篇)
Java并发之 AQS 深入解析(上)
Java并发之 AQS 深入解析(下)
Java Thread.sleep/Thread.join/Thread.yield/Object.wait/Condition.await 详解
Java 并发之 ReentrantLock 深入分析(与Synchronized区别)
Java 并发之 ReentrantReadWriteLock 深入分析
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(原理篇)
Java Semaphore/CountDownLatch/CyclicBarrier 深入解析(应用篇)
最详细的图文解析Java各种锁(终极篇)
线程池必懂系列
前面几篇分析了synchronized 原理及其使用,synchronized 是JVM实现的,核心代码是C++,对于不熟悉C++语言的读者可能有点难度。JUC 包下提供了新的同步框架:AQS,是纯JAVA代码实现的。如果你了解了synchronized 核心,那么AQS不在话下,若是不了解,本篇将一起从头到尾深入分析AQS。
通过本篇文章,你将了解到:
1、如何实现自己的同步框架
2、AQS 功能解析
3、AQS 独占锁实现
4、AQS 共享锁实现
5、场景模拟与疑难解答
1、如何实现自己的同步框架
准备关键数据结构
第一
得需要共享变量作为"锁芯"。由于这个共享变量是多线程共享,为保证线程间的可见性,因此需要用volatile关键字修饰。
第二
当线程竞争锁成功时则进入临界区执行代码,当失败时需要加入到队列里进行等待,因此需要一个同步队列,用以存放因获取锁失败而挂起的线程。
第三
线程之间需要同步,A线程等待B线程生产数据,B线程生产了数据通知A线程,因此需要一个等待(条件)队列。
核心操作
数据结构准备好之后,需要操作以上数据结构来实现锁功能。
线程竞争锁:通过CAS操作"锁芯",操作成功则执行临界区代码,失败则加入到同步队列。
线程被唤醒:拿到锁的线程执行完临界区代码后释放锁,并唤醒同步队列里等待的线程,被唤醒的线程继续竞争锁。
线程等待某个条件:线程因为某种条件不满足于是加入到等待队列里,释放锁,并挂起等待。
条件满足线程被唤醒:条件满足,线程被唤醒后继续竞争锁。
以上步骤是实现锁功能的核心,不论是synchronized还是AQS,基础功能都是以上步骤,只是他们还拥有更丰富的功能,如可中断(AQS),可重入、可独占可共享(AQS)等。
自己实现锁的实践请移步:Java 并发"锁"的本质(一步步实现锁)
2、AQS 功能解析
AQS是AbstractQueuedSynchronizer的简称,顾名思义:同步器。它是JUC下的同步框架,也是实现锁的核心类。
AQS是抽象类,提供了基础的方法,需要子类实现具体的获取锁、释放锁操作。
image.png
如上图所示,AQS 实现了独占锁/共享锁、可中断锁/不可中断锁的逻辑,当子类扩展AQS时调用对应的方法即可实现不同的锁组合。
JUC下扩展自AQS的子类封装器:
image.png
接下来进入到AQS源码里,看看它是如何实现上述功能的。
注:Semaphore和CountDownLatch 并不是严格意义上的锁,后面具体分析每种锁的时候再细说
3、AQS 独占锁实现
A、先找到关键数据结构
锁芯
#AbstractQueuedSynchronizer.java
private volatile int state;
state 称为共享资源或者同步状态,作为锁的锁芯,可以看出它用volatile修饰了。
同步队列
#AbstractQueuedSynchronizer.java
//指向同步队列的头
private transient volatile Node head;
//指向同步队列的尾
private transient volatile Node tail;
再来看Node里的元素:
#AbstractQueuedSynchronizer.java
static final class Node {
...
//前驱节点
volatile Node prev;
//后继节点
volatile Node next;
//占用独占锁的线程
volatile Thread thread;
//指向下一个等待条件的节点
Node nextWaiter;
...
}
B、操作关键数据结构的方法
1、先说获取同步状态的操作
acquire(xx)
#AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
//arg 不同的锁实现有不同的含义
//tryAcquire 由子类实现具体的获取同步状态操作
//addWaiter 将当前线程封装在Node里并加入到同步队列
//acquireQueued 符合条件则挂起线程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//补上中断标记
selfInterrupt();//-------------(1)
}
里面写法很简单,重要工作都在各个方法里体现。
tryAcquire(xx)
真正获取锁的地方,也就是操作锁芯"state"的地方,不同子类有不一样的实现,后面会分类细说。tryAcquire(xx) 返回true表示获取同步状态成功,false表示获取同步状态失败。
addWaiter(xx)
#AbstractQueuedSynchronizer.java
private Node addWaiter(Node mode) {
//构造新节点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
//尾节点存在
//新节点的前驱指针指向尾节点
node.prev = pred;//-------------(2)
//CAS修改为尾节点指向新节点
if (compareAndSetTail(pred, node)) {
//成功后
//将最后一个节点的后继指针指向新加入的节点
//此时新节点正式加入到同步队列里了
pred.next = node;
return node;
}
}
//前面步骤加入队列失败,则会走到这
enq(node);
//返回新加入的节点
return node;
}
private Node enq(final Node node) {
//死循环务必保证插入队列成功
for (;;) {
Node t = tail;
if (t == null) {
//队列是空的,则先创建头节点
if (compareAndSetHead(new Node()))
//尾节点指向头节点
tail = head;
} else {
//和addWaiter里一样的操作,加入新节点到队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
addWaiter(xx)作用是将节点加入到同步队列的尾部。
需要注意的是:
头节点不关联任何线程,仅仅起到索引的作用。
最终,同步队列如下图:
image.png
acquireQueued(xx)
按照以往的经验(synchronized),加入到同步队列后应该挂起线程,来看看AQS实现有何不同:
#AbstractQueuedSynchronizer.java
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//记录中断标志
boolean interrupted = false;
for (;;) {
//找到当前节点的前驱节点
final Node p = node.predecessor();
//如果前驱节点为头节点,则尝试获取同步状态
if (p == head && tryAcquire(arg)) {
//获取同步状态成功,将头节点指向下一个节点,并且新的头节点prev=null,表示原本的头节点出队了
setHead(node);
//原本的头节点next=null,帮助尽快GC
p.next = null;
failed = false;
return interrupted;
}
//判断获取同步状态失败后是否需要挂起,走到这里说明获取同步状态失败了,可能需要挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//标记中断
interrupted = true;
}
} finally {
if (failed)
//还是没获取成功,则取消竞争同步状态操作
cancelAcquire(node);
}
}
这里面的逻辑可能比较绕,着重分析一下。
首先外层有个死循环,该循环退出的条件是当前线程成功获取了同步状态。
其次,如果当前新加入队列的节点的前驱节点是头节点,那么它就会去尝试获取同步状态。若是获取同步状态失败或者它的前驱节点不是头节点,则进入到shouldParkAfterFailedAcquire(xx)方法。
#AbstractQueuedSynchronizer.java
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//拿出前驱节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//为SIGNAL 直接返回
return true;
if (ws > 0) {
//该节点已被取消
do {
//一直往前追溯,直到找到不被取消的节点为止
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
//找到
pred.next = node;
} else {
//不是取消状态,则直接设置前驱节点状态为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
该方法返回true,则说明当前线程所在节点的前驱节点的状态为:SIGNAL。进而执行parkAndCheckInterrupt()方法。
parkAndCheckInterrupt()
从名字就可以看出来是挂起线程并检查中断。
#AbstractQueuedSynchronizer.java
private final boolean parkAndCheckInterrupt() {
//挂起线程
LockSupport.park(this);
//查询中断标记位
return Thread.interrupted();
}
cancelAcquire(xx)
该方法有两种场景会调用到:
1、当获取同步状态发生异常时,需要取消线程竞争同步状态的操作。
2、当获取同步状态的超时时间到来之时,若此刻还无法成功获取同步状态,则调用该方法。
#AbstractQueuedSynchronizer.java
private void cancelAcquire(Node node) {
if (node == null)
return;
node.thread = null;
Node pred = node.prev;
//若是已经取消了,则跳过
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
//标记为取消状态
node.waitStatus = Node.CANCELLED;
//如果当前节点是尾节点,则将尾节点指向当前节点的前驱节点
if (node == tail && compareAndSetTail(node, pred)) {
//再将前驱节点的后继指针置为空,把node从队列里移除
compareAndSetNext(pred, predNext, null);
} else {//---------------(3)
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
//若前驱节点不是头结点,当前节点也不是尾结点
Node next = node.next;
if (next != null && next.waitStatus <= 0)
//将node的前驱节点的后继指针指向node的后继节点
compareAndSetNext(pred, predNext, next);
} else {
//前驱节点是头结点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
2、再说释放同步状态的操作
release(xx)
#AbstractQueuedSynchronizer.java
public final boolean release(int arg) {
//tryRelease 释放同步状态
if (tryRelease(arg)) {
//释放同步状态成功
Node h = head;
if (h != null && h.waitStatus != 0)
//waitStatus 不为0,则唤醒线程
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
//将头结点状态置为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//取出头结点的后继节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
//若是没有后继节点或者是取消状态
s = null;
//则从尾部开始寻找离头结点最近的未取消的节点-----------(4)
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒线程
if (s != null)
LockSupport.unpark(s.thread);
}
节点状态
#AbstractQueuedSynchronizer.java
//节点被取消,不再参与竞争锁
static final int CANCELLED = 1;
//表示该节点的后继节点需要唤醒
static final int SIGNAL = -1;
//节点在等待队列里的状态
static final int CONDITION = -2;
//表示头结点将唤醒的动作传播下去
static final int PROPAGATE = -3;
//默认值为0
至此,独占锁的获取锁、释放锁的流程已经分析完毕,如下图:
image.png
4、AQS 共享锁实现
独占锁是同一时刻只允许一个线程获取锁,而共享锁则不然,来看看AQS里共享锁的实现。
先来看看获取共享同步状态的操作
acquireShared(xx)
#AbstractQueuedSynchronizer.java
public final void acquireShared(int arg) {
//获取共享的同步状态,不同锁实现不一样
//<0 表示获取同步状态失败
if (tryAcquireShared(arg) < 0)
//加入同步队列、挂起线程等在此处实现
doAcquireShared(arg);
}
与独占锁的获取不一样的是,此处将加入同步队列与挂起线程等操作放到一个方法里了。
doAcquireShared(xx)
#AbstractQueuedSynchronizer.java
private void doAcquireShared(int arg) {
//加入同步队列,此处节点是共享状态
final Node node = addWaiter(Node.SHARED);
//获取同步状态失败
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//前驱节点
final Node p = node.predecessor();
if (p == head) {
//若是前驱节点为头结点,则尝试获取同步状态
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取同步状态成功
//修改头结点,并传递唤醒状态
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
//补中断
selfInterrupt();
failed = false;
return;
}
}
//与独占锁一致
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquireShared(arg) 返回值表示当前可用的资源。
setHeadAndPropagate(xx)
#AbstractQueuedSynchronizer.java
private void setHeadAndPropagate(Node node, int propagate) {
//propagate == 0 表示没有资源可以使用了
Node h = head; // Record old head for check below
//设置头结点
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//若后继节点是共享节点,则唤醒
if (s == null || s.isShared())
doReleaseShared();
}
除了将头结点指向当前节点外,还需要唤醒下一个共享节点。
而独占锁不会。
从现实的角度来看也比较容易理解这种操作:
某个澡堂分男女分批洗,当前只允许女士进去先洗,而一群男士在排队等候,当女士们洗好之后,允许男士进去洗。第一个男士进去后,发现可以洗,于是跟第二个男士说可以洗,你快进来吧,真可以洗,这是共享精神。这个澡堂就是共享的。
再来看看释放共享同步状态的操作
releaseShared(xx)
#AbstractQueuedSynchronizer.java
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
//释放同步状态成功后,通知后续节点
doReleaseShared();
return true;
}
return false;
}
可以看出,线程在获取共享锁和释放共享锁后都会尝试唤醒后续节点,都调用了
doReleaseShared()方法。
doReleaseShared()
#AbstractQueuedSynchronizer.java
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
//队列里还有节点等候
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
//唤醒后继节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//将头节点状态置为PROPAGATE-------->(5)
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//该方法可能会被多个线程调用,而线程获取锁后会修改头节点
//因此,若是发现头结点更改了,则再重新拿新的头结点再试探
if (h == head) // loop if head changed
break;
}
}
至此,共享锁的获取锁、释放锁的流程已经分析完毕,如下图:
5、场景模拟与疑难分析
共享锁、独占锁的实现重要方法、数据结构都过了一遍,接下来通过模拟场景来分析上面提到的五个问题。
1、问:为什么需要selfInterrupt()
#AbstractQueuedSynchronizer.java
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport.park(this)将线程挂起,挂起后调用Thread.interrupted()查询中断状态。而Thread.interrupted()除了查询中断状态外,还会重置中断状态,也就是说之前中断状态为true,调用该方法后中断状态变为false。而从整个acquire(xx)方法来看,没有任何地方处理了中断,因此不能简单将中断状态置为false,还需要恢复到原来的样子,让外部调用者可以感知到是否已经发生过中断了,所以需要selfInterrupt() 重新把中断状态设置为true。
既然后面要恢复中断状态,那干嘛一开始就置为false呢,直接调用Thread.isInterrupted()不就ok了吗?
想象一种场景:A线程被中断,此时从挂起状态醒过来,然后去获取同步状态,发现还是无法获取,于是又开始准备挂起了。此处挂起线程使用的是LockSupport.park(xx)方法,其底层使用Parker.park(xx)函数:
image.png
注意红框里代码,若是发现当前线程中断状态为true,则直接返回不再挂起线程。若是调用Thread.isInterrupted(),中断状态没有改为false,那么当调用LockSupport.park(xx)方法时,线程是无法挂起的。而acquire(xx)方法里没有获取到锁就一直循环,导致线程一直不断轮询同步状态,造成了不必要的CPU资源浪费。
Parker细节请移步:Java Unsafe/CAS/LockSupport 应用与原理
线程中断细节请移步:Java “优雅”地中断线程(原理篇)
2、问:为什么先设置前驱指针
当前的顺序是:
node.prev = pred------>pred.next = node;
先让新结点的prev指向尾结点,再让尾结点的next指向新结点,如下图:
image.png
现在同步队列里有两个结点,其中一个头结点,一个是Node1结点。若是先给pred.next 赋值,假设流程如下:
1、线程A先竞争锁,竞争失败,先将Node1的next指向NewNodeA。
2、此时另一个线程B也来竞争锁,失败,也将Node1的next指向NewNodeB。
3、将tail指针指向新的节点(可能是NewNodeA,也可能是NewNodeB),若是NewNodeA,然后将NewNodeA的prev指向Node1。此时问题出现了:虽然NewNodeA的prev指向了Node1,但是Node1的next却是指向了NewNodeB。
而先给node.prev 赋值就不会出现上述情况。出现这个问题的根本原因是多线程操作队列元素(给Node1.next赋值)没有做好并发保护,而先给node.prev 并不是操作队列,将操作队列的步骤延迟到CAS成功之后,就能正确地修改队列。
当然,pred.next = node 执行之前,其它线程可能会遍历查询队列,此时pred.next可能为空,也就是上图的Node1.next可能为空。
这也是网上一些文章说next指针不可靠的原因。
3、问:node 什么时候从队列里移除
cancelAcquire(xx)分三种情形操作同步队列:
1、若node为队尾节点,则将node从队列移除。
2、若node为队头节点,则调用unparkSuccessor(xx)检测。
3、若node为中间节点,则在shouldParkAfterFailedAcquire(xx)/unparkSuccessor(xx) 彻底移除。
4、问:为什么需要从尾部开始索引
在第2点里有分析过节点的next指针可能为空,若是从队头开始索引,有可能还没遍历完整个队列就退出遍历了。因此,为了保险起见,从队尾开始索引。
5、问:为什么需要PROPAGATE状态
PROPAGATE 在共享节点时才用得到,假设现在有4个线程、A、B、C、D,A/B 先尝试获取锁,没有成功则将自己挂起,C/D 释放锁。可以参照Semaphore获取/释放锁流程。
1、C 释放锁后state=1,设置head.waitStatus=0,然后将A唤醒,A醒过来后调用tryAcquireShared(xx),该方法返回r=0,此时state=0。
2、在A还没调用setHeadAndPropagate(xx)之前,D 释放了锁,此时D调用doReleaseShared(),发现head.waitStatus==0,所以没有唤醒其它节点。
3、此时A调用了setHeadAndPropagate(xx),因为r==0且head.waitStatus==0,因此不会调用doReleaseShared(),也就没有唤醒其它节点。最后导致的是B节点没有被唤醒。
若是加了PROPAGATE状态,在上面的第2步骤里的D调用doReleaseShared()后,发现head.waitStatus==0,于是设置head.waitStatus=PROPAGATE,在第3步骤里,发现head.waitStatus==PROPAGATE,于是唤醒B。
虽然在第2步骤里没有唤醒任何线程,但是设置了PROPAGATE状态,在后续的步骤中发现已经设置了PROPAGATE,于是唤醒,这也是PROPAGATE名字的意义:传播。
由于篇幅原因,下篇将分析AQS 中断与否、条件等待等相关知识。
本文基于jdk1.8。
网友评论