ReentrantLock 它是一个支持公平和非公平的可重入的锁。这些特性都是怎么实现的呢?如下代码:
public class ReentrantLockDemo {
private static ReentrantLock lock = new ReentrantLock();
private static volatile int num = 0;
public static void inc() {
lock.lock();
try {
num+=1;
}catch (Exception e){
e.printStackTrace();
}
finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
for (int i = 0;i<1000;i++){
new Thread(()->
inc()
).start();
}
// 不休眠 主线程 执行完 可能 其他线程 还未执行完 会造成结果和预想结果不一致
Thread.sleep(1000);
System.out.println(num);
}
}
以上代码实现了在多线程情况下对一个数的累加。通过以上代码可知当 线程去执行
num+=1;
时 需要先获得锁,操作结束后 再释放锁。那获得锁的操作时如何实现的呢?代码如下:
public void lock() {
sync.lock();
}
以上代码可知,最终调用的是
sync.lock();
,那 sync 是什么呢?继续查找代码:
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
abstract void lock();
Sync 是 ReentrantLock 的一个内部类 并且 继承了 AbstractQueuedSynchronizer 重写了 父类的一些方法,但是还是没找到 lock() 的实现,继续在源码中查找,发现有两种实现:
非公平锁的实现:
// 非公平锁的实现:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
公平锁的实现:
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
那示例代码调用的是哪一个实现呢?经过代码的分析,发现 原来在构造 锁对象的时候 可以通过 构造参数 来指定锁的公平性,示例代码调用的是 无参构造:代码如下:
// 无参构造
public ReentrantLock() {
sync = new NonfairSync();
}
// 有参构造
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
通过以上代码可知,我所创建的对象是一个非公平锁,也就是说 sync.lock() 调用的是 非公平锁的实现,即:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
通过以上代码分析,可知 线程会先通过 CAS 去执行
compareAndSetState(0,1)
如果结果为 true 则说明当前线程可以 获得锁,并 执行setExclusiveOwnerThread(Thread.currentThread());
通过方法名可知:设置一个独占锁 所属线程。也就说 当前线程获得了一个独占锁。代码如下:
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
以上代码 是Sync父类 AbstractOwnableSynchronizer 的一个成员变量 主要用于保存一个获得独占锁的线程。获得锁的线程 就可以继续执行run() 方法了。当线程执行完毕 就 调用 unlock();
public void unlock() {
sync.release(1);
}
public void unlock() {
sync.release(1);
}
// 父类的默认实现,子类并没有重写
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
通过以上代码发现,调用的是父类的实现。示例代码 是创建了1000个线程,并操作同一个方法。每个线程都要去获得锁,那些没有或的锁的线程怎么处理呢?通过以上代码发现,没有获得锁的线程会执行
acquire(1);
方法,找到对应的实现如下(子类并没有覆写父类方法 所以会调用父类的实现):
// 父类的实现
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上代码 可知 会先判断 尝试获得锁是否成功,并且添加一个独占的Node 到队列中是否成功,如果入队成功 并且尝试获得锁失败,则会继续执行。尝试获得锁的实现如下(子类重写了父类的实现):
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
// 获取当线程
final Thread current = Thread.currentThread();
// 获取当前状态
int c = getState();
// 如果当前状态 等于0,
f (c == 0) {
// 并通过 CAS 来判断 能否获得锁
// 如果 CAS 返回true ,则说明可以获得锁,并保存当前线程,返回 true,
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 判断当前线程 是否和缓存的线程为同一个线程,如果为同一个线程
else if (current == getExclusiveOwnerThread()) {
// 则下一个状态 为 当前 state + 1;
int nextc = c + acquires;
// 如果 nextc < 0 抛异常
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 否则设置 状态 为 nextc = c + acquires
setState(nextc);
return true;
}
return false;
}
通过以上代码可知 acquires 的值 为1,通过分析 首先获取当线程 然后获取 state 的值, 并判断 state 是否等于0,如果state 等于0,则说明当前线程可以尝试获得锁,如果 CAS 成功,则当前线程获得锁,否则 return,如果当前 state 不等于 0 ,则判断当前线程是否和 已经缓存的线程 是否为同一个线程,如果为同一个线程,则判断 当前状态 加 一并且 判断 是否小雨0,如果不小于0,则设置state 为 nextc 并返回true ,否则 返回false. 由
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
可知 只有
!tryAcquire(arg)
为 true 时 也是 tryAcquire(arg) 为false 时 才能继续执行,那什么请款下 会为false
呢?
通过以上分析可知:
1. state 等于0 并且 CAS 失败 返回false
2. state 不等于0 并且 当前线程和已经获得锁的线程不是同一个线程 返回false.
那acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
什么情况下为true呢?该代码由两个方法组成:
1.Node addWaiter(Node.EXCLUSIVE)
2.boolean acquireQueued(final Node node, int arg)
首先我们先分析
addWaiter
,找到对应的实现如下(调用父类的默认实现):
private Node addWaiter(Node mode) {
// 创建一个Node 并保存当前线程
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
通过分析以上代码:
1.创建一个mode 为EXCLUSIVE
的节点。
2.获取tail``,并
pred引用指向 该节点。 3.判断该引用指向的节点是否为空? 如果不为空,则将新创建的节点
node的前置节点指向
pred,并通过CAS 设置 该节点为
tail节点. 如果设置成功,则将
pred所指向的节点的
next节点 指向新创建的节点。并返回创建的节点
node. 如果设置失败或者
tail为空,则执行 入队操作
enq(node),代码如下:
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
通过分析以上代码,可知通过自旋,首先获取
tail
节点,然后判断 该节点是否存在,如果不存在,则通过CAS 创建一个head
节点,并将tail
节点指向head
节点。如果不为空,则直接将新创建的节点的前置节点指向tail
节点,并通过CAS 设置 该节点为tail
节点,如果设置成功,则将 t 所指向的节点的next引用指向创建的 node 节点,并返回 t. 继续分析
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取该节点的前置节点
final Node p = node.predecessor();
// 如果该节点前置节点 p 为 head 节点 并且 state 等于 0.
// 则 设置该节点前置节点 为 head 节点,并将 next 所指向的节点 设置为null. 则返回 false
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 获取节点的前置节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 设置 head节点
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
网友评论