AQS同步状态的获取和释放
个人感觉
其实了解AQS应该从同步状态的获取和释放开始,但是看了源码后,绝对会有一种不知道从哪里开始的感觉,后来还是觉得从addWaiter将节点加入CLH队列开始,一点一点的去了解
- 独占式和共享式获取同步状态
独占式
所谓独占式同步状态的获取和释放,属于我们平常所说的悲观锁
这样的话,独占式获取同步状态,每次只有一个线程获取同步状态
由于悲观锁的加锁策略,如果有两个读线程同时获取共享状态的话,只会有一次线程获取同步状态,另外一个必须阻塞等待,势必会对性能造成影响
共享式
相对于独占式,允许多个读线程同时获取锁,并发的访问共享资源,但是不允许在写线程进行时,有读线程获取锁
属于乐观锁,相比较悲观锁的严格的加锁策略,乐观锁的加锁策略相对宽松。
- 获取同步状态
AQS的入口肯定是acquire获取同步状态
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
/
public final void acquire(int arg) {
//1、tryAcquire 尝试获取同步状态,成功则返回,在获取同步状态时,需保证线程安全的获取
if (!tryAcquire(arg) &&
//2、如果获取失败,则调用addWaiter方法将线程加入CLH队列(其实是包含当前线程的节点)
//3、调用acquireQueued将包含当前线程的节点,自旋的方式去获取同步状态
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//4、Thread.currentThread().interrupt();中断当前线程
selfInterrupt();
}
解释方法的注释开始吧(纯个人)
在独占模式下获取同步状态,不响应中断(即:在获取同步状态的时候,如果线程被中断或者取消,线程并没有从CLH队列中移除)
该方法最少会执行一次tryAcquire方法,成功则返回
tryAcquire失败,包含该线程,即status会被构造成一个新的节点,加入到CLH队列中
CLH队列中的每个节点会自旋(死循环),获取同步状态
- acquireQueued
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
//操作标识,会再finally中使用到
boolean failed = true;
try {
//中断标识
boolean interrupted = false;
for (;;) {
//当前节点的前驱节点
final Node p = node.predecessor();
//如果当前节点的前驱节点是头结点,则tryAcquire尝试获取同步状态
if (p == head && tryAcquire(arg)) {
//设置当前节点为头结点
setHead(node);
//取消前驱节点对当前节点next的引用,有利于GC回收头结点
p.next = null; // help GC
//标识为False,在finally的时候不会取消同步状态的获取
failed = false;
//返回false,即中断状态
return interrupted;
}
//同步状态获取失败的话,则线程等待,这个方法应该是重点
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
解释
已经在CLH队列中的、在独占模式下、非中断的线程,获取同步状态,只有该节点的前驱节点是头结点时,
就CLH队列的FIFO的特性,不难理解为什么前驱节点是头结点时,才开始尝试获取同步状态
- 响应中断式获取同步状态
- acquire采用了独占式获取同步状态,不响应中断,在获取同步状态时,如果线程被中断或者被取消,节点仍然在CLH队列中,依然会尝试获取同步状态
/**
* 1、 独占模式下获取同步状态
* 2、 响应中断,如果被中断,则中止线程
* 3、 最少会执行一次tryAcquire,成功则返回
*
*
public final void acquireInterruptibly(int arg)
throws InterruptedException {
//如果线程被中断,则抛出InterruptedException异常
if (Thread.interrupted())
throw new InterruptedException();
//调用tryAcquire尝试获取同步状态
if (!tryAcquire(arg))
//获取失败,则调用doAcquireInterruptibly
doAcquireInterruptibly(arg);
}
和acquire不同的是,
1、在获取同步状态时,会判断当前线程是否被中断,如果被中断,则配出InterruptedException,
2、tryAcquire失败后,直接调用doAcquireInterruptibly加入CLH队列并自旋获取同步状态,取消了中断标识,改成了直接抛出异常
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//将线程加入CLH队列中(独占模式)
final Node node = addWaiter(Node.EXCLUSIVE);
//操作标识
boolean failed = true;
try {
//自旋,死循环
for (;;) {
//当前节点的前驱节点p
final Node p = node.predecessor();
//如果当前节点的前驱节点是头结点,则tryAcquire尝试获取同步状态
if (p == head && tryAcquire(arg)) {
//设置当前节点为头结点
setHead(node);
//前驱节点释放对当前节点的引用,有利于GC 进行回收
p.next = null; // help GC
//操作标识为false
failed = false;
return;
}
//如果获取失败,则shouldParkAfterFailedAcquire,还是细说吧
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//这里其实就是和acquireQueued中的区别所在,acquireQueued采用了中断标识interrupted,而相应中断直接抛出异常
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 独占式超时获取
/**
* Attempts to acquire in exclusive mode, aborting if interrupted,
* and failing if the given timeout elapses. Implemented by first
* checking interrupt status, then invoking at least once {@link
* #tryAcquire}, returning on success. Otherwise, the thread is
* queued, possibly repeatedly blocking and unblocking, invoking
* {@link #tryAcquire} until success or the thread is interrupted
* or the timeout elapses. This method can be used to implement
* method {@link Lock#tryLock(long, TimeUnit)}.
* 1、独占模式下获同步状态
* 2、如果被中断,则线程中止
* 3、线程首先判断是否被中断
* 4、至少会调用一次tryAcquire尝试获取同步状态
* 5、相比较acquireInterruptibly,新增了对时间的控制
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
* Acquires in exclusive timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//超时控制
if (nanosTimeout <= 0L)
return false;
//唤醒时间
final long deadline = System.nanoTime() + nanosTimeout;
//将当前线程构建成新的节点,加入到CLH队列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//当前节点的前驱节点
final Node p = node.predecessor();
//如果当前节点的前驱节点是头结点,则尝试获取同步状态
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
//剩余时间
nanosTimeout = deadline - System.nanoTime();
//已经超时,返回false
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)//如果剩余时间大于快速自旋剩余时间,则休眠,如果小于快速自旋剩余时间则直接进行自旋
LockSupport.parkNanos(this, nanosTimeout);
//线程如果被中断,则抛出InterruputException异常
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 独占式同步状态释放
public final boolean release(int arg) {
//尝试释放同步状态
if (tryRelease(arg)) {
//释放成功,调用unparkSuccessor对后继节点进行唤醒动作
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- 共享式获取同步状态
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
* 1、共享模式下获取同步状态,不响应中断
* 2、最少会调用一次tryAcquireShared,成功则返回
* 3、否则线程入队,执行tryAcquireShared直到成功为止
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
//加入到CLH队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//中断标识
boolean interrupted = false;
for (;;) {
//当前节点的前驱节点
final Node p = node.predecessor();
//前驱节点如果是头结点
if (p == head) {
//尝试获取同步状态
int r = tryAcquireShared(arg);
//如果r>0,则说明获取成功
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 写在后面
- 胆小善良的我还总想干点大事
- 不要卑微了自己
网友评论