AQS源码分析(AbstractQueuedSynchronizer)
AQS是一个抽象类,是一个队列同步器。ReentrantLock中的同步实现,其实就是通过AQS来实现的,是一个显示的同步器。
1.AQS中的属性state
在AQS中有一个很重要的属性state,这是一个volatile修饰的int变量。表示的是同步状态,即当前锁是否是被使用。
一般使用AQS,都是继承AQS抽象类,根据不同的锁需要实现不同的逻辑,填充AQS中未实现的空方法。
AQS本身是采用了模板模式,定义抽象类,然后其内部有许多未实现的空模板。
2.实现一个自定义的类似于ReentrantLock的锁
通过AQS实现同步器,即实现当前线程是否获取锁资源,这是通过AQS中的state属性来操作的。
那么需要实现AQS,并且当拿到锁的时候,需要修改AQS中的state属性。改为1表示就拿到了锁。
自定义ReentrantLock锁,首先需要实现Lock接口,然后定义内部类,继承AQS,实现同步器。一个同步工具,是否是可重入的,其实就是通过对AQS的state做修改,如果是可重入的,那么state可以做加1操作。但是如果是可重入的,每次获取锁,都对state+1操作。不可重入的,可以说是独占锁,可重入的,就是可重入锁。
(1)定义一个不可重入锁
public class SelfLock implements Lock {
// 静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
/*判断处于占用状态*/
@Override
protected boolean isHeldExclusively() {
return getState()==1;
}
/*获得锁*/
@Override
protected boolean tryAcquire(int arg) {
if(compareAndSetState(0,1)){
// 设置排他所有者线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/*释放锁*/
@Override
protected boolean tryRelease(int arg) {
if(getState()==0){
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
//compareAndSetState(1,0);
return true;
}
// 返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() {
return new ConditionObject();
}
}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() {
System.out.println(Thread.currentThread().getName()+" ready get lock");
sync.acquire(1);
System.out.println(Thread.currentThread().getName()+" already got lock");
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
System.out.println(Thread.currentThread().getName()+" ready release lock");
sync.release(1);
System.out.println(Thread.currentThread().getName()+" already released lock");
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
(2)定义一个可重入锁
public class ReenterSelfLock implements Lock {
// 静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 是否处于占用状态
protected boolean isHeldExclusively() {
return getState() > 0;
}
// 当状态为0的时候获取锁
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}else if(getExclusiveOwnerThread()==Thread.currentThread()){
// 如果当前的排他所有线程依然是当前线程,则可以做可重入,即state+1
setState(getState()+1);
return true;
}
return false;
}
// 释放锁,将状态设置为0
protected boolean tryRelease(int releases) {
if(getExclusiveOwnerThread()!=Thread.currentThread()){
throw new IllegalMonitorStateException();
}
if (getState() == 0)
throw new IllegalMonitorStateException();
setState(getState()-1);
if(getState()==0){
setExclusiveOwnerThread(null);
}
return true;
}
// 返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() {
return new ConditionObject();
}
}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() {
System.out.println(Thread.currentThread().getName()+" ready get lock");
sync.acquire(1);
System.out.println(Thread.currentThread().getName()+" already got lock");
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
System.out.println(Thread.currentThread().getName()+" ready release lock");
sync.release(1);
System.out.println(Thread.currentThread().getName()+" already released lock");
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
(3)总结锁的实现
在使用AQS定义同步器的时候,锁的是否可重入,其实也是通过其state属性来判断的,如果当前尝试获取锁的线程是之前获取锁的所有者线程,那么就给state+1,再赋值。并且判断是否占用锁状态的时候,也使用getState()>0来判断;而如果是不可重入锁,则只有state=1的时候才是有线程占有锁,如果等于0的时候,就没有线程占有锁。
3.AQS的基本思想-CLH队列锁
凡是没有获取锁的线程都要进行排队,而把要排队的线程打包成一个QNode队列,在QNode中有一个locked布尔值,为true表示需要获取锁。当一个线程获取到锁的时候,其他排队的线程想要获取锁,就会阻塞等待,当使用锁的线程已经释放锁之后,这些排队的线程就开始竞争锁,竞争到锁资源的线程就会被唤醒,变成运行状态中的就绪状态,然后开始进入执行状态。当一个线程需要获取锁的时候,就会将自己放在CLH队列的末尾。
在CLH队列中,每个QNode节点都在不停的自旋,都在监测自己的前驱节点是否已经释放锁,即前一个节点的locked是否已经变为false,如果已经变成了false,说明当前节点的前一个节点已经释放了锁,那么当前节点就可以获取到锁资源。CLH自旋获取锁,其实是一种公平锁的实现,即只有队列的前一个先获取锁。
AQS可以称为CLH队列的变种实现,AQS采用的是双向队列,并且AQS一般只会自旋一定次数(一般是两到三次),当达到自旋次数以后仍然没获取到锁,就会进入阻塞状态。
在AQS中,除了有CLH队列自旋获取锁以外,还有一个等待队列,等待队列是线程在获取锁的时候自旋一定次数之后还没获取到的时候,就会调用wait进入等待队列,然后会被notify唤醒,唤醒之后又进入QNode队列的末尾去自旋拿锁。
网友评论