AQS独占和共享锁,ReentantLock为独占锁,ReentantReadWriteLock中readLock()为共享锁,writeLock()为独占锁。
读锁与读锁可以共享
读锁与写锁不可以共享
写锁与写锁不可以共享
AQS内部维护了CLH的FIFO的队列,队列中的节点为Node类型的元素
图片转载自https://www.cnblogs.com/waterystone/p/4920797.html
Node节点结构:
static final class Node {
int waitStatus;
Node prev;
Node next;
Node nextWaiter;
Thread thread;
}
waitStatus:等待状态,具体类型如下:
CANCELLED:值为1 取消
SIGNAL:值为-1 唤醒后继节点
CONDITION:值为-2 表示当前节点在condition队列
PROPAGATE:值为-3 共享模式下acquireShared需要向下传播
0:无状态,表示当前节点在队列中等待获取锁
prev:前向节点
next:后继节点
nextWaiter:condition队列中的下一个等待节点
thread:当前线程
AQS中会维护head、tail节点,head节点中实际不存储线程,只保留队列头部节点的引用
state:
临界资源,当未获取到锁时,state=0,获取到锁则会置为state=1
ReentantLock为可重入锁,同一线程多次获取锁时,只会相应增加state的值,释放锁时,会对state递减,当state=0时则释放锁。
CountDownLatch中,初始化new CountDownLatch(arg),arg则为state的值,当其他线程调用countDown()时,则减少state,state=0时,则立即去唤醒后继的节点
ReentantLock中实现了三个内部类:
Sync:继承AQS
FairSync:继承Sync. 实现公平锁
NoFairSync:继承Sync 实现非公平锁,ReentrantLock默认实现
现在已非公平锁为例进行说明:
lock方法:
1.CAS将state状态改为1,成功则将exclusiveOwnerThread设置为当前线程
2.CAS失败,执行acquire(1)
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
//AQS中
1.尝试获取锁,成功则返回,否则执行2
2.执行addWaiter方法,创建独占节点加入队列尾
3.然后自旋的获取锁,获取成功,则返回中断标示
4.获取失败则找到安全点(前续节点为signal -1),则将自己park
5.检查是否被中断,如果是则抛出中断异常
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
ReentrantLock中实现tryAcquire(arg)方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
addWaiter方法
1.将当前线程封装成独占节点
2.尾节点不为null,则CAS方法将node节点添加到队列尾部,否则执行3
3.enq方法,自旋的将node添加到队列尾部,并返回node节点
在添加到对列尾部时,都会先设置node的prev节点,所以判断一个节点是否在队列中,如果node节点的prev==null,则一定不在队列中。如果prev!=null,也不能说节点一定在队列中,有可能正在自旋的设置pred的next节点为node节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
acquireQueued方法
1.获取节点的前续节点p,如果p==head,则去尝试获取锁。成功后将node设置为head节点,返回中断标示,否则执行2
- shouldParkAfterFailedAcquire方法为了找到当前节点的安全点,即前续节点的waitStatus=Node.SIgnal(唤醒后续节点),然后执行3。否则自旋
- 当前线程park,然后返回中断标示
- 如果线程被中断过,则将中断标示设置为true
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
parkAndCheckInterrupt方法判断中断标示,为什么要判断中断标示?
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
非常简单,阻塞当前线程,然后返回线程的中断状态并复位中断状态
在博客【http://www.ideabuffer.cn/2017/03/15/深入理解AbstractQueuedSynchronizer(一)】中有很详细的讲解
···
注意interrupted()方法的作用,该方法是获取线程的中断状态,并复位,也就是说,如果当前线程是中断状态,则第一次调用该方法获取的是true,第二次则是false。而isInterrupted()方法则只是返回线程的中断状态,不执行复位操作。
···
如果acquireQueued执行完毕,返回中断状态,回到acquire方法中,根据返回的中断状态判断是否需要执行Thread.currentThread().interrupt()。
为什么要多做这一步呢?先判断中断状态,然后复位,如果之前线程是中断状态,再进行中断?
这里就要介绍一下park方法了。park方法是Unsafe类中的方法,与之对应的是unpark方法。简单来说,当前线程如果执行了park方法,也就是阻塞了当前线程,反之,unpark就是唤醒一个线程。
具体的说明请参考http://blog.csdn.net/hengyunabc/article/details/28126139
park与wait的作用类似,但是对中断状态的处理并不相同。如果当前线程不是中断的状态,park与wait的效果是一样的;如果一个线程是中断的状态,这时执行wait方法会报java.lang.IllegalMonitorStateException,而执行park时并不会报异常,而是直接返回。
所以,知道了这一点,就可以知道为什么要进行中断状态的复位了:
如果当前线程是非中断状态,则在执行park时被阻塞,这是返回中断状态是false;
如果当前线程是中断状态,则park方法不起作用,会立即返回,然后parkAndCheckInterrupt方法会获取中断的状态,也就是true,并复位;
再次执行循环的时候,由于在前一步已经把该线程的中断状态进行了复位,则再次调用park方法时会阻塞。
所以,这里判断线程中断的状态实际上是为了不让循环一直执行,要让当前线程进入阻塞的状态。想象一下,如果不这样判断,前一个线程在获取锁之后执行了很耗时的操作,那么岂不是要一直执行该死循环?这样就造成了CPU使用率飙升,这是很严重的后果
cancelAcquire方法
在acquireQueued方法的finally语句块中,如果在循环的过程中出现了异常,则执行cancelAcquire方法,用于将该节点标记为取消状态。该方法代码如下:
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
//跳过前续取消节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
//前续节点的后续节点
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
//将当前节点设置为取消
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
//如果当前节点为尾节点,则将尾节点设置为当前节点的pred,并将pred的next节点设置为null,相当于将node节点从队列中移除
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
//
1.如果当前节点的前续节点不为head
2.前续节点ws不为-1则将前续节点的ws设置为-1
3.以上2步都为true时,校验前续节点的线程不为null
3点均满足则pred的next设置为node的后续节点,将node移除
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
//
1.如果node的前续节点为head或者设置状态失败,则唤醒后续节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
这里直接unpark后继节点的线程,然后将next指向了自己。
这里可能会有疑问,既然要删除节点,为什么都没有对prev进行操作,而仅仅是修改了next?【以下摘自参考博客中的内容】
要明确的一点是,这里修改指针的操作都是CAS操作,在AQS中所有以compareAndSet开头的方法都是尝试更新,并不保证成功,图中所示的都是执行成功的情况。
那么在执行cancelAcquire方法时,当前节点的前继节点有可能已经执行完并移除队列了(参见setHead方法),所以在这里只能用CAS来尝试更新,而就算是尝试更新,也只能更新next,不能更新prev,因为prev是不确定的,否则有可能会导致整个队列的不完整,例如把prev指向一个已经移除队列的node。
什么时候修改prev呢?其实prev是由其他线程来修改的。回去看下shouldParkAfterFailedAcquire方法,该方法有这样一段代码:
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
该段代码的作用就是通过prev遍历到第一个不是取消状态的node,并修改prev。
这里为什么可以更新prev?因为shouldParkAfterFailedAcquire方法是在获取锁失败的情况下才能执行,因此进入该方法时,说明已经有线程获得锁了,并且在执行该方法时,当前节点之前的节点不会变化(因为只有当下一个节点获得锁的时候才会设置head),所以这里可以更新prev,而且不必用CAS来更新。
unlock方法
调用了sync的release(1)方法
public void unlock() {
sync.release(1);
}
AQS中的release方法
1.尝试释放锁,释放成功则执行2
2.如果头节点不为空且不为初始化状态,则唤醒后继节点
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantLock重写了tryRelease方法
1.校验当前线程是否为获取锁时设置的exclusiveOwnerThread
2.如果state==0,则将exclusiveOwnerThread设置为null,返回true,释放成功,否则3
3.否则减少重入锁的次数,返回false
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
unparkSuccessor方法
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
主要功能就是要唤醒下一个线程,这里s == null || s.waitStatus > 0判断后继节点是否为空或者是否是取消状态,然后从队列尾部向前遍历找到最前面的一个waitStatus小于0的节点,至于为什么从尾部开始向前遍历,回想一下cancelAcquire方法的处理过程,cancelAcquire只是设置了next的变化,没有设置prev的变化,在最后有这样一行代码:node.next = node,如果这时执行了unparkSuccessor方法,并且向后遍历的话,就成了死循环了,所以这时只有prev是稳定的。
ReentrantLock默认实现了非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
也可以设置为公平锁:
ReentrantLock lock=new ReentrantLock(true);
构造函数:
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
公平锁、非公平锁中实现的tryRelease方法一样,tryAcquire()方法则不同,非公平锁实现如下:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
会在临界资源为0时判断当前是否还有前项节点存在:
1.head!=tail
2.h.next为空
3.h.next !=null时s中线程不是当前线程
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
在AQS中,state可以标示锁的数量,也可以标示其他状态,state由子类负责去定义,AQS只负责维护state,通过state实现对临界资源的访问
【参考博客】
http://www.ideabuffer.cn/2017/03/15/深入理解AbstractQueuedSynchronizer(一)
网友评论