美文网首页JUC并发包
并发包同步器的核心AQS-深入-条件队列

并发包同步器的核心AQS-深入-条件队列

作者: 于情于你 | 来源:发表于2021-04-06 14:10 被阅读0次

        ConditionObject主要是为并发编程中的同步提供了等待通知的实现方式,可以在不满足某个条件的时候挂起线程等待(await)。直到满足某个条件的时候在唤醒线程(signal/signalAll)。


    结构

    API层

    将当前线程的锁释放,并加入条件等待队列

     public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
               // 用当前线程创建一个新节点,并加入等待队列
                Node node = addConditionWaiter();
              // 释放锁,并唤醒后面节点的线程
                int savedState = fullyRelease(node);
                int interruptMode = 0;
               // 当前节点不在同步队列的时候挂起当前线程
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
    
                     // 线程被唤醒,是否被中断了,如果没中断了退出循环
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                // 尝试获取锁,获取锁成功interruptMode改为REINTERRUPT
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                // 清理状态是取消的节点
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                // 根据interruptMode响应
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    

    REINTERRUPT,退出等待时再次中断。THROW_IE,退出等待时抛出InterruptedException

    将等待时间最长的线程,从条件等待队列,加入等待锁的同步队列

      public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    

    内部方法调用层

    往等待队列中添加节点

     private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                // 如果最后一个节点是取消状态,则清空等待队列中所有的被取消的节点
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                // 新建一个节点
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                // 队列是空,则把当前节点当作firstWaiter,否则加到最后一个节点后面
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    
    节点加入等待队列

    等待队列转移到同步队列

     private void doSignal(Node first) {
    
                // 从指定节点开始把节点转移到同步队列并删除节点,直到有一个节点转移成功或者等待队列空了
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    
    
     final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            // 把节点的waitStatus更新成0,如果更新失败则退出
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            // 将节点加入到同步队列,如果节点被取消操作了,或者更新节点状态失败,那么就唤醒当前节点的线程
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    
    image.png

    相关文章

      网友评论

        本文标题:并发包同步器的核心AQS-深入-条件队列

        本文链接:https://www.haomeiwen.com/subject/ycbskltx.html