AbstractQueuedSynchronizer
队列同步器,简称AQS,是一个用来构建锁或者其他同步组件的基础框架,他使用了一个int型的成员变量来表示同步状态,内部通过FIFO队列来完成资源获取线程的排队工作
使用AQS实现的锁加锁和释放的过程(为了简单,以不可重入锁为例说明)
lock()方法调用时,尝试去获取锁,如果当前锁没有被其他线程持有,则获取成功,将state+1;如果当前锁已经被其他线程持有,则将当前线程加入CLH队列,此时还存在两种情况,1.队列为空,创建一个节点作为头,并将新加入的节点加入到队列作为尾节点,2.队列非空,直接将新加入的节点作为尾节点(设置为尾节点过程中涉及到了CAS的操作,即如果有其他线程也同时在做相同的操作,通过cas方式设置直到尾节点设置成功)。加入队列后判断一下自己的前置节点是不是head,如果是再次尝试获取锁,获取成功则将此节点设置为新的head,获取失败则挂起线程,等待前置节点释放锁
unlock()方法调用时,将持有锁的线程置空,并将state设置为0,找到它的next并唤醒它
AQS使用方式和其中的设计模式
AQS的主要使用方式是继承,子类通过继承AQS并实现它的抽象方法来管理同步状态,在AQS里由一个int型的state来代表这个状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。
private volatile int state;
在实现上,子类推荐被定义为自定义同步组件的静态内部类,AQS自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock和CountDownLatch等)。
同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器。可以这样理解二者之间的关系:
锁是面向使用者的,它定义了使用者与锁交互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;
同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。
实现者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。
AQS基于CLH队列锁实现
CLH队列锁即Craig, Landin, and Hagersten (CLH) locks。
CLH队列锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋。
当一个线程需要获取锁时:
1.创建一个新的QNode,将其中的locked设置为true表示需要获取锁,myPred表示对其前驱结点的引用

2.线程A对tail域调用getAndSet方法,使自己成为队列的尾部,同时获取一个指向其前驱结点的引用myPred

3.线程B需要获得锁,同样的流程再来一遍

4.线程就在前驱结点的locked字段上旋转,直到前驱结点释放锁(前驱节点的锁值 locked == false)
5.当一个线程需要释放锁时,将当前结点的locked域设置为false,同时回收前驱结点

如上图所示,前驱结点释放锁,线程A的myPred所指向的前驱结点的locked字段变为false,线程A就可以获取到锁。
CLH队列锁的优点是空间复杂度低(如果有n个线程,L个锁,每个线程每次只获取一个锁,那么需要的存储空间是O(L+n),n个线程有n个myNode,L个锁有L个tail)。CLH队列锁常用在SMP体系结构下。
Java中的AQS是CLH队列锁的一种变体实现。
AQS中的方法
acquire():独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则将会进入同步队列等待,该方法将会调用重写的tryAcquire(int arg)方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquireInterruptibly:与acquire相同,但是该方法会响应中断,当线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptException并返回
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
tryAcquireNanos:在acquireInterruptibly基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步锁状态,那么就会返回false,如果获取到了就返回true
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
acquireShared:共享式的获取同步状态,如果当前线程为获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有过个线程获取到同步状态
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
acquireSharedInterruptibly:与acquireShared相同,该方法会响应中断
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireSharedNanos:在acquireSharedInterruptibly基础上增加了超时限制
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
release:独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
releaseShared:共享式的释放同步状态
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
getQueuedThreads:获取等待在同步队列上的线程集合
public final Collection<Thread> getQueuedThreads() {
ArrayList<Thread> list = new ArrayList<Thread>();
for (Node p = tail; p != null; p = p.prev) {
Thread t = p.thread;
if (t != null)
list.add(t);
}
return list;
}
其中可以被重写的方法:
tryAcquire
tryRelease
tryAcquireShared
tryReleaseShared
isHeldExclusively
既然AQS是实现同步器的基础框架,那么我们怎么用他来实现一个我们自己的类似ReentrantLock的工具呢?
如下,定义一个类实现Lock接口(每个锁都要实现的接口),然后定义一个内部类,我们这里命名为MySync继承自AbstractQueuedSynchronizer,然后我们自定义的锁的所有操作都是交给这个内部类MySync实现的
public class MyLock implements Lock {
MySync sync = new MySync();
@Override
public void lock() {
System.out.println(Thread.currentThread().getName()+" ready get lock");
sync.acquire(1);
System.out.println(Thread.currentThread().getName()+" already got lock");
}
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
System.out.println(Thread.currentThread().getName()+" ready release lock");
sync.release(1);
System.out.println(Thread.currentThread().getName()+" already released lock");
}
@Override
public Condition newCondition() {
return sync.newCondition();
}
static class MySync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0,1)){
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() == 0){
throw new IllegalMonitorStateException("already release lock");
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
protected Condition newCondition(){
return new ConditionObject();
}
}
}
测试类
public class TestMyLock {
public void test() {
final Lock lock = new MyLock();
for (int i = 0; i < 4; i++) {
Worker w = new Worker(lock);
w.start();
}
}
public static void main(String[] args) {
TestMyLock lock = new TestMyLock();
lock.test();
}
class Worker extends Thread {
private final Lock lock;
public Worker(Lock lock) {
this.lock = lock;
}
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName());
try {
SleepTools.second(3);
} finally {
lock.unlock();
}
}
}
}
打印:
Thread-0 ready get lock
Thread-0 already got lock
Thread-0
Thread-1 ready get lock
Thread-2 ready get lock
Thread-3 ready get lock
Thread-0 ready release lock
Thread-0 already released lock
Thread-1 already got lock
Thread-1
Thread-1 ready release lock
Thread-1 already released lock
Thread-2 already got lock
Thread-2
Thread-2 ready release lock
Thread-2 already released lock
Thread-3 already got lock
Thread-3
Thread-3 ready release lock
Thread-3 already released lock
Process finished with exit code 0
我们这个锁有个缺陷,他目前是不可重入的,接下来我们实现一个自己的可重入锁。重入锁和非重入锁的差别其实并不大,关键就在于获取锁和释放锁的时候是否有判断当前线程是否是已经持有锁的线程这一步
public class MyReentrantLock implements Lock {
MyReentrantSync reentrantSync = new MyReentrantSync();
@Override
public void lock() {
reentrantSync.acquire(1);
}
@Override
public void lockInterruptibly() throws InterruptedException {
reentrantSync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() {
return reentrantSync.tryAcquire(1);
}
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
return reentrantSync.tryAcquireNanos(1,unit.toNanos(time));
}
@Override
public void unlock() {
reentrantSync.release(1);
}
@Override
public Condition newCondition() {
return reentrantSync.newCondition();
}
class MyReentrantSync extends AbstractQueuedSynchronizer{
@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
if (Thread.currentThread() == getExclusiveOwnerThread()){
setState(getState()+1);
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (Thread.currentThread() != getExclusiveOwnerThread()){
throw new IllegalMonitorStateException();
}
if (getState() == 0){
throw new IllegalMonitorStateException("already release");
}
setState(getState()-1);
if (getState() == 0){
setExclusiveOwnerThread(null);
}
return true;
}
protected Condition newCondition(){
return new ConditionObject();
}
}
}
测试类
public class TestReenterSelfLock {
static final Lock lock = new MyReentrantLock();
public void reenter(int x){
lock.lock();
try {
System.out.println(Thread.currentThread().getName()+":递归层级:"+x);
int y = x - 1;
if (y==0) return;
else{
reenter(y);
}
} finally {
lock.unlock();
}
}
public void test() {
for (int i = 0; i < 3; i++) {
Worker w = new Worker();
w.start();
}
}
public static void main(String[] args) {
TestReenterSelfLock testMyLock = new TestReenterSelfLock();
testMyLock.test();
}
class Worker extends Thread {
public void run() {
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
reenter(3);
}
}
}
打印结果
Thread-0
Thread-1
Thread-2
Thread-0:递归层级:3
Thread-0:递归层级:2
Thread-0:递归层级:1
Thread-2:递归层级:3
Thread-2:递归层级:2
Thread-2:递归层级:1
Thread-1:递归层级:3
Thread-1:递归层级:2
Thread-1:递归层级:1
网友评论