AQS即AbstractQueuedSynchronizer
,想要讲清楚AQS的源码实现是相当有难度的一件事情,它的结构设计较为庞杂,本文要讲的事情是关于它的部分要点以及整体框架把握。
我们都是知道,在Java1.5之前,多线程的协同工作由两种机制来完成,一种是锁Synchronized
,另一种是Object
类中的wait()
和notify()
实现多线程之前的协同。自从1.5之后,在Java的并发包中出现了ReentrantLock
,CountDownLatch
,Semaphore
等实现的同步方法。它们有效而灵活的特性取缔了原始的加锁协同机制。延长了程序员的生命,让Java世界清爽了许多。但其背后都有一个大功臣就是AQS
。它可以很广泛有效的构建出各式定制化的Synchronizer
,它的优势在于,如果使用AQS框架提供的一些方法实现进行同步器构建,你的关注点仅仅在于一个并发点上。
解析AQS架构
AQS的架构可以认为它实现了四个场景:
- 加锁
- 释放锁
- 等待
- 唤醒
本文在之后的讲解流程里将把握这几点,来讲述它的实现结构。
基本参数
AQS的架构中有一个状态叫做state
,它的含义是当有线程持有锁的时候,它的值会加1。对于重入锁而言,每加锁一次,state的值都会加1,每次释放锁,它的值就会减一。
/**
* The synchronization state.
*/
private volatile int state;
另外在AQS中实现了两个队列,叫做同步队列
和条件队列
,它们都是由双向链表构建。通过这两个队列,实现了上述提到的四个功能。
加锁与释放锁
共享锁和排它锁的区别
在AQS的架构中,加锁有两种形式,即共享锁
和排它锁
。
共享锁,也可以被叫做读锁,它可以被多个线程共同持有。排它锁也可以被叫做写锁,它只能被一个线程所持有。
加锁的实现方式
对于排它锁而言,当有线程来尝试获取锁的时候,首先会调用tryAcquire
方法。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
可以看到这个方法要求是在子类中实现的,假设我们实现了这个方法。在获知可以获得锁的时候,会调用compareAndSetState
方法把state
置为1,否则当锁还没被释放,则去执行acquire()
方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个方法会把当前线程装在Node
节点里面放进同步队列中,等待锁释放,并阻塞。当之前的线程释放了锁,便会重新从同步队列的节点头拿到这个这个线程请求,把锁get分配给它。
对于共享锁而言,它调用的方法是tryAcquireShared()
,和acquireShared()
,基本流程和排它锁一致。不同的是它放进同步队列中阻塞,而是会唤醒队列后面的节点,一起去获取锁。
基于此,如果去阅读ReentrantLock
的源码,会觉得结构相当清晰,和自己写的没啥区别。。
释放锁
释放锁的方式与加锁的结构相似,它的目的就是唤醒同步队列中阻塞的节点去获取锁。具体细节不多做说明。
等待和唤醒
在上面的介绍中,我们看到了使用同步队列和AQS提供给我们的一些方法来实现了加锁解锁的功能。看起来似乎只使用同步队列就足够了,条件队列是否多余呢?来设想这么一个场景。
static ReentrantLock lock = new ReentrantLock();
static Condition condition = lock.newCondition();
在这里,newCondition()就是依照条件队列为父类实现的。
final ConditionObject newCondition() {
return new ConditionObject();
}
当同步队列满了之后,如果在继续put,则会把Node
放在条件队列里,如果当队列为空,线程调用take方法,这时候也会把此线程的Node
放入条件队列。
或者说,执行await()
方法的时候,线程会释放锁,进入到条件队列中。执行signal()
方法,线程会从条件队列放到同步队列,然后尝试着拿到锁。
总之,等待和唤醒的实现使用条件队列的本质 ,是为了完成同步队列在某些场景下不足的情况。
借用《Java并发编程实践》中的一个例子,实现一个简单的多线程协同子类。
class OneLatch {
Sync sync = new Sync();
public void siginal() {
sync.releaseShared(0);
}
public void await() throws InterruptedException {
sync.acquireInterruptibly(0);
}
}
class Sync extends AbstractQueuedSynchronizer {
@Override
protected boolean tryReleaseShared(int arg) {
setState(1);
return true;
}
@Override
protected int tryAcquireShared(int arg) {
return getState() == 1 ? 1 : -1;
}
}
实现tryReleaseShared
和tryAcquireShared
方法表示线程能继续被执行。使用acquireInterruptibly
方法则表明把当前获取锁失败的线程放入条件队列中。
总结
总之,AQS是一个简化多线程同步的框架,通过它我们不仅更加容易的理解和阅读它的子类源码。而且我们也可以定制化的实现自己需要的同步机制。
网友评论