美文网首页
带你看看Java-AQS同步器 源码解读<五>条件队列Condi

带你看看Java-AQS同步器 源码解读<五>条件队列Condi

作者: burgxun | 来源:发表于2020-04-02 13:40 被阅读0次

    AQS Condition的实现

    AQS中的ConditionObject和node

    static final class Node {
            /**
             * 同步队列的头  初始化 或者setHead方法可修改
             */
            static final Node SHARED = new Node();
            /**
             * 标识这个节点用于 独占模式(排它 反正一个意思)
             */
            static final Node EXCLUSIVE = null;
    
            /** 下面是 waitStatus 的几个常量值  */
    
            /**
             * 表明等待线程已经取消
             */
            static final int CANCELLED = 1;
            /**
             * 表述如果当前节点的前一个节点状态是 SIGNAL  那么就可以阻塞当前自己的线程 不用去争抢资源了  没用 不然会一直尝试去获取资源
             */
            static final int SIGNAL = -1;
            /**
             * 线程在条件队列中等待
             */
            static final int CONDITION = -2;
            /**
             * 共享模式下  无条件传播   该状态的进程处于可运行状态
             */
            static final int PROPAGATE = -3;
    
            /**
             * 当前node 状态
             */
            volatile int waitStatus;
    
            /**
             * 同步队列的前置节点
             */
            volatile Node prev;
    
            /**
             * 同步队列的后置节点
             */
            volatile Node next;
    
            /**
             * 当前节点所属的线程
             */
            volatile Thread thread;
    
            /**
             * 用于条件队列  是条件队列的下一个节点
             */
            Node nextWaiter;
    
            /**
             * 是否是共享模式  这个方法只会在同步队列中使用  nextWaiter在同步队列中复用了
             */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            /**
             * 获取当前节点的前置节点 没有就抛出异常
             */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    
    final class ConditionObject implements Condition {
    
            /**
             * 条件队列的头节点
             */
            private transient Node firstWaiter;
            /**
             * 条件队列的尾节点
             */
            private transient Node lastWaiter;
    
            /**
             * ConditionObject 默认的构造函数
             */
            public ConditionObject() {
            }
    }
    

    第一篇文章的时候 我和大家也描述过 Condition Queue 实际上是一个单向链表 在分析Node节点的时候 我描述过prev和next都是给Sync Queue使用的 实际上对于Condition Queue node节点 有效的字段 就是 nextWaiter ,waitStatus和thread字段

    条件队列-await源码分析

    0-await方法

     public void await() throws InterruptedException {
                if (Thread.interrupted())//判断当前线程 是否被中断了 如果中断了 抛出中断异常
                    throw new InterruptedException();
                Node node = addConditionWaiter();//新增一个新的等待节点到条件队列中
                int savedState = fullyRelease(node);//释放当前节点占用的资源  并返回线程持有的状态值
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {//判断当线程节点是否在同步队列中
                    LockSupport.park(this);//如果不在同步队列中 那就阻塞当前线程 等待唤醒
                    /*
                     * 能执行到下面的代码 说明线程从阻塞状态中唤醒了 唤醒可能有2种情况
                     * 1:是线程发生了中断
                     * 2:是线程接受到signal信号 从阻塞状态中被唤醒
                     * checkInterruptWhileWaiting 返回值有3个
                     * 0表示:线程没有被中断
                     * 1 REINTERRUPT表示:中断在signal之后发生的
                     * -1 THROW_IE表示:中断在signal之前发生的
                     */
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//检查是否发生过线程中断 0表示没有发生
                        break;// 如果线程没有中断 说明被signal唤醒  那就继续判断是否唤醒了当前线程 如果是当前线程 会进入到同步队列中
                }
                /*
                 * 这边的代码 就是当前的node已经在Sync Queue 中了
                 * acquireQueued 我们在之前独占锁加锁的时候 也分析过  就是去获取资源 获取不到的话 就排队等待继续阻塞
                 * acquireQueued返回true 说明在进入Sync队列中 等待的过程中锁的过程中也发生了中断 
                 *acquireQueued返回true 返回false 说明没有发送过中断 那下面的赋值就不会走到 
                 *如果acquireQueued返回true 而且interruptMode是非THROW_IE 那个整个方法就是REINTERRUPT的结果 因为都不需要抛出异常
                 */
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;//记录线程中断的表示位
                /*
                 *这边的意思就是如果当前节点nextWaiter不是等于null的说明 node节点还是和Condition
                 * queue 关联着的  那就执行一下清理操作 吧condition queue里面的非等待节点剔除
                 * 那种情况下会走到这步呢 那就是当前的interruptMode是THROW_IE的时候
                 * 为什么呢 因为THROW_IE的意思 是中断发送在signal之前 signal
                 * 因为如果是signal的话 当前节点的nextWaiter为被置为null的  可以回看下代码
                 */
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)//这边0 说明一直没发生过中断
                    reportInterruptAfterWait(interruptMode);
            }
    

    1-addConditionWaiter

             /**
             * 新增一个新的等待节点到等待的条件队列中
             *
             * @return its new wait node
             */
            private Node addConditionWaiter() {
                Node t = lastWaiter;//等待条件的队列的最后一个
                //如果最后的lastwaiter 节点状态是非Condition 说明已经取消 就清理ConditionQueue的方法
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;//unlinkCancelledWaiters方法里面lastWaiter可能又重写赋值了
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);//当前线程包装成node节点
                /*
                 * t是null 说明尾节点为null 说明条件队列中没有值 所以node 成了firstWaiter
                 * t不为null 那就加入到队尾
                 * */
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;//lastWaiter 重写赋值 因为node是最后加入的 node就是lastWaiter
                return node;
            }
    
             /**
             * 条件队列从头部开始 移除非CONDITION节点
             */
            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;//头节点赋值给t
                Node trail = null;//trail是t的next节点的上一个为CONDITION的节点
                //这个循环做的是从头节点开始移除不是CONDITION的节点
                while (t != null) {
                    Node next = t.nextWaiter;//next为t的下一个节点
                    if (t.waitStatus != Node.CONDITION) {//如果t的状态不是CONDITION 说明不应该在条件队列中 取消了 要移除
                        t.nextWaiter = null;//把t的下一个节点设置为null 这样让t 和整个条件队列链表断开 也方便GC
                        /**
                         *trail 是null 说明是第一次进来吧 但是第一次的t是firstWaiter 这个时候firstWaiter的节点为CONDITION
                         * 所以下面有个赋值把 firstWaiter的下一个节点 赋值给firstWaiter 意思就是说 让下个节点成为头节点
                         * 如果trail不是为null 那就把当地节点的next赋值给trail的下个节点  因为当前节点t 不可用了 所以要将t的
                         * 下个节点  重新和链表关联起来 也就是说重新指向上一个节点 而trail其所就是t的上一个有效的节点
                         * 所以有了这个赋值
                         * */
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        /*
                         * 如果next 等于null 了 说明t没有下个节点了  这个时候trail 应该就是有效的最后一个节点
                         * */
                        if (next == null)
                            lastWaiter = trail;
                    } else
                        trail = t;//trail相当于一个临时的变量  这边的赋值就是我上面说的   trail是next的上一个有效的节点值
                    t = next;//next赋值给t 准备下一次的循环
                }
            }
    

    上面的整个代码 是我注释了addConditionWaiter方法 大家应该能看明白,这个方法主要做的就是包装当前线程为node 然后加入的Condition Queue的队尾 这其中还做了一个条件队列元素清理的工作,清除一些非Condition状态的节点

    2-fullyRelease

    下面我们来看下第二个方法 fullyRelease 看名字 我们应该也能猜出就是释放当前线程占用的资源,而且是完全释放,为什么是fully呢,那是比如重入锁,可以重入,每次lock的时候同步器的状态State都会+1,可以去看下第一篇的文章,应该有描述过,而且fullyRelease方法是有返回值的 返回的savedState就是当前线程持有的状态值,为什么要记录下来呢,那是后面我们再次争取锁资源的时候 需要用到这个savedState

    /**
         * 释放当前节点持有的所有资源,并且唤醒同步队列中的head节点去获取资源
         */
        final int fullyRelease(Node node) {
            boolean failed = true;//表示 是否释放失败
            try {
                int savedState = getState();//获取同步器的状态值state
                if (release(savedState)) {//就是释放资源 唤醒等待的线程去获取资源 之前已经描述过 不清楚的 看下第二篇文章
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();//释放失败 抛出异常
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;//如果释放失败 就把当前节点设置去取消  着就解释了 为什么之前加入节点的时候回去做检查
                // 丢弃非Condition的节点
            }
        }
    

    3-isOnSyncQueue

    isOnSyncQueue 方法 就是判断当前节点是否在同步队列SyncQueue中,如果是的话 就跳出while循环执行后面的方法,如果不在的话 那就要进入while循环体呢 做线程等待了,至于为什么要这样判断,那时因为node 节点加入到ConditionQueue 中,如果执行Signal方法,被唤醒的线程节点,会转移到SyncQueue中,这个具体后面的Signal方法里面 我们具体再说。

    看下代码:

        /**
         * 判断当前node 是否在同步队列中
         */
        final boolean isOnSyncQueue(Node node) {
            /*
             *节点的状态是condition一定不再同步队列中 
             *如果节点加入到同步队列中 使用enq方法 那么当前节点的pre 一定是非空的 
             *那么如果当前pre是为null
             *那就不在Sync queue 中
             */
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            if (node.next != null) // 如果当前节点有后继节点 必然是在同步队列中的 因为next是同步队列中的node 才会存在这一的情况
                return true;
            return findNodeFromTail(node);//去同步队列中匹配node 节点
        }
        
        /**
         * 从尾部节点开始搜索 看是否能找到当前的node节点
         */
        private boolean findNodeFromTail(Node node) {
            Node t = tail;//同步队列的尾部节点
            for (; ; ) {
                if (t == node)// t ==node 说明在同步队列能找到 返回true
                    return true;
                /*
                 * t==null 第一次循环说明tail节点不存在 说明同步队列就是不存在的 那node更不可能存在于同步队列中返回false
                 * 后面的循环t 就是之前的节点的前pre节点  如果为null 说明已经找到了头部节点了 都没有匹配到node 也返回false
                 */
                if (t == null)
                    return false;
                t = t.prev;
            }
        }
    

    每行代码的 具体语义 我都在注释里面了 不清楚的 结和整个方法理解一下

    4-while方法体内部

    当执行到while 内部的时候,刚才我也分析过,执行到while里面说明 当前的node节点不在SyncQueue中,说明就在ConditionQueue中,首先看到 有个阻塞线程的操作,这个和独占锁 阻塞当前线程是一个道理,这边等待是Signal唤醒当前线程,然后继续往下执行
    后面有一个方法checkInterruptWhileWaiting 这个方法其实是要关注一下的,
    先看下代码:

        /**
         * Mode meaning to reinterrupt on exit from wait
         */
        private static final int REINTERRUPT = 1;
        /**
         * Mode meaning to throw InterruptedException on exit from wait
         */
        private static final int THROW_IE = -1;
    
        /**
         * 检查是否发生过线程中断
         * 返回0表示:线程没有被中断
         * 1表示:中断在signal之后发生的
         * -1表示:中断在signal之前发生的
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
        }
        /**
         * Transfers node, if necessary, to sync queue after a cancelled wait.
         * Returns true if thread was cancelled before being signalled.
         */
        final boolean transferAfterCancelledWait(Node node) {
            /*
             * 这个地方给大家特别说明下:
             * 刚才上面我提到过 被唤醒有2中方式 可能是被signalled 或者被interrupted
             * 下面的有个CAS的操作 就是 将当前节点的状态更新成0
             * 如果更新成功说明了 当前节点的状态依旧是CONDITION 也就是说还在条件队列中 那就说明了不是被signal唤醒的 那就是被中断了
             * 同理 如果更新失败 则说明当前节点的状态 已经被修改了  那说明就是被signalled了的 因为被signal 会将当前节点状态修改 转移到Sync queue中
             */
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
                enq(node);//这边更新成功 说明当前线程发生了中断 而且中断在signal之前 这边做一个补偿操作 把节点放入到Sync 队列中
                return true;
            }
            /*
             * 这边又判断了下 当前节点是否在同步队列中  为什么还要判断呢 是因为虽然发生了signal
             * 但是 我们看下transferForSignal的方法能知道  是先执行修改节点状态的CAS操作 然后再执行enq的入队操作
             * 所以这边虽然状态已经修改 但是可能线程正在执行enq 方法  所以这边判断了下 如果没有在Sync队列中
             * 那当前线程就坐下yield 就是线程执行让出一下 意思就是稍等会儿
             */
            while (!isOnSyncQueue(node))
                Thread.yield();
            return false;
        }
            
    

    线程在while内部被阻塞 然后被唤醒 只有2中方式:1是线程发生了中断,二是
    checkInterruptWhileWaiting 方法返回值有3个 一个是0说明线程从等待到唤醒没有发生过中断

    第二个返回值是THROW_IE,它的值是-1,从命名上面我们能知道 这个是要抛出中断异常,它的执行结果其实就是线程的中断在Signal之前发生了

    第三个返回结果是REINTERRUPT 它的值是1 意思就是重新做下线程中断,这个是由于中断在Signal之后发生的

    ==这边有个条件就是 如果返回0的话 循环是继续的 不会break 我在网上查询说 这边可能存在“假唤醒”的问题 因为返回0 线程一定是没有中断,那就是被唤醒了,但是被唤醒的node 会进入到SyncQueue中的呀,为什么这边不跳过循环,反而是继续循环判断?这边没搞明白,有知道的小伙伴 可以告知一下!==

    5-while之后的方法

    while 之后的方法 说明当前线程已经在SyncQueue 那就执行和独占锁的获取方法一样的acquireQueued方法,不清楚的这个方法怎么运行的小伙伴,可以回看下第一篇文章,acquireQueued主要做的就是去尝试获取锁资源,如果获取不到线程还是阻塞等待的,直到被唤醒。该方法是有返回值的 如果返回ture 说明在等待过程中发生了中断,如果是false 说明没有。如果返回true 而且上面的interruptMode是非Throw-IE的 那interruptMode值就是ReInterrupt
    后面的nextWaiter!=null,说明当前节点还没有和ConditionQueue断开,这边执行下ConditionQueue的清理操作,把非Condition状态的节点从条件队列中剔除出去。最后如果interruptMode非0就执行下对于的状态操作reportInterruptAfterWait

    具体代码也很简单:

    /**
             * 这边就是根据 刚才interruptMode 不同的值 做出不同的回应
             * THROW_IE 意思就是抛出异常
             * REINTERRUPT 意思就是做出线程重写中断的操作  可以让上层去检测处理
             */
            private void reportInterruptAfterWait(int interruptMode)
                    throws InterruptedException {
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT)
                    selfInterrupt();
            }
    
    

    6-await 总结

    await还有几个重构的方法,里面的核心方法上面我都讲了,剩下的有兴趣的可以自己尝试去理解看看,具体方法有什么区别 我在上篇分析Condition接口的时候 接口方法上面都注释了!

    • 第一步 执行await方法执行的时候当前线程一定是获得了锁的,不然执行这个方法的时候回报错的,有兴趣的可以自己写下Demo,自己看下在哪一步报错,偷偷告诉你下在释放tryRelease的时候!

    • 第二步 就是将当前线程封装成node节点 放入ConditionQueue的尾部

    • 第三步 释放当前线程持有的所有同步器State

    • 第四步 判断当前节点是否在SyncQueue中 如果是 就第五步 如果不是 就线程阻塞 等待Signal信号 唤醒

    • 第五步 执行acquireQueued方法 去重新获取锁资源

    • 最后一步 获取到锁后 根据前面的中断状态 做出对应的处理 方法返回

    条件队列-signal源码分析

    signal方法

    signal方法 是从Contidion头部开始选一个合法的节点 转换到SyncQueue中

       public void signal() {
           if (!isHeldExclusively())
              throw new IllegalMonitorStateException();
           Node first = firstWaiter;
           if (first != null)
              doSignal(first);
        }
        
         /**
         * 一个判断 判断是否用于锁的线程和释放线程是同一个  子类从写实现
         */
        protected boolean isHeldExclusively() {
            throw new UnsupportedOperationException();
        }
        /**
         * 使得条件队列中的第一个没有被cancel的节点 enq到同步队列的尾部
         */
        private void doSignal(Node first) {
            do {
                /*
                 * 这边说明条件队列只有first 一个节点转移完first节点设置lastWaiter也为null
                 * 设置first的nextWaiter 等于null
                 */
                if ((firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;//first 要被加入到同步队列中 修改nextWaiter==null
            } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null);
        }
        
         /**
         * 将node节点从调节队列中转换到同步队列中  如果返回是true 那说明转换成功
         */
        final boolean transferForSignal(Node node) {
            /*
             * 如果当前的CAS操作失败 说明node节点的状态已经不是condition了 可能已经被cancel了 所以返回false
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            Node p = enq(node);//将当前的node节点 加入到同步队列中 独占锁的时候已经分析过  返回的节点p是node节点的prev节点
            int ws = p.waitStatus;
            /*
             * 这边ws是node的prev 节点p的状态  如果p的ws 大于0 那说明p已经cancel了  那就可以直接唤醒node节点
             * 这边不明白的可以去结合shouldParkAfterFailedAcquire 方法看下 这个方法里面有如果node的pre节点是Cancel的话 会做重写寻找pre节点
             * 同样的下面的CAS 操作将node的前驱节点P的ws状态修改为signal失败  说明当前的p节点的状态已经被别的线程修改了
             * 那就要去唤醒node节点线程去获取资源锁
             * 之前我们独占锁的时候都说过  同步队列中 节点都是通过自己的前驱节点去唤醒的
             */
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    
    

    signal 方法比较简单 上面我也描述过了 有些地方如果看不懂 还是要结合整个await方法互相看下 每一个判断都存在道理

    signalAll方法

    signalAll方法 是将所有ConditiaonQueue中node节点转换到SyncQueue中

            public void signalAll() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignalAll(first);
            }
            
             /**
             * 移除条件队列中所有节点 挨个转移到同步队列中
             */
            private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;//因为所以节点 都已经转移 所以条件队列就为null 了
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);//循环转移 直到最后一个nextWaiter等于null
            }
            
    

    从代码上 我们也能看到signalAll就是做一个所有节点的转移操作,doSignalAll方法入口就是设置当前的 lastWaiter = firstWaiter = null 保证了一个整体的操作,如果有人想问 为什么不直接把ConditionQueue接到SyncQueue的后面 不就好了么,为什么还要挨个去循环,那是因为2中队列的结构不一样,没法直接全部转移,Sync是用next和prev连接前后节点的但是Condition 是用NextWaiter连接后面的节点的,是一个单向链表,2者没法直接关联!

    总结Sync-Queue和Conditian-Queue

    Sync-Queue:


    SyncQueue.png

    Condition-Queue:


    ConditionQueue.png

    上面就是SyncQueue和ConditionQueue的流程图


    写了5篇文章分析了下AQS的源码 大部分源码都已经做了注解,如果看不明白的,多看几遍 ,对着源码看,第一篇可以看我的注解,第二遍可以尝试自己单独看源码,是否能看明白,最好自己能debug走一遍 看下,一定的能够加深影响,最后文中如果有些的不对的,希望大家能够指正
    后面我会整理下,把所有代码放到github 里面 方便大家看

    ==预告:后面几篇我会写下具体实现AQS的java的的类,ReentrantLock,Semaphore,CountDownLatch,DelayQueue等等==

    完结!!!

    相关文章

      网友评论

          本文标题:带你看看Java-AQS同步器 源码解读<五>条件队列Condi

          本文链接:https://www.haomeiwen.com/subject/jbkxphtx.html