注:关键的代码的部分会有注释。
AQS介绍,主要是看属性,方法遇到了再看
内部类 node,用于维护获取锁的线程的双向队列,详细的可以具体看注释,这里就不一一介绍了
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
.....
//获取锁队列队头
private transient volatile Node head;
//获取锁队列队尾
private transient volatile Node tail;
//加锁标志位,主要是通过cas操作,算是委托给他判断是否加锁成功
private volatile int state;
//在aqs父类中,当前持有锁的线程
private transient Thread exclusiveOwnerThread;
......
ReentrantLock的具体属性
//Sync 继承自AQS
abstract static class Sync extends AbstractQueuedSynchronizer{...}
//
private final Sync sync;
//公平实现
static final class FairSync extends Sync {...}
//非公平实现
static final class NonfairSync extends Sync {...}
加锁过程
lock方法解析
public void lock() {
sync.lock();
}
sync.lock() 方法有两个实现,一个是公平锁的实现,另一个是非公平锁的实现,如下图
1.png
我们先看非公平锁的实现的加锁方式(NonfairSync),代码如下
final void lock() {
//如果cas修改值state值成功,则加锁成功
if (compareAndSetState(0, 1))
//设置当前持有锁的线程为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
//不成功就在重新获取锁,过程较为复杂
acquire(1);
}
acquire(1)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
此方法可分为三个部分,!tryAcquire(arg)、addWaiter(Node.EXCLUSIVE)、 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),一一分析具体干了什么事。
!tryAcquire(arg),可以跟踪代码先调用NonfairSync.tryAcquire()方法,调用Sync.nonfairTryAcquire()
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取state的值
int c = getState();
//等于0那么现在无线程持有锁
if (c == 0) {
//cas获取锁
if (compareAndSetState(0, acquires)) {
//设置当前线程持有锁
setExclusiveOwnerThread(current);
return true;
}
}
//如果持有锁的线程和当前线程为同一线程(重入),state累加
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
若以上方法返回true,那么加锁成功,则!tryAcquire(arg)为false,不会执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)这部分代码。若以上代码返回false,加锁失败,则!tryAcquire(arg)为true,则会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),现在先假设加锁失败。
addWaiter(Node.EXCLUSIVE),索取锁线程入队
private Node addWaiter(Node mode) {
//新建node
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//取队尾
Node pred = tail;
//如果队尾不为空
if (pred != null) {
//取当线程的的节点的前节点为对位
node.prev = pred;
//cas设置队尾的后节点为当前线程的节点
if (compareAndSetTail(pred, node)) {
//倒数第二节点的下个节点指向当前线程的节点
pred.next = node;
return node;
}
}
//尾节点不为空,入队
enq(node);
return node;
}
//入队相关代码
private Node enq(final Node node) {
for (;;) {
//取队尾
Node t = tail;
//初始化队列
if (t == null) { // Must initialize
//设置头节点
if (compareAndSetHead(new Node()))
//队首队尾都指向 上一行的new Node()
tail = head;
} else {
//当前线程节点的前节点为队尾节点
node.prev = t;
//cas操作设置队尾为当前线程节点
if (compareAndSetTail(t, node)) {
队尾(现在倒数第二个节点)后节点为当前线程节点
t.next = node;
return t;
}
}
}
}
到这里入队的相关操作就完成分析了,开始下一个方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的分析。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg),从队列面获取线程来持有锁。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取当前线程节点的前节点
final Node p = node.predecessor();
//如果前节点为队首(这里明白这个双向队列的第一个节点为空),且获取锁成功(NonfairSync.tryAcquire())
if (p == head && tryAcquire(arg)) {
setHead(node);
//如果不设置为null,可能会引起oom,引用一直存在
p.next = null; // help GC
failed = false;
return interrupted;
}
//判断是否需要挂机线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//设置当前节点为头节点,清空了线程和前节点
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
//判断是否需要挂起线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//node 类中有对应描述,代表现需要被unpraking
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
//节点被取消,跳过 、重试
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//设置parking标志
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//park线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
以上就是ReentrantLock 的加锁过程。后面解析ReentrantLock 的解锁过程
unlock
public void unlock() {
//调用sync的release
sync.release(1);
}
public final boolean release(int arg) {
//如果解锁成功
if (tryRelease(arg)) {
//取头节点
Node h = head;
//如果头节点不为空且waitStatus 不为CANCELLED、SIGNAL 、CONDITION 、PROPAGATE
if (h != null && h.waitStatus != 0)
//解锁线程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//获取状态并减1
int c = getState() - releases;
//判断持有锁的线程是不是当前线程
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//解锁成功
if (c == 0) {
free = true;
//清空线程
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//unpark线程节点
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
//ws<0 即ws为CANCELLED 、SIGNAL 、CONDITION 、PROPAGATE 中一种
if (ws < 0)
//ws设置为0,让下一个线程节点尝试获取锁
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
公平锁与非公平锁区别在于tryAcquire的时候,会检查是不是已经有线程在排队了,如果有那么直接入队尾,不在尝试获取锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//判断是队列是否已有线程
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//h != t 初始化的时候,(s = h.next) == null头节点的下个节点为空(第一个节点为”空“),s.thread != Thread.currentThread()头节点的下个节点与当前线程不为同一线程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
综上就是ReentrantLock的加锁和解锁过程,现在对自旋有一点感觉了,enq(...)和 acquireQueued(...)中循环都是自选的体现。
网友评论