一、基本介绍
语法:
// 获取锁
reentrantLock.lock();
try {
// 临界区
} finally {
// 释放锁
reentrantLock.unlock();
}
特点:
- 可重入
- 可中断
- 可以设置超时时间
- 可以设置为公平锁
- 支持多个条件变量(类似Synchronized中的waitset,拿不到锁时进入的waitset中等待,可以支持多个waitset)
1.可重入
同一个线程如果首次获得了这把锁,那么因为它是这把锁的拥有者,因此有权利再次获取这把锁;如果是不可重入锁,那么第二次获得锁时,自己也会被锁挡住。
@Slf4j
public class ReentrantTest {
static ReentrantLock lock = new ReentrantLock();
public static void main(String[] args) {
method1();
}
public static void method1() {
lock.lock();
try {
log.debug("execute method1");
method2();
} finally {
lock.unlock();
}
}
public static void method2() {
lock.lock();
try {
log.debug("execute method2");
method3();
} finally {
lock.unlock();
}
}
public static void method3() {
lock.lock();
try {
log.debug("execute method3");
} finally {
lock.unlock();
}
}
}
20:22:29.392 [main] DEBUG juc.reentrant.ReentrantTest - execute method1
20:22:29.396 [main] DEBUG juc.reentrant.ReentrantTest - execute method2
20:22:29.396 [main] DEBUG juc.reentrant.ReentrantTest - execute method3
实现源码:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
2.可打断
在等待锁的过程中,其他线程可以使用interrupt方法中止当前线程的等待。
可以防止死锁的发生,线程不用一直等下去。
@Slf4j
public class InterruptTest {
private static ReentrantLock reentrantLock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(()->{
try {
//竞争锁失败进入阻塞队列,可以被其他线程调用interrupt方法打断
log.debug("t1线程开始抢锁");
reentrantLock.lockInterruptibly();
} catch (InterruptedException e) {
//抢锁失败进入了阻塞队列然后被打断了就进入这个catch块
e.printStackTrace();
log.debug("t1线程被打断,返回");
return;
}
try{
log.debug("t1线程抢锁成功");
}finally {
reentrantLock.unlock();
}
},"t1");
//主线程先抢锁
log.debug("main线程开始抢锁");
reentrantLock.lock();
log.debug("main线程抢锁成功");
//t1线程启动开始抢锁,抢锁失败
t1.start();
Thread.sleep(1000);
log.debug("main线程打断正在阻塞的t1线程");
t1.interrupt();
}
}
10:09:47.246 [main] DEBUG juc.reentrant.InterruptTest - main线程开始抢锁
10:09:47.248 [main] DEBUG juc.reentrant.InterruptTest - main线程抢锁成功
10:09:47.249 [t1] DEBUG juc.reentrant.InterruptTest - t1线程开始抢锁
10:09:48.250 [main] DEBUG juc.reentrant.InterruptTest - main线程打断正在阻塞的t1线程
10:09:48.251 [t1] DEBUG juc.reentrant.InterruptTest - t1线程被打断,返回
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at juc.reentrant.InterruptTest.lambda$main$0(InterruptTest.java:25)
at java.lang.Thread.run(Thread.java:748)
进入synchronized临界区的代码可以被打断,lockInterruptibly是获取锁失败被阻塞的过程可以被打断。
不可打断模式:
在此模式下,即使它被打断,仍会驻留在 AQS 队列中,一直要等到获得锁后方能得知自己被打断了
private final boolean parkAndCheckInterrupt() {
// 如果打断标记已经是 true, 则 park 会失效
LockSupport.park(this);
// interrupted 会清除打断标记
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 需要获得锁后, 才能返回打断状态
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果是因为 interrupt 被唤醒, 返回打断状态为 true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果打断状态为 true
selfInterrupt();
}
static void selfInterrupt() {
// 重新产生一次中断
Thread.currentThread().interrupt();
}
可打断模式:
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 如果没有获得到锁, 进入 ㈠
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// ㈠ 可打断的获取锁流程
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
// 在 park 过程中如果被 interrupt 会进入此,这时候抛出异常, 不会再次进入 for (;;)
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
3.设置超时时间
@Slf4j
public class TimeoutTest {
public static void main(String[] args) {
test1();
}
private static void test1() {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("t1线程启动...");
try {
//获取不到会等待timeout时间,如果再抢不到就返回
if (!lock.tryLock(1, TimeUnit.SECONDS)) {
log.debug("尝试获取锁,等待 1s 后,失败,返回");
return;
}
} catch (InterruptedException e) {
log.debug("t1线程被打断,获取锁失败",e);
return;
}
try {
log.debug("t1抢锁成功");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("main抢锁成功");
t1.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private static void test2() {
ReentrantLock lock = new ReentrantLock();
Thread t1 = new Thread(() -> {
log.debug("t1线程启动...");
//获取不到不会等待
if (!lock.tryLock()) {
log.debug("t1线程抢锁失败,返回");
return;
}
try {
log.debug("获得了锁");
} finally {
lock.unlock();
}
}, "t1");
lock.lock();
log.debug("main线程抢锁成功");
t1.start();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
使用锁超时解决哲学家就餐问题
public class DealPhilosopherMeal {
public static void main(String[] args) {
Chopstick c1 = new Chopstick("1");
Chopstick c2 = new Chopstick("2");
Chopstick c3 = new Chopstick("3");
Chopstick c4 = new Chopstick("4");
Chopstick c5 = new Chopstick("5");
new Philosopher("苏格拉底", c1, c2).start();
new Philosopher("柏拉图", c2, c3).start();
new Philosopher("亚里士多德", c3, c4).start();
new Philosopher("赫拉克利特", c4, c5).start();
new Philosopher("阿基米德", c5, c1).start();
}
}
/**
* 哲学家类
*/
@Slf4j
class Philosopher extends Thread {
Chopstick left;
Chopstick right;
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.left = left;
this.right = right;
}
@Override
public void run() {
while (true) {
// 尝试获得左手筷子
if(left.tryLock()){
try {
// 尝试获得右手筷子
if (right.tryLock()) {
try{
eat();
}finally {
right.unlock();
}
}
}finally {
left.unlock();
}
}
// // 尝试获得左手筷子
// synchronized (left) {
// // 尝试获得右手筷子
// synchronized (right) {
// eat();
// }
// }
}
}
Random random = new Random();
private void eat(){
log.debug("eating...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 筷子类
*/
class Chopstick extends ReentrantLock {
String name;
public Chopstick(String name) {
this.name = name;
}
@Override
public String toString() {
return "筷子{" + name + '}';
}
}
11:21:48.096 [柏拉图] DEBUG juc.reentrant.Philosopher - eating...
11:21:48.097 [赫拉克利特] DEBUG juc.reentrant.Philosopher - eating...
11:21:49.104 [亚里士多德] DEBUG juc.reentrant.Philosopher - eating...
11:21:49.104 [阿基米德] DEBUG juc.reentrant.Philosopher - eating...
11:21:50.106 [柏拉图] DEBUG juc.reentrant.Philosopher - eating...
11:21:50.106 [赫拉克利特] DEBUG juc.reentrant.Philosopher - eating...
11:21:51.107 [苏格拉底] DEBUG juc.reentrant.Philosopher - eating...
11:21:51.107 [赫拉克利特] DEBUG juc.reentrant.Philosopher - eating...
11:21:52.110 [苏格拉底] DEBUG juc.reentrant.Philosopher - eating...
···
4.设置为公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
默认为非公平锁,通过传递一个布尔参数来实现公平。
- 好处:可以解决饥饿问题
- 坏处:降低了并发度
实现原理:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 与非公平锁主要区别在于 tryAcquire 方法的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查 AQS 队列中是否有前驱节点, 没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
// (一)
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// h != t 时表示队列中有 Node
return h != t &&
// (s = h.next) == null 表示队列中还有没有老二或者队列中老二线程不是此线程
((s = h.next) == null || s.thread != Thread.currentThread());
}
5.支持多个条件变量
synchronized 中的条件变量,就是waitSet ,当条件不满足时进入 waitSet 等待,状态为waiting。
ReentrantLock 的条件变量比 synchronized 强大之处在于,它是支持多个条件变量,synchronized 是那些不满足条件的线程都在一个集合等消息,而 ReentrantLock 支持多个集合,唤醒时也是按集合来唤醒。
每个条件变量对应着一个等待队列,其实现类是 ConditionObject
使用要点:
- await 前需要获得锁
- await 执行后,会释放锁,进入 conditionObject 等待
- await 的线程被唤醒(或打断、或超时)去重新竞争 lock 锁
- 竞争 lock 锁成功后,从 await 后继续执行
基本使用:
public class TestAwait {
static ReentrantLock reentrantLock = new ReentrantLock();
public static void main(String[] args) throws InterruptedException {
Condition condition1 = reentrantLock.newCondition();
Condition condition2 = reentrantLock.newCondition();
reentrantLock.lock();
//进入条件变量中等待
condition1.await();
condition1.signalAll();
}
}
@Slf4j
public class TestAwaitSignal {
static boolean hasEat = false;
static boolean hasDrink = false;
static ReentrantLock room = new ReentrantLock();
//等喝的waitSet
static Condition drinkWaitSet = room.newCondition();
//等吃的waitSet
static Condition eatWaitSet = room.newCondition();
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
room.lock();
try {
log.debug("有吃的没?[{}]", hasEat);
while (!hasEat) {
log.debug("没吃的,先歇会!");
try {
eatWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (hasEat) {
log.debug("有吃的,可以开始干活了");
} else {
log.debug("没干成活...");
}
}finally {
room.unlock();
}
}, "小红").start();
new Thread(() -> {
room.lock();
try {
log.debug("有喝的没?[{}]", hasDrink);
while (!hasDrink) {
log.debug("没喝的,先歇会!");
try {
drinkWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (hasDrink) {
log.debug("有喝的,可以开始干活了");
} else {
log.debug("没干成活...");
}
}finally {
room.unlock();
}
}, "小明").start();
Thread.sleep(1000);
new Thread(() -> {
room.lock();
try {
hasDrink = true;
log.debug("喝的送到");
drinkWaitSet.signal();
} finally {
room.unlock();
}
}, "送喝的").start();
new Thread(() -> {
room.lock();
try {
hasEat = true;
log.debug("吃的送到");
eatWaitSet.signal();
} finally {
room.unlock();
}
}, "送吃的").start();
}
}
14:53:18.641 [小红] DEBUG juc.reentrant.TestAwaitSignal - 有吃的没?[false]
14:53:18.644 [小红] DEBUG juc.reentrant.TestAwaitSignal - 没吃的,先歇会!
14:53:18.644 [小明] DEBUG juc.reentrant.TestAwaitSignal - 有喝的没?[false]
14:53:18.644 [小明] DEBUG juc.reentrant.TestAwaitSignal - 没喝的,先歇会!
14:53:19.645 [送喝的] DEBUG juc.reentrant.TestAwaitSignal - 喝的送到
14:53:19.645 [小明] DEBUG juc.reentrant.TestAwaitSignal - 有喝的,可以开始干活了
14:53:19.646 [送吃的] DEBUG juc.reentrant.TestAwaitSignal - 吃的送到
14:53:19.646 [小红] DEBUG juc.reentrant.TestAwaitSignal - 有吃的,可以开始干活了
await流程
1.开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的addConditionWaiter 流程,创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
![](https://img.haomeiwen.com/i4807654/db50813448be5dec.png)
2.进入 AQS 的 fullyRelease 流程,释放同步器上的锁
![](https://img.haomeiwen.com/i4807654/4cb47d70f2c97c22.png)
3.unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
![](https://img.haomeiwen.com/i4807654/13ffc6686321dd8b.png)
4.park 阻塞 Thread-0
![](https://img.haomeiwen.com/i4807654/d2d50c90c2c6b5e4.png)
signal 流程
1.Thread-1 要来唤醒 Thread-0
![](https://img.haomeiwen.com/i4807654/4daa570dc7764707.png)
2.进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即Thread-0 所在 Node
![](https://img.haomeiwen.com/i4807654/2d5d67808e952348.png)
3.执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的waitStatus 改为 0,Thread-3 的waitStatus 改为 -1
![](https://img.haomeiwen.com/i4807654/1e2ec034f069f3dc.png)
4.Thread-1 释放锁,进入 unlock 流程
二、固定顺序运行
让t2线程先执行:
1.使用wait/notify
@Slf4j
public class OrderRunTest {
static final Object lock = new Object();
// 表示 t2 是否运行过
static boolean t2RunFlag = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock) {
while (!t2RunFlag) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("t1执行");
}
}, "t1");
Thread t2 = new Thread(() -> {
synchronized (lock) {
log.debug("t2执行");
t2RunFlag = true;
lock.notify();
}
}, "t2");
t1.start();
t2.start();
}
}
15:26:05.771 [t2] DEBUG juc.reentrant.OrderRunTest - t2执行
15:26:05.775 [t1] DEBUG juc.reentrant.OrderRunTest - t1执行
2.使用await/signal
@Slf4j
public class OrderRunTest2 {
static ReentrantLock reentrantLock = new ReentrantLock();
static Condition waitSet = reentrantLock.newCondition();
// 表示 t2 是否运行过
static boolean t2RunFlag = false;
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
reentrantLock.lock();
try {
while (!t2RunFlag) {
try {
waitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
log.debug("t1执行");
}finally {
reentrantLock.unlock();
}
}, "t1");
Thread t2 = new Thread(() -> {
reentrantLock.lock();
try {
log.debug("t2执行");
t2RunFlag = true;
waitSet.signal();
}finally {
reentrantLock.unlock();
}
}, "t2");
t1.start();
t2.start();
}
}
3.使用park/unpark
public class Test {
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
LockSupport.park();
log.debug("1");
}, "t1");
t1.start();
new Thread(() -> {
log.debug("2");
LockSupport.unpark(t1);
},"t2").start();
}
}
三、交替运行
1.使用wait/notify
@Slf4j
public class RotateTest {
public static void main(String[] args) {
WaitNotify wn = new WaitNotify(1, 5);
new Thread(() -> wn.print("a", 1, 2)).start();
new Thread(() -> wn.print("b", 2, 3)).start();
new Thread(() -> wn.print("c", 3, 1)).start();
}
}
/*
输出内容 等待标记 下一个标记
a 1 2
b 2 3
c 3 1
*/
@Slf4j
class WaitNotify {
// 打印 a 1 2
public void print(String str, int waitFlag, int nextFlag) {
for (int i = 0; i < loopNumber; i++) {
synchronized (this) {
while(flag != waitFlag) {
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.print(str);
flag = nextFlag;
this.notifyAll();
}
}
}
// 等待标记
private int flag; // 2
// 循环次数
private int loopNumber;
public WaitNotify(int flag, int loopNumber) {
this.flag = flag;
this.loopNumber = loopNumber;
}
}
abcabcabcabcabc
2.使用await/signal
public class RotateTest1 {
public static void main(String[] args) throws InterruptedException {
AwaitSignal awaitSignal = new AwaitSignal(5);
Condition a = awaitSignal.newCondition();
Condition b = awaitSignal.newCondition();
Condition c = awaitSignal.newCondition();
new Thread(() -> awaitSignal.print("a", a, b), "t1").start();
new Thread(() -> awaitSignal.print("b", b, c), "t2").start();
new Thread(() -> awaitSignal.print("c", c, a), "t3").start();
Thread.sleep(1000);
awaitSignal.lock();
try {
a.signal();
} finally {
awaitSignal.unlock();
}
}
}
class AwaitSignal extends ReentrantLock{
// 循环次数
private int loopNumber;
public AwaitSignal(int loopNumber){
this.loopNumber = loopNumber;
}
/**
* 打印
* @param str 打印的字符
* @param current 当前条件变量
* @param next 下一个条件变量
*/
public void print(String str,Condition current,Condition next){
for (int i = 0; i < loopNumber; i++) {
lock();
try{
try {
current.await();
System.out.print(str);
next.signal();
} catch (InterruptedException e) {
e.printStackTrace();
}
}finally {
unlock();
}
}
}
}
3.使用park/unpark
@Slf4j
public class RotateTest2 {
static Thread t1;
static Thread t2;
static Thread t3;
public static void main(String[] args) {
ParkUnpark pu = new ParkUnpark(5);
t1 = new Thread(() -> pu.print("a", t2));
t2 = new Thread(() -> pu.print("b", t3));
t3 = new Thread(() -> pu.print("c", t1));
t1.start();
t2.start();
t3.start();
LockSupport.unpark(t1);
}
}
class ParkUnpark {
public void print(String str, Thread next) {
for (int i = 0; i < loopNumber; i++) {
LockSupport.park();
System.out.print(str);
LockSupport.unpark(next);
}
}
private int loopNumber;
public ParkUnpark(int loopNumber) {
this.loopNumber = loopNumber;
}
}
四、非公平锁实现原理
构造器默认为非公平锁实现:
public ReentrantLock() {
sync = new NonfairSync();
}
NonfairSync 继承自 AQS
没有竞争时:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
![](https://img.haomeiwen.com/i4807654/5d0fbe9460714d6d.png)
3.Thread-1 执行了:
- 1.CAS 尝试将 state 由 0 改为 1,结果失败
- 2.进入 tryAcquire 逻辑,这时 state 已经是1,结果仍然失败
- 3.进入 addWaiter 逻辑,构造 Node 队列
黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
Node 的创建是懒惰的
第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(AbstractQueuedSynchronizer.Node.EXCLUSIVE), arg))
selfInterrupt();
}
![](https://img.haomeiwen.com/i4807654/4f9a52268c6a69ee.png)
4.当前线程进入 acquireQueued 逻辑:
- 1.acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
- 2.如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,这时 state 仍为 1,失败
- 3.进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的waitStatus 改为 -1(需要唤醒后继节点),返回 false
- 4.shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时state 仍为 1,失败
- 5.当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回true
- 6.进入 parkAndCheckInterrupt, Thread-1 park(灰色表示)
final boolean acquireQueued(final AbstractQueuedSynchronizer.Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final AbstractQueuedSynchronizer.Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
![](https://img.haomeiwen.com/i4807654/5dea987158bbfc7a.png)
5.再次有多个线程经历上述过程竞争失败:
![](https://img.haomeiwen.com/i4807654/f0e743188f5ae71c.png)
Thread-0 释放锁,进入 tryRelease 流程:
- 设置 exclusiveOwnerThread 为 null
-
state = 0
image.png
- 当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程
- 找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
回到 Thread-1 的 acquireQueued 流程:
-
如果加锁成功(没有竞争),会设置
1.exclusiveOwnerThread 为 Thread-1,state = 1
2.head 指向刚刚 Thread-1 所在的 Node,该 Node 清空 Thread
3.原本的 head 因为从链表断开,而可被垃圾回收
image.png
如果这时候有其它线程来竞争(非公平的体现),例如这时有 Thread-4 来了:
![](https://img.haomeiwen.com/i4807654/f00363ab9fd72694.png)
如果被 Thread-4 先占了:
- Thread-4 被设置为 exclusiveOwnerThread,state = 1
- Thread-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
是否需要 unpark 是由当前节点的前驱节点的 waitStatus == Node.SIGNAL 来决定,而不是本节点的waitStatus 决定
加锁源码:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//首先用 cas 尝试(仅尝试一次)将 state 从 0 改为 1, 如果成功表示获得了独占锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果尝试失败,进入 ㈠
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// ㈠ AQS 继承过来的方法
public final void acquire(int arg) {
// ㈡ tryAcquire
if (!tryAcquire(arg) &&
// 当 tryAcquire 返回为 false 时, 先调用 addWaiter ㈣, 接着 acquireQueued ㈤
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// ㈡ 进入 ㈢
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
// ㈢ Sync 继承过来的方法
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 如果还没有获得锁
if (c == 0) {
// 尝试用 cas 获得, 这里体现了非公平性: 不去检查 AQS 队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果已经获得了锁, 线程还是当前线程, 表示发生了锁重入
else if (current == getExclusiveOwnerThread()) {
// state++
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 获取失败, 回到调用处
return false;
}
// ㈣ AQS 继承过来的方法
private Node addWaiter(Node mode) {
// 将当前线程关联到一个 Node 对象上, 模式为独占模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 如果 tail 不为 null, cas 尝试将 Node 对象加入 AQS 队列尾部
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 双向链表
pred.next = node;
return node;
}
}
// 尝试将 Node 加入 AQS, 进入 ㈥
enq(node);
return node;
}
// ㈥ AQS 继承过来的方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 还没有, 设置 head 为哨兵节点(不对应线程,状态为 0)
if (compareAndSetHead(new Node()))
tail = head;
} else {
// cas 尝试将 Node 对象加入 AQS 队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// ㈤ AQS 继承过来的方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 上一个节点是 head, 表示轮到自己(当前线程对应的 node)了, 尝试获取
if (p == head && tryAcquire(arg)) {
// 获取成功, 设置自己(当前线程对应的 node)为 head
setHead(node);
// 上一个节点 help GC
p.next = null; // help GC
failed = false;
// 返回中断标记 false
return interrupted;
}
// 判断是否应当 park, 进入 ㈦
if (shouldParkAfterFailedAcquire(p, node) &&
// park 等待, 此时 Node 的状态被置为 Node.SIGNAL ㈧
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// ㈦ AQS 继承过来的方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取上一个节点的状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 上一个节点在阻塞, 那么自己也阻塞
return true;
// > 0 表示取消状态
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 这次还没有阻塞
// 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// ㈧ 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
解锁源码:
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 尝试释放锁, 进入 ㈠
if (tryRelease(arg)) {
// 队列头节点 unpark
Node h = head;
// 队列不为 null ;waitStatus == Node.SIGNAL 才需要 unpark
if (h != null && h.waitStatus != 0)
// unpark AQS 中等待的线程, 进入 ㈡
unparkSuccessor(h);
return true;
}
return false;
}
// ㈠
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// (二)
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 如果状态为 Node.SIGNAL 尝试重置状态为 0 不成功也可以
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
Node s = node.next;
// 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
网友评论