概述
以下讨论基于JDK1.8
ReentrantLock主要利用CAS+AQS队列来实现。它支持公平锁和非公平锁,两者的实现类似。
CAS:Compare and Swap,比较并交换。CAS主要是由sun.misc.Unsafe这个类通过JNI调用CPU底层指令实现。
AQS:AbstractQueuedSynchronizer,它是一个用于构建锁和同步容器的框架。事实上concurrent包内许多类都是基于AQS构建,例如ReentrantLock,Semaphore,CountDownLatch,ReentrantReadWriteLock,FutureTask等。AQS解决了在实现同步容器时设计的大量细节问题。
AQS使用一个FIFO的队列表示排队等待锁的线程,基于这个队列ReentrantLock的基本实现原理可以概括为:先通过CAS尝试获得锁,如果此时已经有线程占据锁,那就加入队列的队尾且被挂起。当锁被释放之后,队列中第一个线程被唤起,然后通过CAS尝试获取锁。在这个时候,如果:
非公平锁:如果同时还有另外一个新线程进来,它可以不用排队直接来争抢锁,此时可能会被这个线程抢到锁。
公平锁:如果同时还有另外一个新线程进来,当它发现已经有人在队列中排队时,此线程就会排到队尾,由队首的线程获取锁。
所以非公平锁效率要高于公平锁,因为非公平锁减少了线程挂起唤醒的时间。
类继承关系
ReentrantLock.pngReentrantLock特点
* 可重入
* 可中断
* 公平锁/非公平锁
* 使用CAS
* 相对于synchronized更为灵活
ReentrantLock构造方法
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
NonfairSync:非公平锁
FairSync:公平锁
NonfairSync
一. lock()
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
使用CAS操作判断state是否为0(0表示锁未被占用),如果是0则设置为1,并把当前线程设置为该锁的独占线程,即当前线程获得锁。
“非公平”即体现在这里,如果占用的线程释放锁,state置为0,而排队的线程还未被唤醒时,新来的线程可能直接抢占了该锁,那么就是插队成功。
假设当前有三个线程来竞争锁,A线程CAS操作成功获,拿到了锁,那么B C线程设置state失败,接下来会进入acquire方法:
二. acquire()
public final void acquire(int arg) {
//尝试获取锁
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取state状态
int c = getState();
if (c == 0) {//当前没有线程占用
//使用CAS来这是state状态
if (compareAndSetState(0, acquires)) {
//占用锁成功,设置当前线程为独享
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//如果当前线程已经占用锁
//将state + 1 ,即锁重入
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
//更新state,记录重入次数
setState(nextc);
return true;
}
//获取锁失败
return false;
}
总结:非公平锁tryAcquire流程:检查state为0未被占用,那么尝试占用,若不为0,检查当前锁是否被自己占用,若是被自己占用,增更新state记录重入次数。如果都没有成功,则获取锁失败,返回false。
三. addWaiter()
当前A线程已经获得锁,BC线程尝试获取锁失败后,就会进入等待队列,如果A线程长时间持有锁,BC线程会被挂起。
private Node addWaiter(Node mode) {
//初始化节点,设置关联线程,模式为独享(mode可参考Node中各个模式)
Node node = new Node(Thread.currentThread(), mode);
//获取尾结点
Node pred = tail;
//如果尾结点不为空,说明队列已经存在
if (pred != null) {
node.prev = pred;
//将新节点设置为尾结点 CAS方式
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//尾结点为空,说明队列还没有初始化,需要初始化head节点并且入队
enq(node);
return node;
}
BC线程同时尝试入队,由于队列尚未初始化,所以至少有一个线程会走到enq(node)
//初始化队列,并将新节点入队
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//如果队列为空,新建一个节点设置为head节点,并且将tail尾结点指向head
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//如果队列不为空,将新节点入队
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
BC线程进入enq方法,此时队列为空,那么只有一个线程通过compareAndSetHead创建head节点成功,然后开始第二轮循环,第二轮循环此时tail节点已经不为空,两个线程会进入else,假设B线程通过compareAndSetTail成功,那么B线程入队并返回,C由于入队失败需要第三轮循环。最终所有线程入队成功。当BC线程入队后,此时AQS的队列如下:
AQS入队.jpg
四.acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取前置节点
final Node p = node.predecessor();
//如果前置节点是head,则当前节点在第二位,此时该节点可以尝试获取锁
if (p == head && tryAcquire(arg)) {
//当前节点获取锁成功,并将其设置为head节点
setHead(node);
//原head节点出队,等待被GC
p.next = null; // help GC
//获取锁成功
failed = false;
//返回线程是否被中断过
return interrupted;
}
//判断线程获取锁失败后是否可以挂起?若可以才挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//设置线程被中断标志位为true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
BC线程在获取锁时,如果A线程一直占用锁,则BC线程通过tryAcquire获取锁都会失败,因为会进入第二个if语句,判断线程是否可以挂起。
五.shouldParkAfterFailedAcquire()
这个方法的主要作用为判断当前线程获取锁失败后是否需要挂起。
查看这个方法时需要先了解Node中几个字段意义:
字段 | 类型 | 默认值 | 描述 |
---|---|---|---|
SHARED | Node | new Node() | 节点为共享模式 |
EXCLUSIVE | Node | null | 节点为独占(排它)模式 |
CANCELLED | int | 1 | 节点超时或被中断时设置为取消状态,用来设置waitStatus |
SIGNAL | int | -1 | 当前节点的后节点被park,当前节点释放时,必须调用unpark通知后面的节点,当后面的节点竞争时,会将前面的节点更新为SIGNAL,用来设置waitStatus |
CONDITION | int | -2 | 标识当前节点处于等待中(通过条件进行等待的状态),用来设置waitStatus |
PROPAGATE | int | -3 | 共享模式下释放节点时设置的状态,被标记为当前状态是表示无限传播下去,用来设置waitStatus |
waitStatus | volatile int | 0 | 等待状态,默认0,标识正常同步等待 |
prev | volatile Node | null | 队列中的上一个节点 |
next | volatile Node | null | 队列中的下一个节点 |
thread | volatile Thread | null | 当前节点所对应的线程 |
nextWaiter | Node | null | 指向下一个处于阻塞的节点 |
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前置节点的状态
int ws = pred.waitStatus;
//如果前置节点状态为SIGNAL,则返回true,当前线程可挂起
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
//大于0肯定是取消状态,不会通知
//就一直往前找,找到一个能通知自己的,挂在那个节点后面,方便自己休眠有人唤醒自己
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//将前置节点的状态设置为SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
//前置节点不是SIGNAL状态返回false
//下次在进入该方法要么前置节点是SIGNAL状态,要么就是上面的CAS失败了,重新CAS更新状态后才返回true
return false;
}
//通过LockSupport.park(this)挂起当前线程,重置返回当前线程中断状态
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
线程入队后是否挂起的前提是,它的前置节点状态为SIGNAL,这要能保证前置节点获取锁后能唤醒自己。所以shouldParkAfterFailedAcquire先判断当前节点的前置节点是否是SIGNAL状态,若符合返回true,然后调用parkAndCheckInterrupt将自己挂起。如果前置节点状态不符合,再看前置节点是否为CANCELLED,若是就向前遍历知道找到第一个符合要求的前置节点,最后的else是通过CAS的方式将前置节点状态设置为SIGNAL。
此时Node队列的状态可能如下:
AQS队列节点挂起状态.jpg
六. unlock()
public void unlock() {
//每次unlock将state递减1
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
//释放锁完成后,获取head节点,并head节点waitStatus不为0
if (h != null && h.waitStatus != 0)
//唤醒下一个线程
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
//获取当前线程state数量,并将其减1
int c = getState() - releases;
//如果当前线程不是独占线程则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果-1之后state为0
if (c == 0) {
//free设置为true
free = true;
//清空当前的独占线程,此时已无线程使用
setExclusiveOwnerThread(null);
}
//如果之前的锁有重入,则重新记录重入次数
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
//如果节点状态<0,是否为SIGNAL状态
//通过CAS重置状态
compareAndSetWaitStatus(node, ws, 0);
//获取下一个节点
Node s = node.next;
//如果下一个节点为空,或者状态为取消状态
if (s == null || s.waitStatus > 0) {
s = null;
//从尾结点开始遍历
for (Node t = tail; t != null && t != node; t = t.prev)
//找到最前面且状态为SIGNAL的节点
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒这个节点
LockSupport.unpark(s.thread);
}
总结:流程为先尝试解锁,若解锁成功,那么查看Head节点状态是否为SIGNAL,如果是则唤醒头结点的下一个节点关联的线程,如果释放失败则返回false表示解锁失败。所以每次解锁成功后唤醒的都是Head节点的下一个节点关联的线程。
FairSync
公平锁的上锁过程与非公平锁存在一些不同点,但整体流程大致是一样的,下面介绍一下区别:
一. lock()
final void lock() {
acquire(1);
}
相对于非公平锁来说lock方法直接调用了acquire()方法,而非公平锁新进来的线程有机会通过compareAndSetState抢占说,这个是公平与非公平的体现。
二.tryAcquire()
tryAcquire相对于非公平锁来说多了一个hasQueuedPredecessors方法,这个方法的意思是,当前队列如果已经有线程在排队了,则当前线程也需要去排队。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//如果当前队列已经有线程排队,hasQueuedPredecessors返回true,则当前线程tryAcquire失败,进入后续排队代码
//如果hasQueuedPredecessors返回false,则当前线程通过CAS方式尝试获取锁,如果成功则返回true,并设置当前线程为独占线程
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
网友评论