简述
记录下自己的学习过程
抽象方法(模板模式)
子类需要按照自己的需求,去重写一些抽象方法;
- tryAcquire
- tryRelease
- tryAcquireShared
- tryReleaseShared
- isHeldExclusively
CLH队列
- 是一个虚拟的队列,没有实际的数组,主要是通过节点之间的指针联系
- 当获取不到锁的时候,进入CLH,等待满足获取资源成功;
- 获取成功的,成为头节
- 释放锁成功的时候,再唤醒其最近的非取消态的后继节点,其再自旋尝试竞争锁,成功成为新的头节点,并移除原头节点及被取消的节点。
入队方法
入队主要从队尾
private Node enq(final Node node) {
// CAS自旋
for (;;) {
Node t = tail;
// 判断队列是否为空
if (t == null) { // Must initialize
// 根据headOffset偏移量CAS设置头节点
if (compareAndSetHead(new Node()))
// 尾节点指向头节点
tail = head;
// 此时node没有放入队尾,只有等下一次循环
} else {
// 当前节点前指针指向原尾节点
node.prev = t;
// 根据tailOffset偏移量CAS设置头节点
if (compareAndSetTail(t, node)) {
// 原尾节点后指针指向新节点
t.next = node;
return t;
}
}
}
}
新线程入队
主要就是新建一个节点,然后放到队尾
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 独占锁,共享锁
* @return the new node
*/
private Node addWaiter(Node mode) {
// 将当前线程放入新节点,然后nextWaiter=mode
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 新节点的前指针指向原尾节点,新节点成为新的尾节点
node.prev = pred;
// 根据tailOffset偏移量CAS设置尾节点
if (compareAndSetTail(pred, node)) {
// 原尾节点的后指针指向新节点
pred.next = node;
return node;
}
}
// CAS自旋,放入CLH队列
enq(node);
return node;
}
获取独占锁
- acquire
1.首先会尝试tryAcquire获取锁,如果返回true,则能获取到,!tryAcquire(arg) &&短路与,则直接执行业务代码;
2.如果获取不到,则执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
3.如果acquireQueued成功,则执行selfInterrupt()
4.此时,只有头节点是park的,其他的都是unpark的,且除了tail指向的尾节点,其他的waitStatus都是SIGNAL。
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
// 调用子类实现的获取锁方法,成功则获取锁成功。
if (!tryAcquire(arg) &&
// 失败,则入队,且自旋获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 重新设置线程中断状态
selfInterrupt();
}
- tryAcquire
AQS抽象类,自己不实现此方法,交由相应的子类去自行实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
1.ReentrantLock.FairSync.tryAcquire
2.ReentrantLock.NonfairSync.tryAcquire
3.ReentrantReadWriteLock.Sync.tryAcquire
4.ThreadPoolExecutor.Worker.tryAcquire
- acquireQueued
1.ReentrantLock.FairSync.tryAcquire
2.ReentrantLock.NonfairSync.tryAcquire
3.ReentrantReadWriteLock.Sync.tryAcquire
4.ThreadPoolExecutor.Worker.tryAcquire
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋
for (;;) {
// CLH采用前一个节点来标记状态,可能是考虑到当前结点可以取消之类的因素
final Node p = node.predecessor();
// 如果前一个节点是头结点,则当前节点才能尝试获取锁
if (p == head && tryAcquire(arg)) {
// 如果获取成功,则设置当前节点为头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果未获取到锁,则需要判断是否要挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
// 如果需要挂起线程,则调用LockSuport.park挂起线程
parkAndCheckInterrupt())
// 如果走到这一步,代表调用了unpark,并且线程中断状态被清除为false,此处应该加个标记,让acquireQueue调用selfInterrupt设置true回去
interrupted = true;
}
} finally {
// 如果因为其他原因导失败,则需要取消当前节点
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire
确定是否需要挂起当前节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前一个节点状态为SIGNAL,则代表当前节点已经被挂起了,则直接返回
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 如果前一个节点已经被取消,则需要往前找一个非取消的节点,并移除已经取消的那些节点
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
// 将前一个节点设置为SIGNAL
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
// 返回false,表明当前线程不可以挂起,需要继续自旋
return false;
}
/**
* Convenience method to interrupt current thread.
*/
static void selfInterrupt() {
// 给当前线程设置中断标志
Thread.currentThread().interrupt();
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
// 挂起当前线程
LockSupport.park(this);
// 阻塞处,等待unpark,返回线程状态,并且清除中断状态
return Thread.interrupted();
}
release
释放锁
public final boolean release(int arg) {
// 调用子类释放锁,成功则唤醒后继节点
if (tryRelease(arg)) {
Node h = head;
/*
* 1.如果头节点为空,则代表没有线程竞争
* 2.如果头节点非空,且waitStatus为初始状态0,则不需要唤醒,
* 因为后继节点在shouldParkAfterFailedAcquire里面会二次重试,不会被挂起
*/
if (h != null && h.waitStatus != 0)
// 唤醒后继节点,acquireQueued继续执行。
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor
释放锁后,此时后继的都是park的,只有头节点是unpark,唤醒后继非取消的节点,来竞争锁。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 唤醒头节点的后继节点
Node s = node.next;
// enq或者addWaiter过程中的时候,旧的next未指向新节点:s==null
// 移除cancel节点的过程中,中间节点指向被移除的cancel节点:s.waitStatus
if (s == null || s.waitStatus > 0) {
s = null;
// 找出离node最近的那一个未取消的后继节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒阻塞的线程
if (s != null)
LockSupport.unpark(s.thread);
}
网友评论