美文网首页Java 杂谈
J.U.C|彻底攻破AQS

J.U.C|彻底攻破AQS

作者: 阅历笔记 | 来源:发表于2019-04-18 17:40 被阅读0次

    一、写在前面


    上篇文章通过ReentrantLock 的加锁和释放锁过程给大家聊了聊AQS架构以及实现原理,具体参见《带你走进AQS内心世界》

    作为一个合格的 攻城狮 光是这些肯定无法满足我们的胃口了。

    于是……

    源码! 源码! 源码 呐喊声连绵不绝。

    好吧!这章我们就再续前缘一起来聊一聊AQS源码的那些事吧,源码的解读比较枯燥,希望大家做好准备。

    二、队列同步器


    队列同步器AbstractQueuedSynchronizer(以下简称同步器

    并发包下用来构建锁或者其他同步组件的基础框架

    主要实现方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态。

    同步器通过一个int类型的成员变量state来表示同步状态。

    当state > 0 表示已经获取到了锁。

    当state = 0 表示资源释放了锁。

    其提供了三个方法,getState()、 setState(int newState)和compareAndSetState(int expect, int update)来操作同步状态,当然同步器能够保证状态的改变是安全的。

    同步器通过内置的FIFO同步队列来完成资源获取线程的排队工作,如果当前线程获取同步状态失败(锁)时,AQS则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。

    到这有没有对同步队列懵逼的??如果有也别着急,在下面源码解析中我会化一些图来解析下。

    三、队列同步器主要接口列表


    同步队列器设计是基于模版方法模式的,也就是说使用者需要继承同步器并重写指定的方法。
    主要接口列表
    1 getState():返回同步状态的当前值;
    2 setState(int newState):设置当前同步状态;
    3 compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能够保证状态设置的原子性;

    4 tryAcquire(int arg):独占式获取同步状态,获取同步状态成功后,其他线程需要等待该线程释放同步状态才能获取同步状态;
    5 tryRelease(int arg):独占式释放同步状态;

    6 tryAcquireShared(int arg):共享式获取同步状态,返回值大于等于0则表示获取成功,否则获取失败;
    7 tryReleaseShared(int arg):共享式释放同步状态;

    8 isHeldExclusively():当前同步器是否在独占式模式下被线程占用,一般该方法表示是否被当前线程所独占;

    9 acquire(int arg):独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用可重写的tryAcquire(int arg)方法;
    10 acquireInterruptibly(int arg):与acquire(int arg)相同,但是该方法响应中断,当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException异常并返回;

    11 tryAcquireNanos(int arg,long nanos):超时获取同步状态,如果当前线程在nanos时间内没有获取到同步状态,那么将会返回false,已经获取则返回true;
    12 acquireShared(int arg):共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
    13 acquireSharedInterruptibly(int arg):共享式获取同步状态,响应中断;
    14 tryAcquireSharedNanos(int arg, long nanosTimeout):共享式获取同步状态,增加超时限制;

    15 release(int arg):独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒;
    16 releaseShared(int arg):共享式释放同步状态;

    同步器提供的模版方法基本分为三种

    独占式获取与释放同步状态、共享式获取与释放同步状态、查询同步状态中等到线程情况

    了解了同步器的基本内容下一章节我们就该进入源码了,源码主讲解
    acquire-release 而acquireShared-releaseShared 的执行过程和前面大同小异就不再这讲了。

    四、进入源码世界


    独占式同步状态获取与释放

    同步状态的获取是通过同步器的acquire(int arg )方法来获取的、该方法对中断敏感,也就是由于线程在获取同步状态失败后会进入同步队列中、后续线程的中断操作,线程不会从队列中移出。代码清单

    acquire(int arg)

    public final void acquire(int arg) { 
            if (!tryAcquire(arg) && 
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
                selfInterrupt(); 
        } 
     }
    

    上述代码主要完成同步状态的获取,节点的构造、加入队列以及在队列中自旋等待等相关工作。

    流程如下

    • 通过tryAcquire(arg) 方法保证线程安全的方式获取同步状态获取失败则构造同步节点(独占式Node.EXCLUSIVE)
    • 通过addWaiter(Node node)方法将该节点加入到队列的尾部,最后调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 使得该节点以自旋的方式获取同步状态,如果获取不到则阻塞该节点中的线程,等待被唤醒。
    • 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。selfInterrupt()

    addWaiter(Node node)

    private Node addWaiter(Node mode) {
    // 以给定的模式来构建节点, mode有两种模式 
    //  共享式SHARED, 独占式EXCLUSIVE;
      Node node = new Node(Thread.currentThread(), mode);
        // 尝试快速将该节点加入到队列的尾部
        Node pred = tail;
         if (pred != null) {
            node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            // 如果快速加入失败,则通过 anq方式入列
            enq(node);
            return node;
        }
    

    enq(final Node node)

    private Node enq(final Node node) {
    // CAS自旋,直到加入队尾成功        
    for (;;) {
        Node t = tail;
            if (t == null) { // 如果队列为空,则必须先初始化CLH队列,新建一个空节点标识作为Hader节点,并将tail 指向它
                if (compareAndSetHead(new Node()))
                    tail = head;
                } else {// 正常流程,加入队列尾部
                    node.prev = t;
                        if (compareAndSetTail(t, node)) {
                            t.next = node;
                            return t;
                    }
                }
            }
        }
    
    

    这是同步队列入列的过程, 代码的注释已经很明白了, 我在这就不多解释,

    什么? ~~~要看图~~~~好吧!就手绘两张吧!

    CLH队列面貌


    同步队列面貌.jpg

    入列过程


    同步队列器入列.jpg

    这下满足了吧! 我们接着往下走!

    final boolean acquireQueued(final Node node, int arg) {
        // 是否拿到资源
        boolean failed = true;
            try {
                // 标记等待过程中是否被中断过
                boolean interrupted = false;
                // 自旋
               for (;;) {
                // 获取当前节点的前驱节点
               final Node p = node.predecessor();
               // 如果其前驱节点为head 节点,说明此节点有资格去获取资源了。(可能是被前驱节点唤醒,也可能被interrupted了的)
              if (p == head && tryAcquire(arg)) {
                // 拿到资源后将自己设置为head节点,
                setHead(node);
               // 将前驱节点 p.next = nul 在setHead(node); 中已经将node.prev = null 设置为空了,方便GC回收前驱节点,也相当于出列。
              p.next = null; // help GC
             failed = false;
             return interrupted;
            }
        // 如果不符合上述条件,说明自己可以休息了,进入waiting状态,直到被unpark()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
             interrupted = true;
         } finally {
            if (failed)
                cancelAcquire(node);
         }
    }
    
    

    此方法当前线程在‘死循环’中尝试获取同步状态,只有其前驱节点为head节点时才有尝试获取同步状态的资格。

    Why?

    • 因为只有head是成功获取同步状态的节点,而head节点的线程在释放同步状态的同时,会唤醒后继节点,后继节点在被唤醒后检测自己的前驱节点是否是head节点
    • 维护CLH的FIFO原则。该方法中节点自旋获取同步状态。

    waht? 还要图~~~~~~。好吧满足你。


    节点自旋获取同步状态.jpg

    接着往下走吧!

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 拿到前驱的状态
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
               // 如果已经告诉过前驱节点,获取到资源后通知自己下,那就可以安心的去休息了。
                return true;
            if (ws > 0) {
               // 如果前驱节点放弃了,那就循环一直往前找,直到找到一个正常等待状态的节点,排在他后面
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
            // 如果前驱状态为0 或者 PROPAGATE 状态, 那就把前驱状态设置成SIGNAL,告诉它获取资源后通知下自己。
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    

    次方法在自己前驱节点不是head节点时再次回到同步队列中找到一个舒适位置休息等待被唤醒。

    private final boolean parkAndCheckInterrupt() {
            // 调用park方法是线程进入waiting 状态
            LockSupport.park(this);
            //如果被唤醒查看是不是被中断状态
            return Thread.interrupted();
        }
    

    最后调用park方法时线程进入wating状态。

    到这独占方式的获取同步状态已经聊完了。 我接着看释放同步状态的过程。

     public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    // 唤醒head节点的后继节点。
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
    

    此方法执行时,会释放同步状态(也就是将state值设置为0),同时会唤醒head节点的后继节点。

    到这独占式获取同步和释放同步状态的源码已经分析完了。 有没有懵尼? 懵了也别怕最后我们再来张流程图帮助大家理解。


    独占锁获取同步状态.jpg

    结合上面源码分析,应该对AQS源码有所了解了吧。

    五、总结

    分析了独占式同步状态的获取和释放过程,适当做下总结: 在获取同步状态时,同步器维持一个同步队列,获取状态失败的线程都会加入到队列中并在队列中进行自旋,出列(或者停止自旋的)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。

    相关文章

      网友评论

        本文标题:J.U.C|彻底攻破AQS

        本文链接:https://www.haomeiwen.com/subject/ibgtgqtx.html