美文网首页
多线程(11) — AQS抽象队列同步

多线程(11) — AQS抽象队列同步

作者: 烧杰 | 来源:发表于2018-04-17 22:25 被阅读0次

    AQS是指AbstractQueuedSynchronizer,抽象队列同步。AQS是多个重要接口实现的工具类包括之前讲的ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch、Condition接口由ConditionObject实现,而ConditionObject又是AQS的内部类,所以AQS是实现其功能的重要工具类。下面是AQS的大致类图:

    image.png
    1. AQS是一个通过内置的FIFO双向队列来完成线程的排队工作(内部通过结点head和tail记录队首和队尾元素,元素的结点类型为Node类型,后面我们会看到Node的具体构造)。
    image.png
    1. Node中的thread用来存放进入AQS队列中的线程引用,Node结点内部的SHARED表示标记线程是因为获取共享资源失败被阻塞添加到队列中的;Node中的EXCLUSIVE表示线程因为获取独占资源失败被阻塞添加到队列中的。waitStatus表示当前线程的等待状态:

    ①CANCELLED=1:表示线程因为中断或者等待超时,需要从等待队列中取消等待;

    ②SIGNAL=-1:当前线程thread1占有锁,队列中的head(仅仅代表头结点,里面没有存放线程引用)的后继结点node1处于等待状态,如果已占有锁的线程thread1释放锁或被CANCEL之后就会通知这个结点node1去获取锁执行。

    ​③CONDITION=-2:表示结点在等待队列中(这里指的是等待在某个lock的condition上,关于Condition的原理下面会写到),当持有锁的线程调用了Condition的signal()方法之后,结点会从该condition的等待队列转移到该lock的同步队列上,去竞争lock。(注意:这里的同步队列就是我们说的AQS维护的FIFO队列,等待队列则是每个condition关联的队列)

    ​④PROPAGTE=-3:表示下一次共享状态获取将会传递给后继结点获取这个共享同步状态。

    同步状态值(state)干什么用的?
    ①首先在多线程竞争的条件下,采用CAS的方式来获取和设置同步状态值(state)。
    ②同步状态值state代表获取锁的线程加锁的次数,如果线程获取锁,那么state加1变为1。如果线程释放锁,那么state减1变为0。
    ③volatile实现原则还是将缓存中的数据写入到主存,每个线程都是从主存中读取值。保证了数据的一致性。

    1. AQS中维持了一个单一的volatile修饰的状态信息state(AQS通过Unsafe的相关方法,以原子性的方式由线程去获取这个state)。AQS提供了getState()、setState()、compareAndSetState()函数修改值(实际上调用的是unsafe的compareAndSwapInt方法)。下面是AQS中的部分成员变量以及更新state的方法
    //这就是我们刚刚说到的head结点,懒加载的(只有竞争失败需要构建同步队列的时候,才会创建这个head),如果头节点存在,它的waitStatus不能为CANCELLED
    private transient volatile Node head;
    //当前同步队列尾节点的引用,也是懒加载的,只有调用enq方法的时候会添加一个新的wait node
    private transient volatile Node tail;
    //AQS核心:同步状态
    private volatile int state;
    protected final int getState() {
        return state;
    }
    protected final void setState(int newState) {
        state = newState;
    }
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    

    这个方法采用cas原则来保证状态设置的原子性,cas本身采用乐观锁的方式来实现的,从而不会产生线程的阻塞问题。由于采用volatile读写语义,那么线程访问是保持一致性的。
    理由:因为volatile实现原则是将缓存中的数据写入到主存中的。所以每个线程读写的数据都是从主存中获取来的,而不是每个线程缓存的数据,所以保证了一致性。

    1. AQS的设计模式是基于模板方法模式的。使用时候需要继承同步器并重写指定的方法,并且通常将子类推荐为定义同步组件的静态内部类,子类重写这些方法之后,AQS工作时使用的是提供的模板方法,在这些模板方法中调用子类重写的方法。其中子类可以重写的方法
    //独占式的获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
    protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException();}
    //独占式的释放同步状态,等待获取同步状态的线程可以有机会获取同步状态
    protected boolean tryRelease(int arg) { throw new UnsupportedOperationException();}
    //共享式的获取同步状态
    protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException();}
    //尝试将状态设置为以共享模式释放同步状态。 该方法总是由执行释放的线程调用。 
    protected int tryReleaseShared(int arg) { throw new UnsupportedOperationException(); }
    //当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占
    protected int isHeldExclusively(int arg) {  throw new UnsupportedOperationException();}
    
    1. AQS的内部类ConditionObject是通过结合锁实现线程同步,ConditionObject可以直接访问AQS的变量(state、queue),ConditionObject是个条件变量 ,每个ConditionObject对应一个队列用来存放线程调用condition条件变量的await方法之后被阻塞的线程。

    AQS模式:
    AQS分为独占模式和共享模式,独占模式下其他线程无法获取该线程锁。在共享模式下AQS维护了一个等待资源的FIFO先进先出队列,在线程1资源被释放后,AQS会从头结点开始依次唤醒队列中的线程2等线程2结束后再唤醒后面的从前至后的所有结点,使他们对应的线程恢复执行,直到队列为空。

    AQS中的独占模式

    了解了一下AQS的基本组成,这里通过ReentrantLock的非公平锁实现来具体分析AQS的独占模式的加锁和释放锁的过程。

    非公平锁的加锁流程
    简单说来,AQS会把所有的请求线程构成一个CLH队列,当一个线程执行完毕(lock.unlock())时会激活自己的后继节点,但正在执行的线程并不在队列中,而那些等待执行的线程全部处于阻塞状态(park()),如下图所示。

    image.png

    ​ 1. 假设这个时候在初始情况下,还没有多任务来请求竞争这个state,这时候如果第一个线程thread1调用了lock方法请求获得锁,首先会通过CAS的方式将state更新为1,表示自己thread1获得了锁,并将独占锁的线程持有者设置为thread1。

    final void lock() {
        if (compareAndSetState(0, 1))
            //setExclusiveOwnerThread是AbstractOwnableSynchronizer的方法,AQS继承了AbstractOwnableSynchronizer
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
    
    1. 这个时候有另一个线程thread2来尝试或者锁,同样也调用lock方法,尝试通过CAS的方式将state更新为1,但是由于之前已经有线程持有了state,所以thread2这一步CAS失败(前面的thread1已经获取state并且没有释放),就会调用acquire(1)方法(该方法是AQS提供的模板方法,它会调用子类的tryAcquire方法)。非公平锁的实现中,AQS的模板方法acquire(1)就会调用NofairSync的tryAcquire方法,而tryAcquire方法又调用的Sync的nonfairTryAcquire方法,所以我们看看nonfairTryAcquire的流程。
    //NofairSync
    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
    final boolean nonfairTryAcquire(int acquires) {
        //(1)获取当前线程
        final Thread current = Thread.currentThread();
        //(2)获得当前同步状态state
        int c = getState();
        //(3)如果state==0,表示没有线程获取
        if (c == 0) {
            //(3-1)那么就尝试以CAS的方式更新state的值
            if (compareAndSetState(0, acquires)) {
                //(3-2)如果更新成功,就设置当前独占模式下同步状态的持有者为当前线程
                setExclusiveOwnerThread(current);
                //(3-3)获得成功之后,返回true
                return true;
            }
        }
        //(4)这里是重入锁的逻辑
        else if (current == getExclusiveOwnerThread()) {
            //(4-1)判断当前占有state的线程就是当前来再次获取state的线程之后,就计算重入后的state
            int nextc = c + acquires;
            //(4-2)这里是风险处理
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            //(4-3)通过setState无条件的设置state的值,(因为这里也只有一个线程操作state的值,即
            //已经获取到的线程,所以没有进行CAS操作)
            setState(nextc);
            return true;
        }
        //(5)没有获得state,也不是重入,就返回false
        return false;
    }
    

    总结来说就是:

    1、获取当前将要去获取锁的线程thread2。

    2、获取当前AQS的state的值。如果此时state的值是0,那么我们就通过CAS操作获取锁,然后设置AQS的线程占有者为thread2。很明显,在当前的这个执行情况下,state的值是1不是0,因为我们的thread1还没有释放锁。所以CAS失败,后面第3步的重入逻辑也不会进行

    3、如果当前将要去获取锁的线程等于此时AQS的exclusiveOwnerThread的线程,则此时将state的值加1,这是重入锁的实现方式。

    4、最终thread2执行到这里会返回false。

    ​ (3)上面的thread2加锁失败,返回false。那么根据开始我们讲到的AQS概述就应该将thread2构造为一个Node结点加入同步队列中。因为NofairSync的tryAcquire方法是由AQS的模板方法acquire()来调用的,那么我们看看该方法的源码以及执行流程。

    //(1)tryAcquire,这里thread2执行返回了false,那么就会执行addWaiter将当前线程构造为一个结点加入同步队列中
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    AQS共享模式

    AQS框架数据结构是一个先进先出的双向队列,当多个线程进行竞争资源时,那些竞争失败的线程会加入到队列中。他向上层提供了很多接口,其中一个是acquireShared获取共享模式的接口。本文将会根据这个接口一步步分析,获取资源失败的线程是怎么进入到队列中的,进入到队列中又是怎么出队列再次竞争资源的,下面是acquireShared执行的一个大致流程:

    1. 多个线程通过调用tryAcquireShared方法获取共享资源,返回值大于等于0则获取资源成功,返回值小于0则获取失败。

    2. 当前线程获取共享资源失败后,通过调用addWaiter方法把该线程封装为Node节点,并设置该节点为共享模式。然后把该节点添加到队列的尾部。

    3. 添加到尾部后,判断该节点的上一个节点是不是队列的头节点,如果是头节点,那么该节点的上一个节点出队列并获取共享资源,同时调用setHeadAndPropagate方法把该节点设置为新的头节点,同时唤醒队列中所有共享类型的节点,去获取共享资源。如果获取失败,则再次加入到队列中。

    4. 如果该节点的前驱节点不是头节点,那么通过for循环进行自旋转等待,直到当前节点的前驱节点是头节点,结束自旋。

    这就是AQS共享模式竞争资源失败的大致流程。

    AQS总结:

    1.状态变量state,AQS中定义了一个状态变量state,它有以下两种使用方法:

    (1)互斥锁
    当AQS只实现为互斥锁的时候,每次只要原子更新state的值从0变为1成功了就获取了锁,可重入是通过不断把state原子更新加1实现的。

    (2)互斥锁 + 共享锁
    当AQS需要同时实现为互斥锁+共享锁的时候,低16位存储互斥锁的状态,高16位存储共享锁的状态,主要用于实现读写锁。互斥锁是一种独占锁,每次只允许一个线程独占,且当一个线程独占时,其它线程将无法再获取互斥锁及共享锁,但是它自己可以获取共享锁。共享锁同时允许多个线程占有,只要有一个线程占有了共享锁,所有线程(包括自己)都将无法再获取互斥锁,但是可以获取共享锁。

    2.AQS队列
    AQS中维护了一个队列,获取锁失败(非tryLock())的线程都将进入这个队列中排队,等待锁释放后唤醒下一个排队的线程(互斥锁模式下)。

    3.Condition队列
    AQS中还有另一个非常重要的内部类ConditionObject,它实现了Condition接口,主要用于实现条件锁。ConditionObject中也维护了一个队列,这个队列主要用于等待条件的成立,当条件成立时,其它线程将signal这个队列中的元素,将其移动到AQS的队列中,等待占有锁的线程释放锁后被唤醒。Condition典型的运用场景是在BlockingQueue中的实现,当队列为空时,获取元素的线程阻塞在notEmpty条件上,一旦队列中添加了一个元素,将通知notEmpty条件,将其队列中的元素移动到AQS队列中等待被唤醒。

    4.设计模式之—模版方法
    AQS抽象类巧妙地用了模版方法面定义了一系列的模板方法,比如下面这些:

    // 获取互斥锁
    public final void acquire(int arg) {
       // tryAcquire(arg)需要子类实现
       if (!tryAcquire(arg) &&
           acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           selfInterrupt();
    }
    // 获取互斥锁可中断
    public final void acquireInterruptibly(int arg)
           throws InterruptedException {
       if (Thread.interrupted())
           throw new InterruptedException();
       // tryAcquire(arg)需要子类实现
       if (!tryAcquire(arg))
           doAcquireInterruptibly(arg);
    }    
    // 获取共享锁
    public final void acquireShared(int arg) {
       // tryAcquireShared(arg)需要子类实现
       if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
    }
    // 获取共享锁可中断
    public final void acquireSharedInterruptibly(int arg)
           throws InterruptedException {
       if (Thread.interrupted())
           throw new InterruptedException();
       // tryAcquireShared(arg)需要子类实现
       if (tryAcquireShared(arg) < 0)
           doAcquireSharedInterruptibly(arg);
    }
    // 释放互斥锁
    public final boolean release(int arg) {
       // tryRelease(arg)需要子类实现
       if (tryRelease(arg)) {
           Node h = head;
           if (h != null && h.waitStatus != 0)
               unparkSuccessor(h);
           return true;
       }
       return false;
    }
    // 释放共享锁
    public final boolean releaseShared(int arg) {
       // tryReleaseShared(arg)需要子类实现
       if (tryReleaseShared(arg)) {
           doReleaseShared();
           return true;
       }
       return false;
    }
    

    获取锁、释放锁的这些方法基本上都穿插在ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch的源码解析中了,这些工具类后面会降讲到。

    需要子类实现的方法
    上面一起学习了AQS中几个重要的模板方法,下面我们再一起学习下几个需要子类实现的方法:

    // 互斥模式下使用:尝试获取锁
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    // 互斥模式下使用:尝试释放锁
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    // 共享模式下使用:尝试获取锁
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // 共享模式下使用:尝试释放锁
    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }
    // 如果当前线程独占着锁,返回true
    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }
    

    这几个方法为什么不直接定义成抽象方法呢?

    因为子类只要实现这几个方法中的一部分就可以实现一个同步器了,所以不需要定义成抽象方法。

    总结
    今天我们大概讲了下AQS中几个重要的组成部分:

    (1)状态变量state;
    (2)AQS队列;
    (3)Condition队列;
    (4)模板方法;
    (5)需要子类实现的方法;

    本文参考:
    https://www.cnblogs.com/tong-yuan/p/abstractqueuedsynchronizer.html
    AQS源码详细分析资料:
    https://www.cnblogs.com/fsmly/p/11274572.html
    https://blog.csdn.net/ya_1249463314/article/details/77838509

    相关文章

      网友评论

          本文标题:多线程(11) — AQS抽象队列同步

          本文链接:https://www.haomeiwen.com/subject/rzbicftx.html