难以理解的AQS(上)

作者: CoderBear | 来源:发表于2019-03-24 11:24 被阅读13次

    在一篇博客中,我们看了下CopyOnWriteArrayList的源码,不是很难,里面用到了一个可重入的排他锁: ReentrantLock,这东西看上去和Synchronized差不多,但是和Synchronized是完全不同的东西。

    Synchronized锁的特性是JVM保证的,ReentrantLock锁的特性是上层的Java代码控制的。而ReentrantLock的基础就是AQS,事实上,很多并发容器都用了ReentrantLock,也就间接的用到了AQS,还有并发框架,如CountDownLatch,CyclicBarrier,Semaphore也都用到了AQS,可见AQS的重要性。

    但是要想稍微深入一点理解AQS实属不易,牵扯到不少东西,所以本篇博客将会拆分成两篇,第一篇将会介绍AQS的前置知识:LockSupport,AQS的核心概念,以及独占、共享模式下,AQS的核心源码解析等,第二篇将会介绍AQS对条件变量的支持,以及AQS的应用等。

    要深入一些学习AQS,首先要掌握一个前置知识:LockSupport。

    LockSupport

    LockSupport是一个工具类,它的主要作用是挂起和唤醒线程,它的底层是调用的native方法,这个我们不去深入探究,主要看下LockSupport的应用。

    park,unpark

    如果调用park方法的线程已经拿到了与LockSupport关联的许可证,调用park后,会立即返回,否则该线程会被阻塞,直到拿到了许可证。
    如果一个线程调用了unpark方法,就会获得与LockSupport关联的许可证,如果该线程之前调用了park而被阻塞,那么会被唤醒,如果该线程之前没有调用park方法,那么调用park方法后,会立刻返回。

        public static void main(String[] args) {
            System.out.println("Hello,LockSupport");
            LockSupport.park();
            System.out.println("Bye,LockSupport");
        }
    

    运行结果:


    image.png

    线程打印出第一句话,就被阻塞了,因为线程没有获得与LockSupport关联的许可证。

        public static void main(String[] args) {
            System.out.println("Hello,LockSupport");
            LockSupport.unpark(Thread.currentThread());
            LockSupport.park();
            System.out.println("Bye,LockSupport");
        }
    

    运行结果:


    image.png

    先调用unpark方法,传入了当前线程,当前线程获得了与LockSupport关联的许可证,随后调用park方法,因为该线程已经有了许可证,所以立即返回,打印出了第二句话。

        public static void main(String[] args) {
            Thread thread=new Thread(()->{
                System.out.println("Hello,LockSupport");
                LockSupport.park();
                System.out.println("Bye,LockSupport");
            });
            thread.start();
            LockSupport.unpark(thread);
        }
    

    运行结果:


    image.png

    首先创建了一个Thread ,内部调用了park方法,随之启动线程,在主线程中,调用了unpark方法,传入了子线程。
    此方法有两种情况:

    • 主线程先调用了unpark方法,子线程中的park方法随后调用。
    • 子线程的park方法先调用,主线程的unpark方法随后调用。

    但是不管是哪种情况,最后的结果都是一样的。只是过程有些区别,第一种情况是
    主线程调用了unpark方法后,让子线程拿到了许可证,子线程内部调用park后立即返回,第二种情况是子线程的park方法先调用到,因为目前还没有拿到许可证,所以被阻塞,随后主线程调用了unpark,让子线程拿到了许可证,子线程被返回。

    parkNanos(long nanos)

    和park方法类似,不同之处在于多了个超时时间,如果调用parkNanos,线程被阻塞了,超过了nanos后,不管有没有获得许可,都会被返回。

        public static void main(String[] args) {
            System.out.println("Hello,LockSupport");
            LockSupport.parkNanos(Integer.MAX_VALUE);
            System.out.println("Bye,LockSupport");
        }
    

    运行结果:


    image.png

    为了可以看到比较明显的效果,所以我把时间设置成了Integer.MAX_VALUE,可以看到虽然没有调用unpark方法拿到许可证,但是一定的时间后,该方法还是被返回了。

    park(Object blocker)

    此方法是比较推荐使用的,因为使用它,可以通过jstack命令查看有关阻塞对象的信息。

    public class Main {
        public void test() {
            LockSupport.park(this);
        }
        public static void main(String[] args) {
            Main main = new Main();
            main.test();
        }
    }
    

    使用jstack pid命令:


    image.png

    还有几个方法,就不一一介绍了。

    有了上面的基础,我们终于可以进入今天的正题了:AQS。

    什么是AQS

    AQS的全称是AbstractQueuedSynchronizer,翻译是中文是抽象同步队列。刚接触AQS的时候,第一感觉这个东西和抽象有关系,因为Abstract。。。后来发现,这个东西和抽象没有半毛钱关系,慢慢的,又有新的理解,这个东西和抽象还真的有点关系,因为它把实现同步队列的一些方法给抽象出来了,供其他上层组件重写或者复用。重点来了,其他上层组件需要重写其中的方法!再说的详细点,就是其他组件需要继承AbstractQueuedSynchronizer,对其中的部分方法进行重写。

    我们先来看下AQS的UML图:


    image.png

    AQS核心概念

    我们先要对AQS进行一个大概的介绍,了解下AQS中比较核心的东西。

    AQS维护了一个FIFO的双向队列,什么是FIFO?就是先进先出的意思,双向队列就是上一个节点指向下一个节点的同时,下一个节点也指向上一个节点,我们从AbstractQueuedSynchronizer关联的Node类中就可以看出来这一点:prev保存的是当前节点上一个node,next保存的是当前节点的下一个节点,有一个专业的名词,分别是前驱节点,后继节点,同时AbstractQueuedSynchronizer类有两个字段,一个是head,一个是tail,顾名思义,head保存了头节点,tail保存了尾节点。

    Node类中的SHARED是用来标记该线程是获取共享资源时被放入等待队列的,EXCLUSIVE用来标记该线程是获取独占资源时被放入等待队列的,从这句话,我们可以看出Node类其实就是保存了放入等待队列的线程,而有的线程是因为获取共享资源失败放入等待队列的,而有的线程是因为获取独占资源失败而被放入等待队列的,所以这里需要有一个标记去区分。

    再啰嗦一句,FIFO双向队列其实就是AQS中的等待队列。

    在Node类中,还有一个字段:waitStatus,它有五个取值,分别是:

    • SIGNAL:值为-1,当前节点在入队后、进入休眠状态前,应确保将其prev节点类型改为SIGNAL,以便后者取消或释放时将当前节点唤醒。也就是说当前节点的waitStatus为SIGNAL的时候,被释放的时候,才会唤醒后继节点。其实,如果要较真的话,这种理解也有点些问题,就先这么理解吧。
    • CANCELLED:值为1,被取消的,在等待队列中等待的线程超时或被中断,进入该状态的节点的将不再变化。
    • CONDITION:值为-2,该节点处于条件队列中,当其他线程调用了Condition的signal()方法后,节点转移到AQS的等待队列中,特别要注意的是,条件队列和AQS的等待队列并不是一回事。
    • PROPAGATE:值为-3。对于这个状态到底是做什么的,网上大多数博客,包括书籍都是简单提了下这是共享模式下专用的,和传播有关,但是没有更深的解释。无奈,菜的抠脚的我至今也没能领悟这个状态值的含义。
    • 0:默认值。

    在AbstractQueuedSynchronizer类中,有一个state字段,被标记为volatile,是为了保证可见性,这个字段的设计可厉害了。对于ReentrantLock来说,state保存的是重入次数,对于ReentrantReadWriteLock来说,state保存的是获取读锁的重入次数和写锁的重入次数。

    AbstractQueuedSynchronizer类中,还有一个内部类:ConditionObject,用来提供条件变量的支持。

    AQS提供了两种方式来获取资源,一种是独占模式,一种是共享模式。

    上面提到过,需要去定义一个类去继承AbstractQueuedSynchronizer类,重写其中的方法,一般来说

    • 对于独占模式,需要重写tryAcquire(arg) ,tryRelease(int arg)方法。
    • 对于共享模式,需要重写tryAcquireShared(arg) ,tryReleaseShared(int arg)方法。

    源码解析

    独占模式

    acquire
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    此方法是独占模式下获取资源的顶级方法,如果线程调用tryAcquire(arg)方法成功了,说明已经获取到了资源,直接返回,如果不成功,则将当前线程封装成waitStaus为Node.EXCLUSIVE的Node插入到AQS等待队列的尾部。

    我们来看下tryAcquire方法:

        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    

    纳尼,直接报错了,这是什么鬼?别忘了,我们需要重写这个方法。

    我们再来看下addWaiter(Node.EXCLUSIVE), arg)方法:

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);//封装成Node,新的Node
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;//把尾节点赋值给pred ,pred也就是尾节点了
            if (pred != null) {//如果pred不为NULL
                node.prev = pred;//pred赋值给新节点的前驱节点,也就是新节点的前驱节点是尾节点
                if (compareAndSetTail(pred, node)) {//CAS,如果pred还是尾节点,则把新节点设置成尾节点,设置成功后,进入if
                    pred.next = node;//把新节点赋值给pred的后继节点
                    return node;//返回新节点
                }
            }
            enq(node);
            return node;
        }
    

    此方法先把线程封装成一个(Node.EXCLUSIVE的Node,先尝试把这个Node直接放入队尾,如果成功的话,直接返回,如果失败的话,调用enq(node)进行入队操作:

        private Node enq(final Node node) {
            for (;;) {//自旋
                Node t = tail;//把尾节点赋值给t
                //如果尾节点为空,则新建一个空的Node,用CAS把空的Node设置成头节点
                //成功后,再把尾部节点也指向空的Node
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;//把尾节点赋值给传进来的node的前驱节点
                    if (compareAndSetTail(t, node)) {//CAS,如果t还是尾部节点,则用传进来的node替换旧的尾部节点
                        t.next = node;//设置t的后继节点为传进来的node
                        return t;
                    }
                }
            }
        }
    

    这个方法概括的来说,就是把获取资源失败的node放入AQS等待队列。

    我们再回到顶级方法看下acquireQueued方法:

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();//拿到node的前驱节点,赋值给p
                    if (p == head && tryAcquire(arg)) {//如果p已经是头节点了,代表这个时候
    //node是第二个节点,再次调用tryAcquire获取资源
                        setHead(node);//设置头节点
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&//判断此node是否可以被park
                        parkAndCheckInterrupt())//park
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    又是CAS自旋,首先拿到node的前驱节点,赋值给p,如果p已经是头节点了,代表这个时候node是第二个节点,再次尝试调用tryAcquire获取资源,如果成功,设置头节点为node,返回中断标记位,如果失败,先判断自己是否可以被park,如果可以的话,就park,等待unpark。

    再来看下parkAndCheckInterrupt方法:

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;//拿到前驱节点的waitStatus,赋值给ws
            if (ws == Node.SIGNAL)//如果是SIGNAL
                return true;
            if (ws > 0) {//如果是ws>0,则说明前驱节点被取消了,通过while循环,
                //找到最近的一个没有取消的节点,排到后面
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//CAS设置前驱节点的waitStatus为SIGNAL
            }
            return false;
        }
    

    如果前驱节点的waitStatus为SIGNAL,直接返回,如果前驱节点被取消了,则通过while循环,找到最近的一个没有被取消的节点,排到后面去,如果前驱节点处于其他状态,则通过CAS把前驱节点的waitStatus设置为SIGNAL。

    再来看下parkAndCheckInterrupt方法:

        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    这方法就比较简单了,就是park自己,返回当前线程是否被中断。

    我们来为acquireQueued方法做一个总结:
    找到一个安全点park自己,如果被唤醒了,检查自己是否是第二个节点,如果是的话,再次尝试获取资源,成功的话,就把自己设置为头节点。

    好了,整个顶级的acquire核心内容已经分析完毕了,我们来做一个总结:

    1. 尝试快速获取资源,如果成功,直接返回,失败,进入下一步。
    2. 进行入队操作。
    3. 在等待队列中的线程获取资源。

    最后,画个流程图帮助理解整个流程:


    image.png
    release
        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    此方法是独占模式下释放资源的顶级方法。
    首先调用tryRelease方法,如果成功了,把头节点赋值给h,如果h不为null并且waitStatus 不等于0,调用unparkSuccessor方法,唤醒下一个node。

    tryRelease:

        protected boolean tryRelease(int arg) {
            throw new UnsupportedOperationException();
        }
    

    此方法还是直接报错,因为我们需要重写。这里我们需要尤其注意,此方法是判断资源是否被完全释放了,如果锁是可以重入的,可能多次获得了锁,所以必须把最后一个锁也释放了,这里才能返回ture,否则返回false。

    unparkSuccessor:

        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;//拿到当前节点的waitStatus,赋值给ws
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
            Node s = node.next;//当前节点的下一个节点赋值给s
            if (s == null || s.waitStatus > 0) {//如果s==null或者已经被取消了,就通过for循环找到下一个需要被唤醒的节点
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);//唤醒
        }
    

    此方法的核心就是唤醒下一个需要被唤醒的节点。

    共享模式

    acquireShared
          public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    

    该方法是共享模式下获取资源的顶级方法。
    首先调用tryAcquireShared,来尝试获取资源,成功的话,则调用doAcquireShared,进入等待队列,直到获取了资源。

    tryAcquireShared:

        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    

    我们需要重写tryAcquireShared方法。

    doAcquireShared:

       private void doAcquireShared(int arg) {
           final Node node = addWaiter(Node.SHARED);//入队
           boolean failed = true;
           try {
               boolean interrupted = false;
               for (;;) {
                   final Node p = node.predecessor();//拿到当前节点的前驱节点,赋值给p
                   if (p == head) {//如果p是头节点
                       int r = tryAcquireShared(arg);//调用tryAcquireShared尝试获取资源
                       if (r >= 0) {
                           setHeadAndPropagate(node, r);//设置头节点,如果还有剩余资源,唤醒下一个节点
                           p.next = null; // help GC
                           if (interrupted)
                               selfInterrupt();
                           failed = false;
                           return;
                       }
                   }
                   if (shouldParkAfterFailedAcquire(p, node) &&
                       parkAndCheckInterrupt())
                       interrupted = true;
               }
           } finally {
               if (failed)
                   cancelAcquire(node);
           }
       }
    

    此方法和独占模式下的流程区别不大,最大的不同在于setHeadAndPropagate方法,我们来看看这个方法做了什么:

        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; 
            setHead(node);//设置头节点
            //如果还有剩余资源
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;//找到后继节点
                if (s == null || s.isShared())
                    doReleaseShared();//调用doReleaseShared方法
            }
        }
    

    首先设置当前节点为头节点,如果还有剩余的资源,就找到后继节点,调用doReleaseShared方法,这个方法我们后面再看,但是从方法名称来看,我们可以知道它与释放共享资源有关。

    releaseShared
        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

    此方法是共享模式下释放资源的顶级方法。
    tryReleaseShared方法还是需要我们去重写的,如果成功了,调用doReleaseShared方法:

        private void doReleaseShared() {
            for (;;) {
                Node h = head;//把头节点赋值给h
                if (h != null && h != tail) {
                    int ws = h.waitStatus;//拿到h的waitStatus赋值给ws
                    if (ws == Node.SIGNAL) {//如果为SIGNAL
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;         
                        unparkSuccessor(h);//唤醒后继节点
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;            
                }
                if (h == head) 
                    break;
            }
        }
    

    这个方法在共享模式下获取共享资源的顶级方法acquireShared中的doAcquireShared中的setHeadAndPropagate也会调用。

    好了,独占模式,共享模式下的获取资源,释放资源核心流程已经分析完毕了。

    细心的你,一定发现在AQS中还有acquireInterruptibly()/acquireSharedInterruptibly()这两个方法,这两个方法从名称上来看仅仅是多了一个Interruptibly,它们是会对中断进行响应的,而我们上面介绍的acquire,acquireShared是忽略中断的。

    本篇博客到这里就结束了,但是还有一块东西没有讲到:对条件变量的支持,这部分内容将放到下一篇博客再详细介绍。

    相关文章

      网友评论

        本文标题:难以理解的AQS(上)

        本文链接:https://www.haomeiwen.com/subject/uujxvqtx.html