简介
AbstractQueuedSynchronizer(简称AQS)是JAVA中用来实现锁的基础框架,其内部维护了一个等待队列,用来存储等待获取锁的线程。队列遵循FIFO(先进先出)的原则,每次入队列都是添加到队列的尾部,tail总是指向尾部节点;出队列就是将head节点设置指向head的next节点,head节点中没有线程,其代表当前获取到锁的线程。其可实现共享锁和独占锁两种方式的锁。
+------+ prev +-----+ +-----+
| head | <---- | | <---- | tail|
+------+ +-----+ +-----+
队列节点类
static final class Node {
/** 共享模式标识 */
static final Node SHARED = new Node();
/** 独占模式标识 */
static final Node EXCLUSIVE = null;
/** 节点状态值,表示线程被取消 */
static final int CANCELLED = 1;
/** 节点状态值,表示后一个节点需要被unparking唤醒 */
static final int SIGNAL = -1;
/** 节点状态值,表示在等待某一个条件,此节点在condition队列中,而不是sync队列中 */
static final int CONDITION = -2;
/** 节点状态值,唤醒可以向后传递,共享模式时使用 */
static final int PROPAGATE = -3;
/**
* 节点状态,sync时默认为0;condition时默认初始化为Node.CONDITION
*/
volatile int waitStatus;
/**
* 前一个节点
*/
volatile Node prev;
/**
* 后一个节点
*/
volatile Node next;
/**
* 节点代表的线程
*/
volatile Thread thread;
/**
* sync时为标识位
*/
Node nextWaiter;
/**
* 节点是否为共享模式
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回当前节点的前一个节点,前一个节点为空则抛异常
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) { // addWaiter时使用
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Condition时使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS中的属性
/**
* 等待队列的头结点,惰性初始化。除初始化时,只能通过setHead方法修改。
* 注意:如果头结点存在,它的状态确保不能是CANCELLED状态。
*/
private transient volatile Node head;
/**
* 等待队列的尾结点,惰性初始化。只能通过enq方法添加新的等待节点
*/
private transient volatile Node tail;
/**
* 同步状态标识
*/
private volatile int state;
AQS中的方法
- enq,即入队列(enqueue)方法,将节点插入到队列的尾部
/**
* 插入节点到队列尾部,必要时需要进行初始化
* @param node 需要被插入的节点
* @return 此节点的前一个节点
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 尾节点为空,表示队列未被初始化,设置头节点为一个空的Node,并将tail指向头结点。设置完后进行下一次循环,进入else中
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t; //设置node的前节点为队列的尾节点
if (compareAndSetTail(t, node)) {
t.next = node; //原子更新尾节点成功后,node成为新的尾节点tail,原来的尾节点t成为node的前节点,设置t的后节点为node
return t;
}
}
}
}
- addWaiter,为当前线程创建节点并加入到等待队列中
/**
* 为当前线程创建节点并加入到队列中
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { // 与enq方法中的else中功能相同
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); //尾节点tail为null或者更新node为尾节点失败,执行此处
return node;
}
- setHead,设置队列的头结点,也是出队列。
/**
* 设置队列的头结点,也是出队列。被acquire方法调用
*
* @param node the node
*/
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
- unparkSuccessor,唤醒挂起的节点
private void unparkSuccessor(Node node) {
/*
* 当前结点状态小于0时,设置状态为0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 唤醒的线程被successor持有,通常情况下就是下一个节点。但是如果被取消或者为null了,
* 需要从队列尾tail向前查找离node最近的未被取消的successor
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread); //唤醒线程。【对同一线程先unpark再park,park是不会阻塞线程的。】
}
- cancelAcquire,取消正在尝试获取锁的node
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// 跳过已取消的前节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// 如果node为尾节点,则将有效的前节点设置为尾节点
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null); //尾节点的next设置为null
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) {
// 1. pred不是头节点
// 2. pred的状态为Node.SIGNAL或者状态<=0同时可以设置为Node.SIGNAL状态
// 3. pred的持有的线程不为空
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next); //pred的next节点设置为当前节点的下一个节点
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
- shouldParkAfterFailedAcquire,检查与更新获取锁失败的节点的状态,如果线程需要挂起,返回true.
/**
*
* 检查与更新获取锁失败的节点的状态,如果线程需要挂起,返回true.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 这个节点已经设置为允许释放锁release来唤醒它的状态,所以它可以被挂起
*/
return true;
if (ws > 0) {
/*
* 跳过所有已经被取消的前节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* 此处的waitStatus状态为0或者PROPAGATE。
* 表示此节点需要一个唤醒信号,但是还没有被挂起。
* 调用者需要重试来确保它在挂起前不能获取到锁
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- selfInterrupt,中断当前线程
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
- parkAndCheckInterrupt,挂起当前线程并检查是否中断
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
- acquireQueued,对于已经在队列中的节点,以独占的方式获取锁
/**
* 对于已经在队列中的节点,以独占的方式获取锁。condition的wait和acquire方法会使用它
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true; //是否失败
try {
boolean interrupted = false; //是否中断
for (;;) {
/**
* 队列中的所有节点都在此处循环调用,
* 只有pred为头节点的节点可能获取到锁,
* 这时重新设置获取到锁的节点为头结点(获取到锁的节点出队列)并返回
*/
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { //前节点为头节点同时获取锁成功
setHead(node); //将node节点设置为头结点
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //检查是否需要被挂起
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- acquire,获取独占锁
/**
* 调用tryAcquire获取锁,如果成功则结束
* 如果失败,则用addWaiter为当前线程创建独占锁节点加入到等待队列中
* 调用acquireQueued方法不断循环检查是否能获取到锁或者挂起线程,直到获取到锁后返回
* 可用来实现Lock#lock方法
* @param arg
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- release,释放锁
**
* 释放锁。
* 可用来实现{@link Lock#unlock}
*
* @param arg arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
/*
* 释放锁成功,头结点不为空且状态不为0,则唤醒头结点后面节点的线程;
* 唤醒后,被唤醒的线程则在acquireQueued方法的for循环里循环获取锁
*/
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- tryAcquire, 需子类实现,以独占的方式获取锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
- tryRelease,需子类实现,释放独占所
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
网友评论