目录
目录1.png目录2.png
AQS
- AQS是ReentrantLock,CyclicBarrier,CountDownLatch,Semaphore,ArrayBlockingQueue的基础,深入理解AQS很有必要
数据结构
-
sync队列(双端队列)
sync队列.png -
condition(单向队列)
condition.png
继承关系
// AOS主要设置/获取独占线程
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
示例
- 例子是直接照搬参考文章leesf写的AQS源码
package com.hust.grid.leesf.reentrantLock;
class Consumer {
private Depot depot;
public Consumer(Depot depot) {
this.depot = depot;
}
public void consume(int no) {
new Thread(new Runnable() {
@Override
public void run() {
depot.consume(no);
}
}, no + " consume thread").start();
}
}
class Producer {
private Depot depot;
public Producer(Depot depot) {
this.depot = depot;
}
public void produce(int no) {
new Thread(new Runnable() {
@Override
public void run() {
depot.produce(no);
}
}, no + " produce thread").start();
}
}
public class ReentrantLockDemo {
public static void main(String[] args) throws InterruptedException {
Depot depot = new Depot(500);
new Producer(depot).produce(500);
new Producer(depot).produce(200);
new Consumer(depot).consume(500);
new Consumer(depot).consume(200);
}
}
根据例子分析源码
-
参考文章leesf写的AQS源码提供的时序图,p1代表produce 500的那个线程,p2代表produce 200的那个线程,c1代表consume 500的那个线程,c2代表consume 200的那个线程
时序图.png
lock调用分析图
-
也是leesf写的AQS源码提供的分析图
lock调用分析图.png
p1调用lock.lock()获取锁
- ReentrantLock::lock --> 默认非公平锁ReentrantLock::NonfairSync::lock,根据时序图这里p1获取到锁走if分支
final void lock() {
// state之前为0能设置成1就获取到锁,并设置独占锁的线程
if (compareAndSetState(0, 1))
// AOS的方法
setExclusiveOwnerThread(Thread.currentThread());
// 获取不到锁,后续分析
else
acquire(1);
}
p2调用lock.lock()未获取锁
- 由p1获取到锁流程分析,p2未获取到锁走else流程
- ReentrantLock::lock --> 默认非公平锁ReentrantLock::NonfairSync::lock -->
AQS::acquire
// AQS acquire
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- AQS::acquire --> ReentrantLock::Sync::tryAcquire, 这里有可重入锁为啥可重入原因,根据时序图分析p2 tryAcquire即不走if也不走if else流程,直接返回fasle
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入锁可重入是因为有设置独占线程的功能,如果是这个线程则state++
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- AQS::acquire --> ReentrantLock::Sync::tryAcquire方法中addWaiter(Node.EXCLUSIVE)这里ReentrantLock是设置独占锁,即同一时间只有一个线程能占有锁,共享锁是同一时间有多个,比如读锁共享,写锁独占
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 刚开始都为null
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 刚开始走这里
enq(node);
return node;
}
private Node enq(final Node node) {
// 刚开始时,第一个for循环创建空的head节点,第二个for头节点后面加入p2
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
图示.png
- AQS::acquire --> ReentrantLock::Sync::tryAcquire方法中acquireQueued,细节可以看中文注解部分
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// p2的前面一个节点是头节点,所以会尝试获取锁
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 刚开始节点状态为0,shouldParkAfterFailedAcquire将其设置为singal节点(-1)
// 然后LockSupport.park将线程挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 以上是整个lock调用流程
c1,c2调用lock
-
状态如下(根据时序图此时没有调用unlock),先是c1被加入到sync队列尾部,再是c2加入到sync队列尾部
c1,c2调用lock.png
中间穿插p1调用emptyCondition.signal
- AQS::ConditionObject::signal目前没有emptyCondition await的部分,所以不起啥作用
unlock调用分析
- ReentrantLock::unlock --> AQS::release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- AQS::release先是调用ReentrantLock::tryRelease
protected final boolean tryRelease(int releases) {
// 释放state并释放独占线程
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
- AQS::release再是调用AQS::unparkSuccessor
private void unparkSuccessor(Node node) {
// 此时正常状态是singal,将其设置为初始状态
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 走的这步unpark head后面的线程
if (s != null)
LockSupport.unpark(s.thread);
}
-
此时head后面线程从park中恢复,在cpu时间片获取到的情况下,继续for循环尝试获取锁。即此线程unlcok,会解锁下个线程尝试获取锁
-
unlock后变成
p1 unlock后.png
condition的await分析
-
p2线程执行fullCondition.await
图示.png
AQS:ConditionObject(CO):await
public final void await() throws InterruptedException {
// 对中断做出处理,抛出让调用方接
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
// fullyRelease最终调用tryRelease方法,会释放当前线程持有的锁,并通知后继线程unpark
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 将当前线程park
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
-
addConditionWaiter方法是在Condition队列生成一个,Node.CONDITION属性
Condition队列.png -
fullyRelease最终调用tryRelease方法,会释放当前线程持有的锁,并通知后继线程unpark。还会把自己线程park
最终状态.png
c1线程获取锁
-
c1线程被unpark之后获取cpu时间片,执行前面提到的acquireQueued函数,之后,c1判断自己的前驱结点为head,并且可以获取锁资源, 然后获取锁。 最终是Condition有p2线程节点
c1线程获取锁.png
fullCondtion.signal分析
-
图示分析
signal分析.png - AQS::CO::signal --> AQS::CO::doSignal --> AQS::CO::transferForSignal -->
AQS::CO::compareAndSetWaitStatus
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
-
transferForSignal的enq就是将condition的p2节点入队尾
enq.png -
然后是设置status, compareAndSetWaitStatus
设置status.png
c1.unclock
image.png.pngc2线程执行emptyCondition.await
c2线程执行await.png-
p2线程被unpark,故可以继续运行,经过CPU调度后,p2继续运行,之后p2线程在AQS:await函数中被park,继续AQS.CO:await函数的运行
p2线程被unpark.png
p2继续运行,执行emptyCondition.signal
p2执行signal.pngp2执行unclock
p2执行unclock.png总结
- 每一个结点都是由前一个结点唤醒
- 当结点发现前驱结点是head并且尝试获取成功,则会轮到该线程运行。
- condition queue中的结点向sync queue中转移是通过signal操作完成的。
- 当结点的状态为SIGNAL时,表示后面的结点需要运行
ReentrantLock
- AQS独占锁实现,公平是通过sync队列按照先后顺序实现的,非公平有可能被其他线程CAS设置state而抢占
- 具体实现同AQS举例的
CyclicBarrier
- 通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
- AQS独占锁
CountDownLatch
- 场景: 主线程等待多个线程完成任务。AQS共享模式
- 数据结构类似ReentrantLock,但是没有公平非公平
// 内部类
private static final class Sync extends AbstractQueuedSynchronizer
- 构造函数,初始化state数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
// 初始化状态数 setState
this.sync = new Sync(count);
}
例子
- 参考自lessf的博客
package com.hust.grid.leesf.cyclicbarrier;
import java.util.concurrent.CountDownLatch;
class MyThread extends Thread {
private CountDownLatch countDownLatch;
public MyThread(String name, CountDownLatch countDownLatch) {
super(name);
this.countDownLatch = countDownLatch;
}
public void run() {
System.out.println(Thread.currentThread().getName() + " doing something");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " finish");
countDownLatch.countDown();
}
}
public class CountDownLatchDemo {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2);
MyThread t1 = new MyThread("t1", countDownLatch);
MyThread t2 = new MyThread("t2", countDownLatch);
t1.start();
t2.start();
System.out.println("Waiting for t1 thread and t2 thread to finish");
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " continue");
}
}
总体流程
总体流程.pngawait分析
-
main线程就被park了,即禁止运行了。此时Sync queue(同步队列)中有两个节点,AQS的state为2,包含main线程的结点的nextWaiter指向SHARED结点
await分析.png
t1线程执行countDownLatch.countDown
-
Sync queue队列里的结点个数未发生变化,但是此时,AQS的state已经变为1了
t1线程执行countDownLatch.countDown.png
t2线程执行countDownLatch.countDown
t2线程执行countDownLatch.countDown.png-
main线程获取cpu资源,继续运行,由于main线程是在parkAndCheckInterrupt函数中被禁止的,所以此时,继续在parkAndCheckInterrupt函数运行
继续运行.png
Semaphore
- 内部类跟ReentrantLock类似有公平非公平锁之分,使用的是AQS共享锁
- 使用场景,数据库连接之类的多个线程共同持有资源
示例
package com.hust.grid.leesf.semaphore;
import java.util.concurrent.Semaphore;
class MyThread extends Thread {
private Semaphore semaphore;
public MyThread(String name, Semaphore semaphore) {
super(name);
this.semaphore = semaphore;
}
public void run() {
int count = 3;
System.out.println(Thread.currentThread().getName() + " trying to acquire");
try {
semaphore.acquire(count);
System.out.println(Thread.currentThread().getName() + " acquire successfully");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release(count);
System.out.println(Thread.currentThread().getName() + " release successfully");
}
}
}
public class SemaphoreDemo {
public final static int SEM_SIZE = 10;
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(SEM_SIZE);
MyThread t1 = new MyThread("t1", semaphore);
MyThread t2 = new MyThread("t2", semaphore);
t1.start();
t2.start();
int permits = 5;
System.out.println(Thread.currentThread().getName() + " trying to acquire");
try {
semaphore.acquire(permits);
System.out.println(Thread.currentThread().getName() + " acquire successfully");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
System.out.println(Thread.currentThread().getName() + " release successfully");
}
}
}
生成10个许可图示
-
main线程执行acquire操作,并且成功获得许可,之后t1线程执行acquire操作,成功获得许可,之后t2执行acquire操作,由于此时许可数量不够,t2线程将会阻塞,直到许可可用。之后t1线程释放许可,main线程释放许可,此时的许可数量可以满足t2线程的要求,所以,此时t2线程会成功获得许可运行,t2运行完成后释放许可
生成10个许可图示.png
semaphore::acquire分析
-
main调用semaphore::acquire
main调用semaphore::acquire.png -
t1调用semaphore::acquire
t1调用semaphore::acquire.png -
t2调用semaphore::acquire,此时资源不足
t2调用semaphore::acquire.png
semaphore::release分析
-
t1 release, t2线程将会被unpark,并且AQS的state为5,t2获取cpu资源后可以继续运行
t1 release .png -
t2获取CPU资源,继续运行, setHeadAndPropagate会调用doReleaseShared,在setHeadAndPropagate的函数中会设置头结点并且会unpark队列中的其他结点。(与独占模式很重要区别)
t2获取CPU资源,继续运行.png
ArrayBlockingQueue
- ArrayBlockingQueue是通过ReentrantLock和Condition条件来保证多线程的正确访问的。可以参考AQS的分析。
ReentrantReadWriteLock
- ReentrantReadWriteLock底层是基于ReentrantLock和AQS来实现的
结构
-
Sync继承自AQS、NonfairSync继承自Sync类、FairSync继承自Sync类;ReadLock实现了Lock接口、WriteLock也实现了Lock接口
结构.png
总体
- 写锁独占锁
- 读锁共享锁
- 读读不冲突,读写冲突
- 有读锁存在时,写获取锁失败进入sync队列原因是,写线程存在读线程也会进入等待。写线程等待获取锁后,后续读线程不会再获取到锁,写线程等待前面读线程释放锁。才获取到锁,写线程后面的读线程才能获取到锁。
- 唤醒其后到下一个尝试获取锁的的节点之间的所有尝试获取读锁的线程
// 写锁获取锁
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取状态
int c = getState();
// 写线程数量
int w = exclusiveCount(c);
if (c != 0) { // 状态不为0
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread()) // 写线程数量为0(说明存在读锁)或者当前线程没有占有独占资源
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 判断是否超过最高写线程数量
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 设置AQS状态
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires)) // 写线程是否应该被阻塞
return false;
// 设置独占线程
setExclusiveOwnerThread(current);
return true;
}
// 读锁获取锁,此函数表示读锁线程获取读锁。首先判断写锁是否为0并且当前线程不占有独占锁,直接返回;否则,判断读线程是否需要被阻塞并且读锁数量是否小于最大值并且比较设置状态成功
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取状态
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current) // 写线程数不为0并且占有资源的不是当前线程
return -1;
// 读锁数量
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // 读线程是否应该被阻塞、并且小于最大值、并且比较设置成功
if (r == 0) { // 读锁数量为0
// 设置第一个读线程
firstReader = current;
// 读线程占用的资源数为1
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 当前线程为第一个读线程
// 占用资源数加1
firstReaderHoldCount++;
} else { // 读锁数量不为0并且不为当前线程
// 获取计数器
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) // 计数器为空或者计数器的tid不为当前正在运行的线程的tid
// 获取当前线程对应的计数器
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0) // 计数为0
// 设置
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
- 当读线程执行unlock操作后,AQS的state为0(有多个读时需要等带多个读unlock),写线程将会被unpark,其获得CPU资源就可以运行。
参考文章
- 【JUC】JDK1.8源码分析之AbstractQueuedSynchronizer(二)
- 【JUC】JDK1.8源码分析之ReentrantLock(三)
- 【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)
- 【JUC】JDK1.8源码分析之CountDownLatch(五)
- 【JUC】JDK1.8源码分析之CyclicBarrier(四)
- 【JUC】JDK1.8源码分析之Semaphore(六)
- 【JUC】JDK1.8源码分析之ReentrantReadWriteLock(七)
- CountDownLatch、CyclicBarrier、Semaphore的区别
- CountDownLatch、CyclicBarrier、Semaphore共同之处与区别以及各自使用场景
- 【Java并发编程】详细分析AQS原理之共享锁
- 理解java线程的中断(interrupt)
- 【AQS】独占锁与共享锁
网友评论