美文网首页
JDK1.8并发包之 -- AQS源码详解

JDK1.8并发包之 -- AQS源码详解

作者: 南风nanfeng | 来源:发表于2018-11-16 18:08 被阅读30次

    0. 本文目标

    本文旨在记录笔者阅读jdk1.8 AQS源码的过程,java.util.concurrent是java的并发包,包含两个包atomic和locks,这两者构成并发框架的基石,前者存放以cas为基础的实现类,后者存放aqs的实现基类。AQS是一个构建锁和同步器的框架,许多同步器都可以通过AQS很容易的构建出来。并发包中的高频使用类如ReentrantLock、Semaphore、CoutDownLatch等均基于AQS 实现的。

    1. AQS数据结构

    AQS全称是AbstractQueuedSynchronizer,是个抽象类。内部定义一个Node的数据结构,实际上是FIFO的线程等待队列,Node是基于CLH原理的锁队列,又有改进。CLH通常用于自旋锁,在AQS中用于阻塞线程,线程的状态status代表线程是否阻塞,如果前驱节点释放锁,则后继节点会被发信号即将准备运行。

    CLH的数据结构如下:

               +------+  prev +-----+       +-----+
          head |      | <---- |     | <---- |     |  tail
               +------+       +-----+       +-----+
    

    CLH告诉我们,从尾节点加入队列,从头部节点出列。且入列、出列都是原子的。线程没有成功则会进入自旋直到成功为止。
    AQS并不是照搬CLH的数据结构,在每个节点又加了next指针,为了实现阻塞机制。每个Node都持有线程id,因此前驱节点发出信号唤醒后继节点就是通过next指针。

    Node数据结构:


    node数据结构

    AQS维护一个状态volatile int state(代表共享资源)和如上图的Node数据结构,就是线程等待队列,当线程争夺资源失败时,则入列。

    不同的同步器争用资源的方式也不同。自定义同步器只需要实现共享资源state的获取与释放即可。至于具体线程等待队列的操作,如出列、入列等,AQS已经实现好了。

    AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。

    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;
    

    Node状态有四种。AQS在判断状态时,通过用waitStatus>0表示取消状态,而waitStatus<0表示有效状态。

    • CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。

    • SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。

    • CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。

    • PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。

    0状态:值为0,代表初始化状态。

    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
     static final int CONDITION = -2;
    /**
      * waitStatus value to indicate the next acquireShared should
      * unconditionally propagate
      */
    static final int PROPAGATE = -3;        
    

    Node属性释义,Node定义的是线程等待队列。waitStatus、prev、next、thread代表的意思显而易见。着重说说nextWaiter,指在condition上等待的节点或者是SHARED节点。由于condition队列是独占模式,因此我们需要简单的队列来持有节点。

    volatile int waitStatus;
    
    volatile Node prev;
    
    volatile Node next;
    
    volatile Thread thread;
    /**
     * Link to next node waiting on condition, or the special
     * value SHARED.  Because condition queues are accessed only
     * when holding in exclusive mode, we just need a simple
     * linked queue to hold nodes while they are waiting on
     * conditions. They are then transferred to the queue to
     * re-acquire. And because conditions can only be exclusive,
     * we save a field by using special value to indicate shared
     * mode.
     */
    Node nextWaiter;
    

    2.源码解析

    AQS中大量私有方法,对外提供的核心方法是acquire-release和acquireShared-acquireRelease。依次来说。

    2.1 acquire
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    方法流程如下:

    • 步骤1,tryAcquire: 尝试以独占方式获取资源,在AQS中是空方法,子类要重写该方法。通常state代表资源。不同锁中释义不同。获取到资源后,返回true,唤醒线程;反之,返回false,转入下个流程。
    • 步骤2,addWaiter: 将线程加入队列尾部,并标记为独占模式。
    • 步骤3,acquireQueued: 让线程在等待队列中获取资源,直到获取到资源才返回;如果等待过程中被中断过,则返回true,否则,返回false。
    • 步骤4, 步骤3线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

    下面直接上源码详解:

    2.1.1 tryAcquire

    tryAcuire在AQS中是空方法,子类要重写该方法,这里没有定义成abstract方法,原因在于独占锁重写tryAcquire和tryRelease,共享锁重写tryAcquireShared和tryReleaseShared,如果定义成abstract,之类要实现四个方法。Doug Lea还是站在咱们开发者的角度,尽量减少不必要的工作量。

        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    
    2.1.2 addWaiter

    加入独占线程节点到对列尾巴上。

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // 先尝试快读加入队列,成功则返回新节点node,失败,则采用自旋加入节点知道成功返回该节点。
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    
    2.1.3 enq

    自旋加入新节点,看源码释义。

        private Node enq(final Node node) {
            // 整个自旋,直到加入队尾
            for (;;) {
                Node t = tail;
                if (t == null) { // 如果队列为空,则创建head节点并将该节点设置为tail
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    // 队列存在尾节点,则把node的prev指针指向尾节点tail
                    node.prev = t;
                    // CAS重新设置tail节点
                    if (compareAndSetTail(t, node)) {
                        // 原尾节点的next指针指向新节点node,node就是新的为节点tail,自旋结束,返回原尾节点。
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    2.1.4 acquireQueued

    节点加入为节点成功后,尝试在等待队列中自旋获取资源。源码释义如下。

    final boolean acquireQueued(final Node node, int arg) {
            //标记是否成功拿到资源
            boolean failed = true;
            try {
                //标记等待过程中是否被中断过
                boolean interrupted = false;
    
    
                // 又是自旋,直到node的前驱节点称为头节点,且头结点已释放锁;
                // node节点尝试获取资源成功,则node成功头节点。
                // 否则线程节点自旋,自旋过程中,有可能中断,是否中断的标记。
                // 疑问,标记failed如何生效呢,只有一个return,节点一直在自旋中如何退出呢。
                for (;;) {
                    final Node p = node.predecessor();
    
                    // 如果node前驱节点是head节点,下一个就轮到node啦,则node有机会获取资源。
                    if (p == head && tryAcquire(arg)) {
                        //拿到资源后,将head指向该结点。
                        setHead(node);
                        p.next = null; // help GC                 ? ***这句尚有疑问***
                        failed = false;
                        return interrupted;
                    }
    
                    // 还不具备获取资源条件,说明可以洗洗睡了,如果具备条件,就wait,知道unpark唤醒
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    2.1.5 shouldParkAfterFailedAcquire

    线程获取资源失败后,判断是否阻塞线程

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 获得前驱节点的状态,根据前驱节点的状态判断是否需要休息,即阻塞
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * 前驱节点已经被设置为SIGNAL,即前驱节点释放锁后,马上唤醒后继节点node,那作为后继节点
                 * 就放心了,可以洗洗睡了,不用自旋,等通知就好了
                 */
                return true;
            if (ws > 0) {
                /*
                 * 状态>0,说明前驱线程节点被撤销,跳过所有的被撤销的prev节点,排在它后面
                 * 将上一个小于0的节点设置为node前驱节点
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * 若前驱节点为 0 或者 PROPAGATE.  就要唤醒前驱节点,并设置为SIGNAL
                 * 告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
                 */
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    2.1.6 parkAndCheckInterrupt

    阻塞线程节点,判断线程是否中断。
    另外,park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:
    1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。

        private final boolean parkAndCheckInterrupt() {
            //调用park()使线程进入waiting状态
            LockSupport.park(this);
            return Thread.interrupted();
        }
    
    2.1.7 小结

    看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),现在让我们再回到acquireQueued(),总结下该方法的流程:

    1. 节点加入队尾后,检查状态,找到安全休息点。
    2. 调用park进入waiting状态,等待unpark或者interrupt唤醒自己。
    3. 唤醒后看看自己有没有资格获取资源,如果成功获取,则head指向当前节点,并返回从入队到拿到号的过程中有没有被中断,如果没有拿到号,继续步骤1。


      入队自旋拿号.png
    2.2 release

    release是释放独占资源,它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。此方法可以用来实现unlock方法。

    2.2.1 release
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    逻辑很简单,判断线程是否彻底释放共享资源state,同步器要实现具体的释放方法tryRelease

    2.2.2 tryRelease

    跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。

        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
    
    2.2.3 unparkSuccessor

    此方法用于唤醒等待队列中下一个线程。

        private void unparkSuccessor(Node node) {
            /*
             * 当前节点的状态
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);  //置零当前线程所在的结点状态,允许失败。意义在哪?????
    
            /*
             * statue<=0的都是有效节点,唤醒后继节点,通常是node的next指针的对应的节点,
             * 但是碰到null节点或者撤销节点,要重tail遍历往回遍历,找到node最近的节点,并唤醒它。
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立啦),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了!!And then, DO what you WANT!

    2.2.4 小结

    release是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。

    2.3 acquireShared

    此方法是共享模式下线程获取资源的顶层入口。获取成功则直接返回,失败则进入等待队列,并自旋知道获取资源为止。

        public final void acquireShared(int arg) {
    
            // 该方法只有一个分支,判断是否能获取到共享资源,能够获取到则返回正整数,否则,返回负整数
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    
    2.3.1 tryAcquireShared

    由不同语义的不同锁实现,AQS中是个空方法保留给子类实现。

        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    

    比如CountDownLatch,用state==0表示共享资源的状态。

            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    
    2.3.2 doAcquireShared

    doAcquireShared表示加入等待队列。

        private void doAcquireShared(int arg) {
            // 同样是自旋加入队尾
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {// 又是自旋
                    final Node p = node.predecessor();
                    // 判断节点的前驱节点是否为头节点,如果是,说明下一次就轮到我啦
                    if (p == head) {
                        // 尝试获取共享资源
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            // //将head指向自己,还有剩余资源可以再唤醒之后的线程,关键看此方法
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
    
                            // //如果等待过程中被打断过,此时将中断补上。
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    有木有觉得跟acquireQueued()很相似?对,其实流程并没有太大区别。只不过这里将补中断的selfInterrupt()放到doAcquireShared()里了,而独占模式是放到acquireQueued()之外,其实都一样,不知道Doug Lea是怎么想的。
    跟独占模式比,还有一点需要注意的是,这里只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park()等待其他线程释放资源,也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。当然,这并不是问题,只是AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)。

    2.3.2 setHeadAndPropagate

    成为头节点,唤醒后继节点。

        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            // head指向node
            setHead(node);
            /*
             * 唤醒满足条件的后继节点
             */
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    
    2.3.3 doReleaseShared

    又是自旋释放满足条件的后继节点

        private void doReleaseShared() {
            /*
             * 自旋释放后继节点
             */
            for (;;) {
    
                // 自旋将动态的head赋值给变量h
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        // 变量h有唤醒状态比较然后替换成初始状态,直到成功,则替换后继节点
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue; 
                        // 唤醒后继节点
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                // 自旋跳出条件,head不变则跳出自旋,head变化则一直自旋
                if (h == head)                   
                    break;
            }
        }
    
    2.3.4 小结

    acquireShared源码解读告一段落。让我们温故一下流程。

    1. tryAcquireShared()尝试获取资源,成功则直接返回;
    2. 失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。
      其实跟acquire()的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作(这才是共享嘛)。
    2.4 releaseShared

    releaseShared释放共享锁,是共享模式释放锁的顶层入口。它会释放指定的共享资源,如果释放成功就会去唤醒等待线程。

        public final boolean releaseShared(int arg) {
            // 尝试释放共享资源,
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    
    2.4.1 tryReleaseShared

    同样是空方法,留给子类重写。返回布尔值,true代表完全释放资源,可以走到分支中通知等待线程队列了。

        protected boolean tryReleaseShared(int arg) {
            throw new UnsupportedOperationException();
        }
    

    下面看CountDownLatch的tryReleaseShared源码,就是自旋state减1,state==0则完全释放锁返回true。

            protected boolean tryReleaseShared(int releases) {
                // Decrement count; signal when transition to zero
                for (;;) {
                    int c = getState();
                    if (c == 0)
                        return false;
                    int nextc = c-1;
                    if (compareAndSetState(c, nextc))
                        return nextc == 0;
                }
            }
    
    2.4.2 doReleaseShared

    尝试释放资源失败后,再次尝试释放资源并唤醒后继节点,同上。

    2.5 总结

    本文介绍了独占和共享两种模式的获取-释放资源方式,共享和独占整体流程差别不大,在于唤醒后继节点的条件,独占锁只能在head释放锁后唤醒后继节点,共享锁能够唤醒满足条件所有后继节点。

    相关文章

      网友评论

          本文标题:JDK1.8并发包之 -- AQS源码详解

          本文链接:https://www.haomeiwen.com/subject/znajfqtx.html