AbstractQueuedSynchronizer
学习AQS的必要性
队列同步器AbstractQueuedSynchronizer(以下简称同步器或AQS),是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
AQS原理
内部的一个关键的成员变量state,专门用来保存当前的同步状态,如果需要我们自己去实现一个同步的工具类,它里面采用的是模板设计模式。AQS内部本质上是CLH队列锁,每一个等待拿锁的线程,打包成一个节点,挂到一个链表队列上,所有想要拿锁的线程都在这个链表上一一的挂起来,每个线程会去检测它前一个线程是否有释放锁,如果释放了,就去拿锁。为了防止进入自旋状态,占用cpu,一般试个2-3次,就进入阻塞状态。
/**
* The synchronization state.
*/
private volatile int state;
AQS使用方式和其中的设计模式
AQS的主要使用方式是继承,子类通过继承AQS并实现它的抽象方法来管理同步状态,在AQS里由一个int型的state来代表这个状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。
在实现上,子类推荐被定义为自定义同步组件的静态内部类,AQS自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。
同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器。可以这样理解二者之间的关系:
锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;
同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。
实现者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
模板方法模式
同步器的设计基于模板方法模式。模板方法模式的意图是,定义一个操作中的算法的骨架,而将一些步骤的实现延迟到子类中。模板方法使得子类可以不改变一个算法的结构即可重定义该算法的某些特定步骤。我们最常见的就是Spring框架里的各种Template。
AQS中的方法
模板方法
实现自定义同步组件时,将会调用同步器提供的模板方法,
模板方法.png这些模板方法同步器提供的模板方法基本上分为3类:独占式获取与释放同步状态、共享式获取与释放、同步状态和查询同步队列中的等待线程情况。
可重写的方法
可重写的方法1.png 可重写的方法2.png访问或修改同步状态的方法
重写同步器指定的方法时,需要使用同步器提供的如下3个方法来访问或修改同步状态。
-
getState():获取当前同步状态。
-
setState(int newState):设置当前同步状态。
-
compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性。
实现我们自己的
public static class MyAQS extends AbstractQueuedSynchronizer {
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {//compareAndSetState(expect, update)
setExclusiveOwnerThread(Thread.currentThread());//设置独有的线程
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) {//如果锁未占用,而调用release(),抛出异常
throw new Exception();
}
setExclusiveOwnerThread(null);//重置独有线程
setState(0);//恢复锁的状态
return true;
}
}
private MyAQS mMyAQS = new MyAQS();
@Override
public void lock() {
mMyAQS.acquire(1);//抢占锁,内部调用了tryAcquire();
}
@Override
public void unlock() {
mMyAQS.release(1);//释放锁,内部调用了tryRelease();
}
CLH队列锁
CLH队列锁即Craig, Landin, and Hagersten (CLH) locks。
CLH队列锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。
当一个线程需要获取锁时:
-
创建一个的QNode,将其中的locked设置为true表示需要获取锁,myPred表示对其前驱结点的引用
-
线程A对tail域调用getAndSet方法,使自己成为队列的尾部,同时获取一个指向其前驱结点的引用myPred
线程B需要获得锁,同样的流程再来一遍
-
线程就在前驱结点的locked字段上旋转,直到前驱结点释放锁(前驱节点的锁值 locked == false)
-
当一个线程需要释放锁时,将当前结点的locked域设置为false,同时回收前驱结点
如上图所示,前驱结点释放锁,线程A的myPred所指向的前驱结点的locked字段变为false,线程A就可以获取到锁。
CLH队列锁的优点是空间复杂度低(如果有n个线程,L个锁,每个线程每次只获取一个锁,那么需要的存储空间是O(L+n),n个线程有n个myNode,L个锁有L个tail)。CLH队列锁常用在SMP体系结构下。
Java中的AQS是CLH队列锁的一种变体实现。
ReentrantLock的实现
锁的可重入
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题。
1)线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
2)锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。
nonfairTryAcquire方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。同步状态表示锁被一个线程重复获取的次数。
如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。
实现我们自己的ReentrantLock
public void test() {
reenter(3);//需要递归调用3次,每一次都需要获取和释放锁。如果按之前的简单逻辑,只判断了当前锁是否被占用,很有可能当前被自己占用,而无法获得锁,从而进入一个死锁的状态。
}
public void reenter(int x) {
lock.lock();
try {
int y = x - 1;
if (y == 0) {
return;
} else {
reenter(y);
}
} finally {
lock.unlock();
}
}
实现我们自己独占锁,可重入
public class ReenterSelfLock implements Lock {
/* 静态内部类,自定义同步器*/
private static class Sync extends AbstractQueuedSynchronizer {
/* 是否处于占用状态*/
@Override
protected boolean isHeldExclusively() {
return getState() > 0;
}
/* 当状态为0的时候获取锁*/
@Override
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
} else if (getExclusiveOwnerThread() == Thread.currentThread()) {
//这里增加一个判断,如果当前的独占线程是自己,那就state增加1,返回return。每次获取锁state增加1,释放锁减少1
setState(getState() + 1);
return true;
}
return false;
}
/* 释放锁,将状态设置为0*/
@Override
protected boolean tryRelease(int releases) {
if (getExclusiveOwnerThread() != Thread.currentThread()) {
throw new IllegalMonitorStateException();
}
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setState(getState() - 1);
if (getState() == 0) {
setExclusiveOwnerThread(null);
}
return true;
}
/* 返回一个Condition,每个condition都包含了一个condition队列*/
Condition newCondition() {
return new ConditionObject();
}
}
/* 仅需要将操作代理到Sync上即可*/
private final Sync sync = new Sync();
@Override
public void lock() {
System.out.println(Thread.currentThread().getName() + " ready get lock");
sync.acquire(1);
System.out.println(Thread.currentThread().getName() + " already got lock");
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public void unlock() {
System.out.println(Thread.currentThread().getName() + " ready release lock");
sync.release(1);
System.out.println(Thread.currentThread().getName() + " already released lock");
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
公平和非公平锁
/**
* 默认的无参构造函数将会把Sync对象创建为NonfairSync对象,这是一个“非公平锁”
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* 传入参数为true时,将会把Sync对象创建为“公平锁”FairSync。
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
nonfairTryAcquire(int acquires)方法,对于非公平锁,只要CAS设置同步状态成功,则表示当前线程获取了锁,而公平锁则不同。tryAcquire方法,该方法与nonfairTryAcquire(int acquires)比较,唯一不同的位置为判断条件多了hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。
网友评论