主要成员
public class ReentrantLock implements Lock, java.io.Serializable {
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}
}
两种实现
static final class NonfairSync extends Sync {
...
}
static final class FairSync extends Sync {
...
}
构造器
// 默认是非公平的实现
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
主要方法
// 都是直接调用成员sync去实现的
public void lock() {
sync.lock();
}
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
public void unlock() {
sync.release(1);
}
AbstractQueuedSynchronizer
在聊Sync之前,必须得聊下AQS,几乎是所有java并发工具的基石了
// AQS的父类,持有一个线程的成员
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
protected AbstractOwnableSynchronizer() { }
// 这个成员指向持有当前AQS的线程
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
protected AbstractQueuedSynchronizer() { }
...
}
等待实现的方法
AQS是个抽象类,但是已经实现了绝大部分的功能,只有部分方法必须由子类来实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
重要成员
AQS内部实现了一个双向链表结构
private transient volatile Node head;
private transient volatile Node tail;
// 这个字段没有在构造器里初始化所以初始化值就是0
private volatile int state;
// 这个node会持有一个线程对象
static final class Node {
...
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
...
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
lock
非公平锁
final void lock() {
// 第一次插队的机会,无视state直接更新
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
会调用到AQS的方法
public final void acquire(int arg) {
// tryAcquire 是由子类实现的
if (!tryAcquire(arg) &&
// && 之后的方法不区分公平和非公平,ReentrantLock都是排他锁 所以是Node.EXCLUSIVE
// addWaiter就是把自己添加到AQS的链表中
// acquireQueued可以理解为开始排队
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 第二次插队的机会,直接使用CAS尝试去更新state,从0更新到1
if (compareAndSetState(0, acquires)) {
// 成功了就是获得锁
setExclusiveOwnerThread(current);
return true;
}
}
// 提前剧透 下面的else if和公平锁没有区别
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果插队失败,当前线程就会成为遵纪守法好公民,尝试进入队列并开始排队
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 最最一开始tail没有初始化 这里是null
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 尝试排在队列末尾,也是用CAS的方式,是原子操作,线程安全的
if (compareAndSetTail(pred, node)) {
// 入队成功,返回
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
// 用死循环保证入队成功
for (;;) {
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
// 初始化,可以看到tail和head指向是一致的
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 前一个节点是head,自己是队首,才能尝试去获取锁
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果不是就会根据前一个节点的状态,来决定是否等待
if (shouldParkAfterFailedAcquire(p, node) &&
// 等待
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
公平锁
final void lock() {
acquire(1);
}
直接看公平锁的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 这个hasQueuedPredecessors就是公平和非公平最大的区别了
// 返回false才有资格去获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
关于Node的状态
Node定义了5种状态
如果在代码里看到waitStatus>0的条件,就是判断当前Node是不是取消了
static final class Node {
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 初始化是0
volatile int waitStatus;
}
再回头看刚刚的acquireQueued方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
...
}
} finally {
if (failed)
// 只要在try代码块中有抛出异常,就会进入cancel的流程
cancelAcquire(node);
}
}
}
private void cancelAcquire(Node node) {
if (node == null)
return;
// 和自己的线程解约
node.thread = null;
Node pred = node.prev;
// 一直循环找到状态不是cancel的节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// 将当前节点置为cancel
node.waitStatus = Node.CANCELLED;
// 如果自己就是队尾节点,把上一个节点设置成tail
if (node == tail && compareAndSetTail(node, pred)) {
// 设置成功后把上一个节点的next设置为空
compareAndSetNext(pred, predNext, null);
} else {
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
// 介绍自己的下家给上家认识,自己深藏功与名
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 自己是队头就走unlock的流程
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
unlock
解锁操作是不区分公平和非公平的
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
// 只有c等于0了,才是释放锁
// 这里也牵涉到ReentrantLock是可重入锁,是可以反复上锁的
// 所以对应的就要反复解锁,state这个int字段,就可以理解为上锁的次数
free = true;
// 把AQS中的独占线程清空
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
tryLock
上面看懂后,没有参数的tryLock就特别简单,因为就是非公平锁的实现,并且如果没有获取成功,也不会入队
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
带参数的tryLock
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
// 死循环直到获取锁或者超时
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
// 超时
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 用这个LockSupport等待指定的时间
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
// 因为failed默认是true,所以不管是超时还是抛异常,最终都会走到cancel流程
cancelAcquire(node);
}
}
网友评论