什么是AQS?
AQS是AbstractQueuedSynchronizer(队列同步器)的简写,是用来构建锁和其他同步组件的基本框架。内部使用了一个 int 成员变量 state 来表示同步状态。通过内置的 FIFO(先进先出)队列来完成资源获取线程的排队工作。
AQS的使用和其中的设计模式
package java.util.concurrent.locks;
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
protected final boolean compareAndSetState(int expect, int update) {
return U.compareAndSwapInt(this, STATE, expect, update);
}
......
}
可以看到AQS是一个抽象类,需要继承并且实现其抽象方法来管理同步状态。在AQS里由一个int型的state来代表这个状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用AQS同步器提供的3个方法getState()、setState(int newState)和compareAndSetState(int expect,int update)来进行操作,因为它们能够保证状态的改变是安全的。
在实现上,子类推荐被定义为自定义同步组件的静态内部类,AQS自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。
同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器。可以这样理解二者之间的关系:
- 锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;
- 同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。
- 实现者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
AQS中应用了设计模式中的模版方法模式。下面通过一个简单的例子了解一下:
定义炒菜的模版方法:洗,切,炒
public abstract class Cooks {
public void makeCook(){
wash();
cut();
cook();
}
public abstract void wash();
public abstract void cut();
public abstract void cook();
}
public class Vegetables extends Cooks{
@Override
public void wash() {
System.out.println("洗菜");
}
@Override
public void cut() {
System.out.println("切菜");
}
@Override
public void cook() {
System.out.println("炒菜");
}
}
public class Meats extends Cooks {
@Override
public void wash() {
System.out.println("洗肉");
}
@Override
public void cut() {
System.out.println("切肉");
}
@Override
public void cook() {
System.out.println("炒肉");
}
}
public static void main(String[] args) {
Cooks meats = new Meats();
Cooks vegetables = new Vegetables();
// 炒了一份肉
meats.makeCook();
}
以上就是模版方法设计模式的简单实用。父类规定好执行的顺序,子类根据自己的需求去实现自己的逻辑完成不同的功能。
下面我们自定义一个锁来看一下如何使用AQS的模版方法如何使用:
// 实现Lock接口
public class MyLock implements Lock {
private Sync sync;
public MyLock() {
this.sync = new Sync();
}
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
}
@Override
public boolean tryLock() {
return false;
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return false;
}
@Override
public void unlock() {
sync.release(1);
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
public class Sync extends AbstractQueuedSynchronizer {
/**
* 是否持有锁
*/
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
/**
* 获得锁
*/
@Override
protected boolean tryAcquire(int arg) {
// 通过CAS机制来保证原子性
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/**
* 释放锁
*/
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0) {
throw new IllegalMonitorStateException(Thread.currentThread().getName() + "---没有锁可以释放");
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
Condition newCondition() {
return new ConditionObject();
}
}
}
在测试我们自己实现的锁之前,先来展示一下不用锁的时候出现的并发问题:
public static class UseMyLock{
private int count = 0;
public void add(){
count = count + 1;
}
public int getCount() {
return count;
}
}
public static class ThreadImpl extends Thread {
private UseMyLock myCount;
public ThreadImpl(UseMyLock myCount) {
this.myCount = myCount;
}
@Override
public void run() {
super.run();
for (int i = 0; i < 10000; i++) {
myCount.add();
}
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
UseMyLock myCount = new UseMyLock();
ThreadImpl t1 = new ThreadImpl(myCount);
ThreadImpl t2 = new ThreadImpl(myCount);
t1.start();
t2.start();
Thread.sleep(1000);
System.out.println("----->" + myCount.getCount());
}
其中一次运行结果
可以看到不加锁的时候,绝大多数时候的值都不是代码所期望的20000,而且每次运行的结果都不一致。
下面使用我们自己的UseMyLock来验证一下是否我们自己的锁能否生效:
private Lock lock = new MyLock();
public void add(){
lock.lock();
try {
count = count + 1;
}finally {
lock.unlock();
}
}
使用UseMyLock
可以看到,两个线程同时操作count变量,最终的产生了正确的结果,不管运行多少次结果都是20000。
CLH队列锁
我们知道在多线程的情况下当一个线程获得了锁,也就是获得了执行权。那么剩下的线程则需要在锁的外面等待。那么CLH队列锁就是这种基本思想。CLH队列则会把凡是需要在锁外面排队的线程打包成一个QNode,下面是伪代码:
public class QNode {
// 当前线程
Thread currentThread;
// 上一个节点
QNode mPrev;
// 是否持有锁
boolean locked;
}
假设现在有一个线程0获得了锁正在执行代码,那么他的locked则会被置为true,那么QNode-1,也就是线程1通过在mPrev上进行类似于CAS的自旋操作一直来检查线程0的locked标记。假如当某一刻线程0的locked变成了false,那么线程1就认为线程0执行完了并释放了锁,那么就轮到线程1去拿锁了。
CLH队列锁思想的应用
- AQS就是基于CLH队列锁的思想实现的,AQS可以成为CLH队列锁的一个变体的实现。例如在AQS中不是单向链表而是一个双向链表。并且也不会一直去进行自旋。因为当等待的节点较多的时候,会把所有的CPU都占用而无法做其他工作。所以在AQS中一般会自旋两到三次之后就把当前线程挂起。
AQS中的CLH队列可以有多个。除了上面讲CLH思想中的等待拿锁的队列之外,还可以有一个甚至多个等待唤醒队列,也就是被阻塞的线程会被放入这个队列。当被唤醒的时候则会被移到等待拿锁的队列。 - synchronized的实现也是基于CLH队列锁思想。与AQS不同的是synchronized只有一个等待唤醒队列
公平锁和非公平锁概念
按照上面介绍的CLH队列锁的思想,我们可以知道CLH队列锁的思想就是一种公平锁的思想。所有的线程按照先后顺序,排在队列前面的就先拿到锁,后来的则需要排在末尾等待。也就是我们常说的先到先得。而非公平锁则是不在队列后面排队等待,只要有一个线程释放锁了,就有机会去抢一下锁。有点类似于插队。关于公平锁和非公平锁的区别,我们可以直接在ReentrantLock中直观的看到区别:
公平锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
非公平锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
可以看到两段代码几乎一模一样,唯一的区别就是公平锁在实现的时候会判断hasQueuedPredecessors()。这个方法则是判断队列中是否有线程在排队。
网友评论