一、公平锁和非公平锁
非公平锁的源码:
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
可以看到非公平锁的lock,上来就先尝试获取锁。compareAndSetState(0,1)。失败的话,走acquire方法。
公平锁的源码:
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
可以看到,lock方法按部就班的执行acquire方法。我们再看下这个acquire方法的源码:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
咱们就先看下tryAcquire方法,该方法在AbstractQueuedSynchronizer类中是一个模板方法,抽象的。需要子类实现。很明显,该方法在非公平和公平锁中取实现。先看下,NonfairSync对tryAcquire的实现:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可以看到,其实现中,如果发现c=0没有线程获取锁的情况下,上来就compare下去拿锁,不管先来后到。如果锁已经被占用了,看下是否是自己线程,这个是可重入的实现。否则try拿不到锁,返回false。下面再看下FairSync锁的try实现:
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可以看到和NonfairSync的唯一不同,是c==0的时候,compare前先执行hasQueuedPredecessors()方法。进去该方法看下,发现其实现的目的是看下当前锁CLH队列中,head的下一个node是不是自己。如果是自己,那么采取compare获取锁。如果不是自己,那么就不去compare,把机会留给别的线程。🤣多么的公平公正呀,哈哈哈。源码如下:
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
这里先总结下:对于公平锁和非公平锁,在获取锁的第一个阶段try阶段,非公平锁是上来先尝试拿锁,公平锁是先看看下一个拿锁是不是自己。
1.1 拿锁的第二个阶段,添加waiter监视器,其实就是向CLH队列添加一个Node,请看源码。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法已经介绍完了,下面介绍下acquireQueued方法。咱们先看下addWaiter方法干了啥?
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
这里面实例化了一个Node,把当前线程传进来,表示当前线程需要添加到CLH队列中,等待锁,mode是排他模式。在执行插入方法enq方法前,先执行下compare插入。如果成功了,就返回。失败了,在执行enq方法。这样做的目的是使用cas乐观锁,默认系统锁竞争不大,能提高效率。咱们在看下enq方法,是如果能保证,线程安全且一定能插入到CLH队列中。源码如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
这里恍然大悟了,还是用了cas乐观锁。但是增加了for无限循环去执行😂。这里对CLH进行了懒加载,如果tail为空的时候,才会cas线程安全进行init CLH队列。如果已经初始化过了,那么执行双链表的插入Node操作。把Node插入到tail队尾。
这里CLH队列插完后,代表线程已经在等待着锁了。那么lock方法可不是插入完Node就结束了,它要自己阻塞等待锁了。就是阻塞等待自己的Node变成CLH队列的Head节点。
1.2、下面看下lock的第三个阶段,阻塞等待阶段。
阻塞等待阶段就是上面看到一半的方法acquireQueued。它里面的参数addWaiter,1.1已经介绍了。就看下acquireQueued内部的源码了:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
看代码可知,其思路就是无限循环自己的Node节点,有没有成为Head的next。如果成了,就去try拿锁。如果还没成Head的next,那么就执行shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt()。从这个方法就可以知道,只有Head节点释放锁之后,其next节点才有机会获取锁。这里先放一个疑问:如果Head拿到锁了,还没释放,那么Next节点线程执行到这里,发现自己成了next,那么会执行tryAcquire,这个时候会拿到锁吗?为什么?我们下一节讲解。
1.3、在看下shouldParkAfterFailedAcquire和parkAndCheckInterrupt方法。
其实这两个方法比较简单了,shouldParkAfterFailedAcquire如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
它决定当前node能不能park阻塞。决定的依据是是该Node的前驱节点是不是SIGNAL状态。如果是,那么直接阻塞。如果不是,是>0的话,表示该Node的前驱节点已经取消获取锁了,可以安全的删除它了。就有了上面do{}while()方法,一直向前找到状态小于0的Node。如果<0且不是SIGNAL,那么就需要把它变成SIGNAL状态。然后第二次进入该方法的时候,就必然是true了。parkAndCheckInterrupt方法源码如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
注意线程就是阻塞在park这里了。该线程的字节码会执行在这里,停止住了。让出cpu给其他线程,该线程休眠阻塞,等待其他线程唤醒。其实就是其他线程执行unlock唤醒。后面再说。如果park后被唤醒了,说明肯定有其他线程执行了unlock操作,那么该Node的线程,再次检查自己是不是Head的next节点,在执行一遍for循环里面的操作。
到此,一次lock操作,到park方法这里算是暂时结束了。这里,如果锁竞争大的话,肯定会被多次唤醒,然后发现不是自己该获取的,然后再次park。就这样经历:park -> unpark -> park.....,一直这样下去。直到自己成为Head的next节点。
二、这节介绍lock的兄弟unlock方法
unlock方法的源码很简单,如下:
public void unlock() {
sync.release(1);
}
release方法的源码,如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
可以看到核心方法在tryRelease上,该方法可以预见,应该是修改CLH队列的status,让status回到它正常的值。其源码如下:
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
看到c=status - releases,正是让status回归正常。如果它持有锁,并且会根据重入的次数,一次递减。恰好是最后一个递减(c=0),那么它就释放锁,返回值为true。回到上面的release方法,如果返回值为true,那么就会执行unparkSuccessor方法,用来通知CLH的Head节点后面的线程。源码如下:
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
该方法的参数Node其实就是CLH队列的Head节点。其实为啥要先判断ws<0? 先看下面的node.next节点,如果该节点已经取消获取锁的话,那么会从tail往前遍历,找到第一个waitStatus<=0的节点,去唤醒它。
我理解这么做的目的应该是:
1、如果遇到Head.next节点,取消等待锁的话,那么很有可能是前面的节点占用锁太长,导致后面的线程节点超时,取消等待了。那么从Head往后遍历,大概率也是超时,可能要遍历很久才能找到一个合适的节点。从后往前遍历,毕竟越后,说明插入时间最近,最有可能是合适的节点,提高了效率。
1.3、unlock后,会唤醒指定的succssor节点。那么我们继续1.1介绍的,线程Node park住的地方。
lock的源码如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
park这里终于可以返回了,然后检查下它在阻塞期间,是否被其他线程中断过,并返回中断的标志。再看下面的源码:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
从parkAndCheckInterrupt返回后,又可以执行到for循环的开始位置了,会重新tryAcquire。这个时候,基本上该节点就是Head的next节点了。去获取锁的时候,CLH队列的status基本上=0,cas就会成功,进而拿到锁。整个过程终于结束,总结来看整个lock方法总结在如下:
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
返回值是interrupted,可能为true,也可能false。固其返回值,并不能代表锁的获取状态,这点要注意。
网友评论