显式锁Lock是JDK1.5开始引入的排他锁,作用跟内部锁一致,只是提供了一些内部锁所不具备的特性。
ReentrantLock是Lock的默认实现类。
在此之前我们过了一遍ConditionObject的源码,里面提到condition queue和sync queue,其中sync queue就是加锁失败之后线程进入的queue,这里我们接着分析ReentrantLock的lock和unlock过程。
ReentrantLock源码
public class ReentrantLock implements Lock, java.io.Serializable {
//定义一个final的Sync成员变量,就是AQS的实现类
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
//非公平申请锁方法,提供给NonFair实现以及tryLock()方法实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//这里跟Fair的区别,不看queue里是不是有前继节点在等待,而直接尝试CAS更新state
if (compareAndSetState(0, acquires)) {
//尝试更新state成功即获得锁,设置当前线程未独占锁Owner
setExclusiveOwnerThread(current);
return true;
}
}
//如果c!=0表示已经有锁了,就看持有锁的线程是不是当前线程
else if (current == getExclusiveOwnerThread()) {
//如果是当前线程持有锁,就再更新state,进行一此重入行为
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//如果不是当前线程持有锁,则表示加锁失败
return false;
}
//尝试释放独占锁
protected final boolean tryRelease(int releases) {
//拿到state减去需要释放锁的数量,得到还持有多少锁
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
//如果release所有持有的锁,那么c==0
free = true;
setExclusiveOwnerThread(null);
}
//更新释放后的state
setState(c);
return free;
}
//是不是当前线程持有独占锁
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
//返回一个新的condition队列
final ConditionObject newCondition() {
return new ConditionObject();
}
//返回独占锁的持有线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
//返回当前线程持有的可重入锁的数量state
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
//返回是不是被锁住的
final boolean isLocked() {
return getState() != 0;
}
}
/**
* non-fair Sync 实现
*/
static final class NonfairSync extends Sync {
//非公平锁加锁,
final void lock() {
//如果当前state是0,且CAS更新state=1成功,表示lock成功
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//如果更新state失败,表示没有获取到锁,则进入尝试加锁流程
acquire(1);
}
//非公平锁,尝试获取锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* fair Sync object
*/
static final class FairSync extends Sync {
//公平锁lock的实现,调用tryAcquire尝试去获取锁,
final void lock() {
acquire(1);
}
//公平锁尝试获取锁,如果state=0,可以加锁的时候,
// hasQueuedPredecessors()查看是不是有前继节点,如果没有,再开始CAS设置state
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
/**
* 默认的实现是非公平锁
*/
public ReentrantLock() {
sync = new NonfairSync();
}
//可以设置是公平锁还是非公平锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//不响应中断的lock
public void lock() {
sync.lock();
}
//能够响应中断的lock
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//通过AQS来的nonFairAcquire来尝试获取锁,
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
//带超时的tryLock,这个是很实用的方法,如果获取锁失败不会进入sync queue等待,而是返回false,业务逻辑可以处理失败的情况
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
//通过AQS来释放锁
public void unlock() {
sync.release(1);
}
}
上面是ReentrantLock的部分源码,主要是Fair和NonFair锁的实现
执行逻辑:
- 尝试获取对象的锁,如果获取不到(以及有其他线程获得了锁,并且没有释放),那么它就会进入到AQS的sync queue中。
- 如果获取到,那么根据锁是公平锁Fair还是非公平锁NonFair
1)如果是公平锁,那么线程会直接放到sync queue的末尾
2)如果是非公平锁,那么线程会首先尝试CAS获取锁,如果失败,则与公平锁的处理方式一致,放到sync queue末尾。 - 当锁被释放时(unlock),那么底层会调用release方法对state成员变量进行减一,减一之后,如果state不为0,那么release操作执行完毕;如果state为0,则调用LockSupport的unpark方法唤醒该线程后的等待队列中的第一个后继线程(pthread_mutex_unlock),将其唤醒,是其能够获得对象的锁(release时,公平锁和非公平锁的处理逻辑是一致的)
ReentrantLock所谓的上锁,本质就是AQS的state成员变量的操作,state+1表示上锁,state-1表示释放锁(区别与Synchronized使用对象头)
资源调度策略:
公平:从排队角度看,公平策略不允许插队,每次来争抢资源时会先判断sync queue是不是有等待的线程,即是不是有前继节点,如果有,则直接加入sync queue的末端。
非公平:允许插队,即资源持有者释放线程,sync queue中会有一个线程有机会被选中唤醒来申请资源,但是这个过程中有另一个线程也开始来抢占这个资源,这个新线程不会去看是不是等待队列有在等待的线程,而是直接参与申请资源,申请成功则获得资源,而等待队列的那个线程继续等待;申请失败则新线程会加入等待队列。非公平策略吞吐量比较高,缺点是极限情况下,会出现饥饿现象,即等待队列里的线程可能永远无法获得所需资源。
可中断的lockInterruptibly
acquireInterruptibly()方法开始的时候会判断线程是否中断。
doAcquireInterruptibly()的逻辑与lock()方法调用的acquireQueued()类似,只是acquireQueued不响应中断,而是通过返回值表示中断与否;而doAcquireInterruptibly内部会抛出InterruptedException。
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
lock和condition的排队与插队
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(()->{
lock.lock();
try{
condition.await();
System.out.println(Thread.currentThread().getName()+"--invoke");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "Thread1").start();
Thread.sleep(100);
new Thread(()->{
lock.lock();
try{
condition.await();
System.out.println(Thread.currentThread().getName()+"--invoke");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "Thread2").start();
Thread.sleep(100);
new Thread(()->{
lock.lock();
try{
condition.await();
System.out.println(Thread.currentThread().getName()+"--invoke");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "Thread3").start();
Thread.sleep(100);
new Thread(()->{
lock.lock();
try{
Thread.sleep(50000);
System.out.println(Thread.currentThread().getName()+"-------signal");
condition.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "Thread4").start();
Thread.sleep(100);
new Thread(()->{
lock.lock();
try{
System.out.println(Thread.currentThread().getName()+"-------get lock");
Thread.sleep(10000);
System.out.println(Thread.currentThread().getName()+"-------finish");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "Thread5").start();
Thread.sleep(100);
new Thread(()->{
lock.lock();
try{
System.out.println(Thread.currentThread().getName()+"-------get lock");
Thread.sleep(10000);
System.out.println(Thread.currentThread().getName()+"-------finish");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "Thread6").start();
---------------------
Thread4-------signal
Thread5-------get lock
Thread5-------finish
Thread6-------get lock
Thread6-------finish
Thread1--invoke
Thread2--invoke
Thread3--invoke
上面示例里面,Thread1, Thread2, Thread3都在await,进入condition queue,Thread4会持有比较长时间锁,所以Thread5, Thread6会进入sync queue,那么Thread4在持有锁结束之后进行singalAll,唤醒所有conditionQueue的等待线程,并且释放锁,会唤醒syncQueue里head的后继节点。signalAll方法会将Thread1, Thread2, Thread3从condition queue移到sync queue,等待
主要的逻辑还是要看AQS部分源码分析
网友评论