美文网首页
并发编程之AQS

并发编程之AQS

作者: 林千景 | 来源:发表于2019-02-24 23:57 被阅读0次

    写在前面

    上一节我们讲到了CAS的基本原理,JUC下的atomic类都是通过CAS来实现的。它的核心思想就是比较并替换,其原子性是有操作系统底层来保证。CAS的无锁并发原理很适合在某些场景下替换锁,但是它仍然有很多缺点,比如ABA的问题,比如长时间自旋的性能消耗。今天要讲的是JUC下的核心类AQS,即AbstractQueuedSynchronizer,抽象队列同步器,它是用来构建锁和其他同步组件的基础框架,像ReentrantLock,ReentrantReadWriteLock都是用AQS来构建的。

    AQS的基本结构

    AQS内部用了一个整型变量state来表示同步状态,通过内置的CLH队列来完成资源获取线程的排队工作,CLH队列是一个FIFO的双向队列,当当前线程尝试获取同步状态失败时,AQS将会将当前线程的等待状态,线程等信息构造成一个Node节点加入CLH队列尾部,同时阻塞当前线程。当同步状态释放时,唤醒head节点,让其再次尝试获取同步状态。

    01.png

    同步状态

    同步器的主要使用方式是继承,子类通过继承AQS并实现它的同步方法来管理同步状态,上图中的state即为AQS的同步状态。

    02.png

    并提供了几个操作同步状态的方法:

    protected final int getState() { //获取state的值
            return state;
        }
    protected final void setState(int newState) { //更新state的值
            state = newState;
        }
    protected final boolean compareAndSetState(int expect, int update) //cas更新state的值
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    

    这几个方法都是final修饰的,说明子类不能重写,又是protected修饰的,说明只能在子类中使用。

    由于AQS是基于模板方法设计的,子类需要重写指定的方法,其提供的模板方法分为3类:

    • 独占式获取与释放同步状态
    • 共享式获取与释放同步状态
    • 查询同步队列中的等待线程情况

    同步器可重写的方法如下表所示:

    方法名称 描述
    protected boolean tryAcquire(int arg) 独占式获取同步状态
    protected boolean tryRelease(int arg) 独占式释放同步状态
    protected int tryAcquireShared(int arg) 共享式获取同步状态
    protected boolean tryReleaseShared(int arg) 共享式释放同步状态
    protected boolean isHeldExclusively() 当前同步器是否在独占模式下被占用

    关于独占模式和共享模式,一般来讲,我们可以通过修改state字段的同步状态来实现。

    比如说,在独占模式下,state的初始值是0,每当有线程要进入独占操作的时候,CAS操作下state,将其改为1,当第二个线程也需要进行独占操作的时候,发现state的值已经是1了,那么当前线程会一直阻塞,直到获取同步状态成功的线程释放了同步状态,也就是把state的值改为了0,唤醒等待队列的头结点线程,重新加入竞争。

    在共享模式下,假设允许n个线程并发执行,那么初始化state的值就为n,超过这个值的线程就会等待。一个线程尝试获取同步状态,就需要先判断state的值是不是大于0的,如果不大于0意味着此时n个线程跑满了,需要阻塞等待,如果大于0,那么可以尝试获取同步状态,并将state的值减1。当某个线程同步操作执行结束后需要释放同步状态,也就是将state的值加1。

    说白了,在独占模式下,我们要重写tryAcquire,tryRelease,isHeldExclusively,在共享模式下,需要重写tryAcquireShared,tryReleaseShared,举个独占锁的栗子:

    public class SyncDemo {
        private final Sync sync = new Sync();
    
        public void lock() {
            sync.acquire(1);
        }
    
        public void unlock() {
            sync.release(1);
        }
        
        private static class Sync extends AbstractQueuedSynchronizer {
    
            /**
             * 当state为0的时候尝试获取同步状态
             *
             * @param arg
             * @return
             */
            @Override
            protected boolean tryAcquire(int arg) {
                if (compareAndSetState(0, 1)) { //CAS更新state的值
                    setExclusiveOwnerThread(Thread.currentThread()); //设置占用独占锁的线程是当前线程
                    return true;
                }
                return false;
            }
    
            /**
             * 尝试释放同步状态,将state设置为0
             *
             * @param arg
             * @return
             */
            @Override
            protected boolean tryRelease(int arg) {
                setState(0);
                setExclusiveOwnerThread(null);
                return true;
            }
    
            /**
             * 是否处于独占状态
             *
             * @return
             */
            @Override
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
        }
    }
    

    这样我们把lock()unlock()方法暴露出去,就可以当成一个独占锁来使用了。

    同步队列

    AQS依赖内部的一个FIFOCLH双向队列来完成同步状态器的管理。当前线程获取同步状态失败时,同步器会将当前线程以及等待状态信息等构造成一个Node节点加入CLH队列尾部,同时阻塞当前线程。当同步状态释放时,会把首节点的线程唤醒,使其再次尝试获取同步状态。

    Node节点是AQS的一个静态内部类:

    static final class Node {
            /** Marker to indicate a node is waiting in shared mode */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode */
            static final Node EXCLUSIVE = null;
    
            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
        
            volatile int waitStatus;
    
            volatile Node prev;
    
            volatile Node next;
    
            volatile Thread thread;
    
            Node nextWaiter;
        }
    

    同时我们看到AQS有两个成员变量headtail对应了CLH队列的首尾节点:

    03.png

    Node里的waitStatus表示节点状态:

    变量 描述
    CANCELLED 1 等待超时或被中断的线程节点
    SIGNAL -1 后继节点的线程处于等待状态
    CONDITION -2 节点在等待队列中
    PROPAGATE -3 下一次共享式同步状态获取将会无条件地被传播下去
    INITIAL 0 初始状态

    thread表示当前线程。

    prevnext表示同步队列的前驱节点和后继节点。

    nextWaiter表示等待队列中的后继节点。

    Node节点构成了同步队列CLH,获取同步状态失败的节点将被放入CLH的尾部。这一个操作是需要保证线程安全的,因为可能有大量获取同步状态失败的线程同时插入同步队列尾节点。AQS利用CAS的操作提供了一个设置尾节点的方法:compareAndSetTail(Node expect, Node update)。再看CLH的首节点,由于是FIFO的队列,首节点其实对应的就是获取同步状态成功的节点,首节点在获取同步状态成功后会释放掉占用的同步状态,并唤醒后继节点(即CLH的第二个节点),后继节点在自己获取同步状态成功之后也会将自己设置为首节点。

    下面对AQS源码进行解析。

    独占式

    所谓的独占,就是指同一时刻,只有一个线程能获取到同步状态,例如,ReentrantLock就是独占的。

    独占式获取同步状态

    这里我们有一个很重要的议题就是,获取同步状态失败的线程将会被包装成Node节点放入同步队列CLH里面,等待着同步状态释放后被唤醒。

    通过调用AQSacquire(int arg)方法可以获取同步状态,来看代码:

    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    这里首先执行tryAcquire方法,这个方法前面我们讲过,是由同步器的子类去实现的,那么如果返回true则表示获取同步状态成功,下面就不必执行了,返回false的话,我们需要将此节点加入到同步队列的尾部。

    尝试获取同步状态失败的话,接着会执行addWaiter方法:

    private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode); //构造新节点,thread为当前线程
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail; 
            if (pred != null) { //尾节点不为空,将新节点插入尾部
                node.prev = pred; 
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node); //自旋插入新节点
            return node;
        }
    

    addWaiter用于向同步列表尾部插入新节点,如果尾节点不为空,直接CAS插入Node,否则执行enq方法。

    private Node enq(final Node node) {
            for (;;) { //自旋
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node())) //尾节点为空,则新建节点当做头节点
                        tail = head;
                } else { //尾节点不为空则CAS将新节点插入尾部
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    可以看到,初始化开始,同步队列里是没有节点的,经过addWaiter之后,同步队列将插入两个节点,一个不携带任何线程信息head节点(假设叫init节点),以及携带当前线程信息的新节点Node(假设叫节点1)。示意图如下:

    04.png

    其中节点1是我们真正插入的节点,代表的是获取同步状态失败的线程(假设这个线程叫线程1),而init节点是初始化出来的节点,它并没有线程信息,waitStatus都为0 ,表示节点都是刚初始化的。

    最后,addWaiter方法返回了当前节点node,对应于图中的节点1。

    addWaiter方法走完,接着走acquireQueued方法:

    final boolean acquireQueued(final Node node, int arg) { //此时的node为addWaiter返回的节点
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) { //自旋
                    final Node p = node.predecessor(); //前驱节点
                    if (p == head && tryAcquire(arg)) { //前驱节点为head节点则再次尝试获取同步状态
                        setHead(node); //同步状态获取成功则将头节点设为当前节点
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    acquireQueued方法里我们先看自旋里的第一个if语句,它的逻辑是:如果在经过addWaiter方法我们将当前节点加入到同步队列的尾部之后,判断当前节点的前驱节点,如果前驱节点是head节点,则再次尝试获取同步状态。这样做是因为,如果获取同步状态的线程在与当前线程竞争的时候虽然获取了同步状态,于是当前线程乖乖地通过addWaiter将自己加入到同步队列的尾部,但是如果获取同步状态的线程很快又把同步状态释放了,那么当前线程就应该再次尝试能不能获取到同步状态,如果能,则把head节点设置为自己,同时把thread变量设为null,意味着自己成为了init节点。(为什么要判断前驱节点是head?因为在同步队列CLH中只有head节点的后继节点才有资格去获取同步状态。)

    如果前驱节点不是head节点,我们看自旋方法里的第二个if语句,首先去执行shouldParkAfterFailedAcquire(p, node)方法,它传入的第一个参数是前驱节点,第二个参数是当前节点。来看源码:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                return true;
            if (ws > 0) {
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    shouldParkAfterFailedAcquire主要是判断waitStatus即节点线程的状态,我们一步一步来。

    • 如果前驱节点的waitStatus等于SIGNAL,即等于-1,则返回true。意味着前驱节点的后继节点,即当前节点可以被阻塞。
    • 如果前驱节点的waitStatus大于0,则表示前驱节点是等待超时或被中断的节点,需要把之前所有waitStatus大于0的节点都移除掉。
    • 如果前驱节点的waitStatus不满足于上两种情况,则CAS更新waitStatus的值为SIGNAL,即-1。此时表示当前节点是可以被阻塞的。那么在acquireQueued自旋进入下一次的时候,就会走到第一个if语句了,最终返回true

    回到刚刚那张图,初始化的时候init节点和节点1的waitStatus都为0 ,在经过shouldParkAfterFailedAcquire方法之后,init节点waitStatus变成了SIGNAL,-1,如图:

    05.png

    此时我们再看parkAndCheckInterrupt方法:

    private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    这个方法很简单,就是立即阻塞当前线程。

    至此,在独占模式下一个获取同步状态失败的线程插入同步队列尾部并阻塞当前线程的流程就跑完了。

    此时,如果又来了一个线程(假设是线程2)进行同步状态的获取,假如此时获取到同步状态的线程还没有释放,则最终结果会是这样:

    06.png

    独占式释放同步状态

    我们上面讲到,独占模式下,多个线程同时竞争,只有一个线程能获取到同步状态,剩余的线程将会组装成CLH同步队列进行阻塞,等待被唤醒。当获取到同步状态的线程释放掉同步状态时,我们才会去唤醒下一个可以被唤醒的线程(即state从1再次变成0时),释放同步状态的方法是release:

    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    可以看到,独占式地释放同步状态,要求子类里重写的tryRelease方法返回了true(即state从1变成了0),并且head节点不为nullwaitStatus不为0。那么久去执行真正的唤醒线程的方法unparkSuccessor:

    private void unparkSuccessor(Node node) {
            int ws = node.waitStatus; //head节点的状态
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0); //cas更新waitStatus为0
        
            Node s = node.next; //有资格被唤醒的下一个节点
            if (s == null || s.waitStatus > 0) { //当前节点是尾节点或后继节点被取消/中断了
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev) //tail回溯寻找第一个有资格被唤醒的线程
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread); //唤醒此线程
        }
    

    这段代码的意思是,head节点的后继节点如果有资格被唤醒(不是被取消或中断的),那么就直接唤醒此线程;如果head的后继节点不满足被唤醒的条件,就从tail节点回溯寻找waitStatus小于0的第一个节点,然后唤醒此节点的线程。这里有个疑问,为什么不从node.next往后寻找,而需要从tail回溯向前呢?大概是因为node.next也可能为null吧。

    线程被唤醒之后此时同步队列的情况是怎样的呢?我们在回看一下acquire的代码,尤其是acquireQueued这个方法,因为addWaiter仅仅是向同步队列尾部添加节点。

    final boolean acquireQueued(final Node node, int arg) { //此时的node为addWaiter返回的节点
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) { //自旋
                    final Node p = node.predecessor(); //前驱节点
                    if (p == head && tryAcquire(arg)) { //前驱节点为head节点则再次尝试获取同步状态
                        setHead(node); //同步状态获取成功则将头节点设为当前节点
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    shouldParkAfterFailedAcquire是将前置节点的waitStatus设为-1并剔除已中断或取消的线程,而parkAndCheckInterrupt方法是真正用来阻塞线程的:

    private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    此时唤醒下一个可被唤醒的线程之后,LockSupport.park(this)会恢复,继续向下走,返回当前的线程阻断状态。由于acquireQueued里的自旋方法,此时被唤醒的线程会重新获取同步状态,获取成功则将head节点设置成自己,并把threadpre的信息置为null,最后,退出自旋。画个图就是这样的:

    07.png

    此时线程1获取到同步状态,就会将自己设成head节点了,此时同步队列里只有一个线程2是阻塞的。

    可能有朋友会有疑问,获取到同步状态并将head节点设置为自己的前提不是应该前置节点是head节点么(p == head && tryAcquire(arg))?

    这个没关系,假如我们在唤醒下一个可被唤醒的线程之间,前面有线程被取消或中断了,那么找到的这个可被唤醒的线程必然不是head的后继节点,但是因为自旋的存在,一样会经过shouldParkAfterFailedAcquire进行节点过滤,这个节点必然会成为head的后继节点。而由于刚刚这个可被唤醒线程已经通过unpark获取到许可了,那么此时parkAndCheckInterruptpark方法是不会阻塞的,所以再次循环后可以继续将head节点设置成当前可被唤醒线程的节点。

    共享式

    共享式与独占式获取同步状态最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。

    共享式获取同步状态

    public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    

    和独占式的一样,当子类实现的tryAcquireShared方法返回的值小于0,我们才去尝试将线程加入同步队列里。

    来看doAcquireShared方法:

    private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED); //向同步队列尾部插入新节点
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor(); //当前节点的前驱节点
                    if (p == head) { //如果前驱节点是head节点,则再次尝试获取同步状态
                        int r = tryAcquireShared(arg);
                        if (r >= 0) { //如果拿到了同步状态,此时r大于0为还有资源可用,需要传递唤醒后续节点;r=0为当前线程获取到了锁,但是资源已占尽,不需要唤醒后续节点。
                            setHeadAndPropagate(node, r); //获取锁以后的唤醒操作
                            p.next = null; // help GC
                            if (interrupted) //如果被中断,设置中断状态
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    这里有个地方和独占模式不一样,setHeadAndPropagate方法这个方法,独占模式下只需要将头节点设为自己就行,但在共享模式下,我们将head节点设为了当前节点后,还要去尝试唤醒其他的可唤醒节点。(这个很好理解,因为是共享模式,所以必然要大家一起去争抢)

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; //老的头结点
        setHead(node); //将当前线程设为头结点
        //如果还有资源或者头结点的后续节点为待唤醒节点(h.waitStatus为SIGNAL或PROPAGATE)
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            //后继节点不确定或为共享模式节点
            if (s == null || s.isShared())
                doReleaseShared(); //同步状态的释放
        }
    }
    

    共享式释放同步状态

    可以看到,共享式释放同步状态里,在tryReleaseShared释放资源成功的时候就会去执行doReleaseShared方法,这个也是上文setHeadAndPropagate里作为传播唤醒的方法。

    public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    private void doReleaseShared() {
        for (;;) {
            Node h = head; 
            //同步队列中有正在等待的线程
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) { //head结点正在运行,后续线程需被唤醒
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //后继节点成为head结点,设置初始化状态
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); //唤醒head节点后继节点的线程
                }
                //将head节点的状态设置为PROPAGATE,以便于传播唤醒
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            //如果头结点发生变化,证明其他线程获取了锁(setHeadAndPropagate里重设head节点),需要重试,否则退出循环
            if (h == head)                   // loop if head changed
                break;
        }
    }
    

    共享式释放同步状态说的是,在tryReleaseShared执行成功的时候执行doReleaseShared传播释放后继节点。假如现在同步队列里没有等待线程,则把目前唯一的head节点设置为PROPAGATE,这时候是说明资源过剩。如果是在acquireShared方法通过setHeadAndPropagate正好到达doReleaseShared方法,这时候需要判断waitStatus的状态,SIGNAL就唤醒(PROPAGATE在shouldParkAfterFailedAcquire里会变成SIGNAL)。如果同步队列里有等待线程则都唤醒他们。

    总结

    本篇简单分析了一下AQS的基本概念和原理,其中还有一些没提到的如中断异常的acquireInterruptibly,支持超时获取的tryAcquireNanos,等待队列(不是同步队列)等。后续也将会基于AQS分析可重入锁ReentrantLock,读写锁ReentrantReadWriteLock,以及同步工具CountdownLatch,Semaphore,CyclicBarrier的原理和实现。

    相关文章

      网友评论

          本文标题:并发编程之AQS

          本文链接:https://www.haomeiwen.com/subject/jybiyqtx.html