美文网首页
AQS源码分析

AQS源码分析

作者: HannahLi_9f1c | 来源:发表于2019-04-14 16:05 被阅读0次

    一、AQS是什么

    AQS(AbstratctQueuedSynchronizer)是一个抽象类的同步器,使用了模板方法的设计模式,也就是在抽象类AQS中定义算法的骨架,然后具体的子类需要重写AQS的一些方法,这样就能实现不同场景所需要的同步器,java中的ReetrantLock,ReentrantReadWriteLock等很多锁都是基于AQS实现的。AQS简单来说就是有一个当前工作线程并且维护一个被锁定的线程队列,并且实现独占锁或者是共享锁。
    大概原理是这样的


    image

    二、AQS怎么使用

    1. 模板方法

    • 模板方法使用继承,在父类中定义了一系列算法骨架,和一些可复用的方法,子类继承父类的方法,在不同的应用场景重写算法,满足不同需求。
    • 模板方法和策略模式很像,都是在高层定义算法,底层实现具体的实现,模板方法多是用继承实现,而策略模式通过委托实现。策略模式的好处就是耦合比较低,但是AQS是用模板方法实现的,原因就是可以在模板方法定义一些公共的方法,子类可以复用,而策略模式则不能,策略模式更多体现的是可替换性,也就是不同的算法之间可替换,而不会对应用造成影响。

    2. 自定义AQS

    • 了解了模板方法之后,我们就知道如何使用AQS自定义同步器了,也就是要继承AQS,然后重写它的一些方法
    isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
    tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
    tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
    tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
    tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。
    
    
    • 独占方式重写tryAcquire和tryRelease,共享方式重写tryAquireShared和tryReleaseShared方法
    • 重写实现的主要是对资源state的占用和释放方式。
    • 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
    • 学习一下ReetrantLock的源码,我们大概就知道怎么使用了。

    三、学习源码之前需要了解

    1. 线程的状态

    线程状态转换

    image
    image

    当线程进入到synchronized方法或者synchronized代码块时,线程切换到的是BLOCKED状态,而使用java.util.concurrent.locks下lock进行加锁的时候线程切换的是WAITING或者TIMED_WAITING状态,因为lock会调用LockSupport的方法。

    2. CAS+volatile实现线程安全

    • volatile语义可见性+有序性,但不能保证并发安全,使用场景:

    (1)对变量的写操作不依赖于当前值。
    (2)该变量没有包含在具有其他变量的不变式中。

    • CAS则是一种乐观锁策略,通过一条处理器级指令保证并发安全,如果没有获得锁,就会进入自旋尝试获取锁。也就是在竞争不是很大的时候的一种并发策略。
    • CAS+volatile的使用既保证了这个变量的可见性,又能在存在竞争的时候线程安全地获取资源,虽然volatile修改存在并发安全问题,但是用CAS可以避免这个问题。
    • 至于什么时候不用synchronized而是CAS+volatile非阻塞乐观锁?应该在synchronized的阻塞造成了系统性能的瓶颈的时候,因为CAS+volatile的编写更加复杂

    三、AQS源码分析

    1. 数据结构

            static final Node SHARED = new Node();//共享模式的标记
            static final Node EXCLUSIVE = null;//独占模式的标志
            static final int CANCELLED =  1;//表明线程已经取消
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            //一种线程等待状态,需要工作线程唤醒
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;//一种等待状态,表明线程有条件性等待
            /**
             * waitStatus value to indicate the next acquireShared should
             * unconditionally propagate
             */
            static final int PROPAGATE = -3;
    
            /**
             * Status field, taking on only the values:状态域,包括以下值
             *   SIGNAL:     * 这个节点的继任者被阻塞(通过park),所以当前线程当释放或者取消的时候必须要唤醒它的继任者
             为了避免竞争,获取方法需要先表明signal,然后原子性获取。失败之后进入bolck状态
             *   CANCELLED: 
             这个节点由于超时或者中断被取消。节点不释放它的资源。尤其,一个个被取消的节点不会再一次blocks
             *   CONDITION:  
             这个节点当前在一个条件等待队列
             *   PROPAGATE: 
             非负表明节点不需要唤醒。所以,大多数代码不需要检查特点值,只是用来标记
             
             这个参数初始化为0
             condition会初始化为CONDITION,通过CAS修改值,或者无条件地volatile写**/
            volatile int waitStatus;
    
           
             /*当前工作线程的前继线程
             由于head线程总是不取消,所以一个节点只能通过aquire获取。一个取消的线程不能aquire。一个线程总是自我取消,而不是其他节点来取消*/
            volatile Node prev;
    
          
            volatile Node next;
    
        
             /*入队的线程.构造函数初始化,使用后置为null*/
            volatile Thread thread;
            Node nextWaiter;
    
    • waitStatus表示当前被封装成Node结点的等待状态,共有4种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE。
    • CANCELLED:值为1,在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该Node的结点,其结点的waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化。
    • SIGNAL:值为-1,被标识为该等待唤醒状态的后继结点,当其前继结点的线程释放了同步锁或被取消,将会通知该后继结点的线程执行。说白了,就是处于唤醒状态,只要前继结点释放锁,就会通知标识为SIGNAL状态的后继结点的线程执行。
    • CONDITION:值为-2,与Condition相关,该标识的结点处于等待队列中,结点的线程等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
    • PROPAGATE:值为-3,与共享模式相关,在共享模式中,该状态标识结点的线程处于可运行状态。
    • 0状态:值为0,代表初始化状态。
    /**
         等待队列头节点,除了初始化,通过setHead修改。注意:如果头存在,它的等待状态不能是CANCELLED.
         */
        private transient volatile Node head;
    
        /**
         尾节点,通过enq方法添加新的等待节点
         */
        private transient volatile Node tail;
    
        /**
         * 资源state.
         */
        private volatile int state;
    
    • AQS维护着一个队列,维护着共享资源的访问如下


      image

      state的有三种访问方式

        /**
         * @return current state value
         */
        protected final int getState() {
            return state;
        }
    
        /**
         * Sets the value of synchronization state.
         * @param newState the new state value
         */
        protected final void setState(int newState) {
            state = newState;
        }
    
        /**
         * Atomically sets synchronization state to the given updated
         * value if the current state value equals the expected value.
         * This operation has memory semantics of a {@code volatile} read
         * and write.
         *
         * @param expect the expected value
         * @param update the new value
         * @return {@code true} if successful. False return indicates that the actual
         *         value was not equal to the expected value.
         */
        protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    
    • 那么它为什么有两种方式修改state呢,我觉得可能是使用场景不同吧

    2. 独占模式下的获取acquire

    2.1. acquire

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    我们可以看到这个入口函数是final修饰的,说明设计的时候不希望子类修改这个方法,它先是用tryAcquire获取,如果获取成功就返回;addWaiter将该线程加入等待队列的尾部,并标记为独占模式;acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false;如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

    2.2 tryAquire

        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    

    这个方法通过protected修饰,只有子类访问,也就是在创建同步器的时候自定义获取方式。这个AQS是抽象类,所以只抛出异常

    2.3 addWaiter

      private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

    这个方法先尝试快速入队,如果成功就返回,否则通过enq入队

      private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    enq主要是通过自旋一直尝试将节点插入尾部,所以可以在快速尝试失败后调用。这个就是CAS自旋volatile的经典使用

    2.4 acquiredQueued

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                        //如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    由于尝试获取资源失败,于是进行等待队列等待别的线程结束唤醒自己。如果自己的前任节点是head,并且获取成功,返回;不然就在waiting中一直等待

    2.5

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;//如果已经设置了前结点释放资源后会唤醒它,那么就可以返回休息了
            if (ws > 0) {
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                 /*
             * 如果前驱放弃了,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边。     */
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
    //如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下。有可能失败,人家说不定刚刚释放完呢!
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    不是SIGNAL,需要加入等待队列等待,同时尝试下看有没有机会轮到自己

    2.6 总结

    流程如下


    image

    3. 独占模式下的release

    3.1 release

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    直接利用tryRelease释放线程,因为独占模式下只有head线程占有资源,因此直接释放即可。然后要唤醒一个等待线程

    3.2 tryRealse

        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
    

    这个方法也是需要在子类重写的,父类直接抛出异常。重写的时候如果释放资源成功返回true,否则返回false

    3.3 unparkSuccessor

        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            //头节点的线程状态<0,那么利用cas把它状态设置为0
            if (ws < 0)
               compareAndSetWaitStatus(node, ws, 0);
            Node s = node.next;
            //从tail往前遍历,寻找最前面需要唤醒的节点,然后unpark唤醒
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    这个函数就是寻找继任线程,并将它唤醒

    4. acquireShared

    • 然后我们继续研究非独占式的获取共享资源和释放共享资源

    4.1 acquireShared

        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    

    通过tryAcquiredShared获取资源,如果没有获取到,执行doAcquireShared

    4.2 tryAcquireShared

        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    

    类似于tryacquire,抽象类中直接抛出异常

    4.3 doAcquireShared

     private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            //共享节点加到队列
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                //获取node的前继节点
                    final Node p = node.predecessor();
                    if (p == head) {
                    如果前继是head,看能不能获取资源
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                           //如果还有资源,要唤醒下一个等待节点 setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    20             //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
    
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    这个函数试图获取资源,并且将head设置为signal,表示等他释放资源了就能唤醒它的next。这里的资源只能是head.next获取,体现了公平性
    4.4 setHeadAndPropagate

     private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; // Record old head for check below
            setHead(node);//node设置为head
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                //释放head的资源
                    doReleaseShared();
            }
        }
    

    4.5 总结
    这个非独占式的获取跟acquire区别不是很大,无非就是线程拿到资源之后如果发现资源有剩余,会唤醒下一个正在等待的线程,而且唤醒的是临近等待的线程,体现了公平性,但是性能会差一点

    5. releaseShared

    • 这个跟独占模式下的释放区别不大,主要也是释放资源,唤醒线程。区别主要在于独占模式需要state=0之后才唤醒其他线程,而非独占模式只要剩余资源就回去唤醒线程

    相关文章

      网友评论

          本文标题:AQS源码分析

          本文链接:https://www.haomeiwen.com/subject/hvfjwqtx.html