背景
通过lock
与unlock
研究ReentrantLock相关实现的原理 :
- 通过
state
变量标志当前资源占用 -
ExclusiveThread
标志当前占用CPU的线程 -
LockSupport.park/unpark
来申请和释放CPU资源 -
Unsafe.compareAndSwapObject
来完成CAS操作 -
Unsafe.putObject
来完成原子操作
重要变量
-
state
变量- 为0 : 代表该节点为初创节点 , 什么都没干
- 大于0 : 即代表CANCELLED , 也就是所有大于0的节点都是被取消的节点
- 小于0 : SIGNAL = -1 : 代表该节点等待unpark
CONDITION = -2 : 代表该节点等待condition的触发
PROPAGATE = -3 : 主要用于共享锁 , 代表该节点需要无条件传递上一个节点
lock流程简述
- 通过CAS设置
state
(0-->1) , 如果设置成功 , 则独占该锁 - 设置失败 , 通过
addWaiter
创建本线程的Node , 插入队列中 - 在添加完节点后 , 如果当前线程未抢占成功 , 则会遍历删除Cancel节点
- 最后会找到可以执行的Node , 开始通过
park
来让出当前线程的CPU - 公平锁会在申请锁的时候判断队列后继节点是否存在 , 如果当前节点不是该后继节点的话 , 就不会申请锁.
unlock流程简述
- 尝试释放相应的资源
- 释放成功的话 , 则会把独占线程让出
- 会遍历Node队列 , 从头节点开始寻找可以unpark的线程
非公平锁的加锁与释放
lock的实现
- 当调用到
ReentrantLock.lock()
时
// 以非公平锁为例
static final class NonfairSync extends Sync {
// 获取锁
final void lock() {
// 通过CAS操作设置state的状态
if (compareAndSetState(0, 1))
// 如果设置成功的话 , 则代表当前独占锁的线程为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 尝试获取锁
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
// 重写子类的Acquire , 调用nonfairTryAcquire
return nonfairTryAcquire(acquires);
}
}
- 在
acquire
中 , 会获取锁 , 或者插入等待队列中
public final void acquire(int arg) {
// tryAcquire尝试获取锁 , 如果获取失败的话 ,
// 则通过acquireQueued将当前线程添加到队列中
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果成功的话 , 则中断当前线程 , 仅仅只是中断
selfInterrupt();
}
- 在Sync重写的
tryAcquire
中 , 会尝试获取锁
- 如果当前锁没有被占用(即state=0)时 , 则独占该锁
- 如果当前独占线程是本线程的话 , 则将State递增
- 否则 , 则返回false , 将本线程插入队列中
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取当前State状态
int c = getState();
if (c == 0) {
// 如果当前State为0的话 , 则独占该锁 ,并且返回true,不需要插入队列
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
// 如果当前线程就是独占的线程的话 , 则将State加一 , 并且返回true
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- 通过
addWaiter(Node.EXCLUSIVE)
创建Node , 由于是通过Unsafe.putObject
, 所以也是保证线程间的可见性的.
private Node addWaiter(Node mode) {
// 通过构造函数 , 将Node.EXCLUSIVE作为nextWaiter放入该节点
Node node = new Node(mode);
for (;;) {
// 找到尾节点 , head/tail都是volatile的
Node oldTail = tail;
// 尾节点不为空 , 双线链表增加尾节点操作
if (oldTail != null) {
// 将tail节点放到node.prev中
U.putObject(node, Node.PREV, oldTail);
// 通过cas操作 , 设置tail
if (compareAndSetTail(oldTail, node)) {
// 如果设置成功的话 , 则将tail.next设置成该节点
oldTail.next = node;
// 返回node
return node;
}
} else {
// 如果没有头尾节点 , 则创建head/tail节点
initializeSyncQueue();
}
}
}
- 在添加完节点后 , 开始在队列中找寻要唤醒的线程节点.
- 如果
shouldParkAfterFailedAcquire
返回true , 则会Park当前线程 - 如果
shouldParkAfterFailedAcquire
返回false , 就会一直自旋遍历前驱节点 , 直到前驱接节点为空 , 则将当前节点设置为head节点 , 可以获取当前锁
final boolean acquireQueued(final Node node, int arg) {
// 其实node就是尾节点
try {
boolean interrupted = false;
for (;;) {
// 从尾节点往前遍历 , 获取其前驱
final Node p = node.predecessor();
// 如果遍历到了head节点 , 并且获取成功了, 则返回false
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
// shouldParkAfterFailedAcquire:判断在acquire后,是否应该park让出CPU
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
-
shouldParkAfterFailedAcquire
主要根据前驱节点状态判断当前节点是否需要Park让出CPU
- 如果前驱节点的ws为
Node.SIGNAL
的话 , 则返回true , park当前线程 - 如果前驱节点的ws为canceld的话 , 则从队列中去除掉canceld节点
- 如果前驱节点的ws为0或者
PROPAGATE
的话 , 则将前驱节点的ws设置成SIGNAL
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前驱节点的waitStatus为SIGNAL,就代表前驱节点可以被唤醒
// 返回true , park当前线程
return true;
if (ws > 0) {
// 如果ws>0, 则代表前驱节点cancelled了 , 则过滤掉canceld节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 删除掉cancedld节点
pred.next = node;
} else {
// 如果ws为0或者PROPAGATE的话 , 则将前驱节点的waitStatus设置成SIGNAL , 但是先不park
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
- 如果前驱节点要唤醒的话 , 则会Park当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
unlock的实现
当某个线程在使用完锁后 , 使用unlock
来释放锁资源.
- 调用
unlock
也就是释放1
的资源
public void unlock() {
sync.release(1);
}
- 在AQS中的
release
会先尝试释放锁 , 如果成功 ,则unpark
前驱线程
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
- 在
Sync
中的tryRelease
中 , 会释放相关资源 , 并且返回锁是否释放成功
protected final boolean tryRelease(int releases) {
// 将当前申请资源数 - 要释放的数量
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
// 如果当前线程不是独占线程的话 , 则抛出异常
throw new IllegalMonitorStateException();
// 是否释放锁
boolean free = false;
if (c == 0) {
// 如果当前没有资源占用的话 , 则认为可以释放锁
free = true;
// 将独占线程设为空
setExclusiveOwnerThread(null);
}
// 设置当前锁所对应的资源数量
setState(c);
// 返回当前锁是否会被释放
return free;
- 当锁成功释放后 , 会判断前驱节点的
waitStatus
是否不为0 , 由于lock
阶段会清除所有的Cancel
节点 , 所以此处判断不为0 , 则代表前驱节点可以获取资源占有锁. 于是 , 调用unparkSuccessor
让前驱节点获取节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 如果waitStatus小于0 ,
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
// 开始寻找可执行的前驱节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
// unpark对应的线程
LockSupport.unpark(s.thread);
}
公平锁的加锁与释放
对于公平锁(FairSync
)而言在加锁的过程中会有所不同 , 仅仅只是在申请锁的时候 , 加入了队列的判断 , 如果头节点有后继节点的话 , 则让后继节点获取CPU
- 在申请锁的时候 , 会判断队列是否有后继节点 , 如果有后继节点的话 , 则让后继节点优先获得CPU
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 判断队列中是否有后继节点
if (!hasQueuedPredecessors() &&
// 如果没有前驱的话 , 则会通过cas来设置state
compareAndSetState(0, acquires)) {
// 设置当前线程独占该锁
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- 判断head/tail节点是否相同 , 如果不同的话 , 代表head节点仍有后继节点
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 返回头尾节点不相等 , 并且头节点的next不为空 , 头节点的线程不是当前线程
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
网友评论