美文网首页JUC
并发编程-(7)-Condition原理

并发编程-(7)-Condition原理

作者: tianlang136520 | 来源:发表于2020-03-26 10:49 被阅读0次
    死神---五番队队长蓝染

    目录:

    • 1、Condition介绍

      • 1.1、Condition介绍
    • 2、Condition目的

    • 3、Condition使用

      • 3.1、等待/通知伪代码
    • 4、Condition最佳实战

      • 4.1、有界队列
    • 5、Condition原理

      • 5.1、newCondition()方法
      • 5.2、ConditionObject对象
      • 5.3、类关系图
      • 5.4、等待队列图解
      • 5.5、 源码分析-等待-await()
        • 5.5.1、addConditionWaiter() - 添加元素至Condition Queue
        • 5.5.2、fullyRelease(Node) - 释放锁--从Sync Queue中出队
        • 5.5.3、isOnSyncQueue(Node) - 判断节点是否处于Sync Queue
      • 5.6、源码分析-通知-signal()
        • 5.6.1、 signal() - 唤醒Condition Queue中的首节点
        • 5.6.2、 doSignal(Node) - 唤醒first节点
        • 5.6.3、transferForSignal(Node) - 迁移节点并唤醒await的线程
      • 5.7、源码分析-回到-await()唤醒的位置
        • 5.7.1、checkInterruptWhileWaiting(Node) - 唤醒后检查是否有被中断
        • 5.7.2、transferAfterCancelledWait(Node) - 抛出 InterruptedException 还是重新中断
        • 5.7.3、acquireQueued(node, savedState) 当前被唤醒的节点去抢占同步锁
        • 5.7.4、reportInterruptAfterWait(interruptMode) 根据中断标识来进行中断上报
    • 5.8、回顾await()

    1、Condition介绍

    定义:Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition), 只有满足条件时,线程才会被唤醒。

    2、Condition目的

    等待/通知模式:

         java对象都有一组监视器方法主要包括:wait()、notify()、notifyAll()方法,这些方法与Synchronized关键字配合使用。(必须加锁才能使用监视器方法)
         Condition接口也提供了类似Object监视器方法,await(),signal(),signalAll(),配合Lock使用。
    Object对象监视器 VS Condition接口:

    对比项 Object监视器 Condition接口
    前置条件 对象锁 Lock.lock(),
    Lock.newCondition()
    调用方式 object.wait() condition.await()
    等待队列数量 一个 多个
    线程进入等待状态
    是否释放锁
    释放 释放
    进入等待状态,
    是否相应中断
    响应中断 不响应中断
    线程进入等待状态
    直到将来的某个时间
    不支持 支持
    (condition.awaitUntil(Date))

    3、Condition使用

    eg:等待/通知伪代码
    Lock lock = new xxxLock();
    Condition condition = lock.newCondition();
    // 进入等待状态
    try{
      lock.lock();
      condition.await();//进入等待状态
    } finally{
      lock.unlock();
    }
    // 唤醒等待状态线程
    try{
      lock.lock();
      condition.signal();//唤醒
    } finally{
      lock.unlock();
    }
    

    4、Condition最佳实战

    eg:4.1、有界队列

    有界队列需要满足如下条件:

    • 当队列为空时,获取操作将会阻塞获取线程,直到队列中有新增元素。
    • 队列已满时,队列的插入操作将会阻塞插入的线程,直到队列出现空位。
    有界队列如下:
    /**
     * @program: jvmproject
     * @description: 有界队列
     * - 当队列为空时,获取操作将会阻塞获取线程,直到队列中有新增元素。
     * - 队列已满时,队列的插入操作将会阻塞插入的线程,直到队列出现空位。
     * @author: biudefu
     * @create: 2019-09-21 21:55
     **/
    public class BoundsQueue<T> {
    
        private Object[] items;
    
        private int addIndex;
    
        private int removeIndex;
    
        private int count;
    
        private Lock lock = new ReentrantLock();
    
        private Condition notEmpty = lock.newCondition();
    
        private Condition notFull = lock.newCondition();
    
        public BoundsQueue(int size) {
            items = new Object[size];
        }
    
        public void add(T a) {
            lock.lock();
            try {
                while (items.length == count) {
                    System.out.println(DateUtil.getNowYYMMDDHHMMSS()+Thread.currentThread().getName()+",进入add等待状态!");
                    notFull.await();
                    System.out.println(DateUtil.getNowYYMMDDHHMMSS()+Thread.currentThread().getName()+",从add等待状态被唤醒!");
                }
                items[addIndex] = a;
                if (++addIndex == items.length) {
                    addIndex = 0;
                }
                ++count;
                notEmpty.signal();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
    
        }
    
        public T fetch() {
            lock.lock();
            try {
                while (0 == count) {
                    System.out.println(DateUtil.getNowYYMMDDHHMMSS()+Thread.currentThread().getName()+",进入fetch等待状态!");
                    notEmpty.await();
                    System.out.println(DateUtil.getNowYYMMDDHHMMSS()+Thread.currentThread().getName()+",从fetch等待状态被唤醒!");
                }
                Object x = items[removeIndex];
                if (++removeIndex == items.length) {
                    removeIndex = 0;
                }
                --count;
                notFull.signal();
                return (T) x;
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
    
            return null;
        }
    
    }
    
    测试类:
    /**
     * @program: jvmproject
     * @description: 测试BoundsQueue有界队列
     * @author: biudefu
     * @create: 2019-09-21 22:22
     **/
    public class BoundsQueueMain {
    
        public static void main(String[] args) {
            BoundsQueue<String> fuirts = new BoundsQueue<String>(5);
    
            new Thread(()->{
                while(true){
                    fuirts.add("apple");
                }
    
            },"Thread-ADD-A").start();
    
            new Thread(()->{
                while(true){
                    fuirts.add("banana");
                }
    
            },"Thread-ADD-B").start();
    
            new Thread(()->{
                while(true){
                    String iterm = fuirts.fetch();
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+" fetch result:"+iterm);
                }
    
            },"Thread-fetch-C").start();
    
        }
    
    }
    
    

    测试结果如下:

    2019-09-21 22:33:15,Thread-ADD-A,进入add等待状态!
    2019-09-21 22:33:15,Thread-ADD-B,进入add等待状态!
    2019-09-21 22:33:15,Thread-ADD-A,从add等待状态被唤醒!
    2019-09-21 22:33:15,Thread-ADD-A,进入add等待状态!
    Thread-fetch-C fetch result:apple
    2019-09-21 22:33:17,Thread-ADD-B,从add等待状态被唤醒!
    2019-09-21 22:33:17,Thread-ADD-B,进入add等待状态!
    Thread-fetch-C fetch result:apple
    2019-09-21 22:33:19,Thread-ADD-A,从add等待状态被唤醒!
    2019-09-21 22:33:19,Thread-ADD-A,进入add等待状态!
    Thread-fetch-C fetch result:apple
    2019-09-21 22:33:21,Thread-ADD-B,从add等待状态被唤醒!
    2019-09-21 22:33:21,Thread-ADD-B,进入add等待状态!
    Thread-fetch-C fetch result:apple
    2019-09-21 22:33:23,Thread-ADD-A,从add等待状态被唤醒!
    2019-09-21 22:33:23,Thread-ADD-A,进入add等待状态!
    Thread-fetch-C fetch result:apple
    2019-09-21 22:33:25,Thread-ADD-B,从add等待状态被唤醒!
    2019-09-21 22:33:25,Thread-ADD-B,进入add等待状态!
    Thread-fetch-C fetch result:apple
    2019-09-21 22:33:27,Thread-ADD-A,从add等待状态被唤醒!
    2019-09-21 22:33:27,Thread-ADD-A,进入add等待状态!
    Thread-fetch-C fetch result:banana
    2019-09-21 22:33:29,Thread-ADD-B,从add等待状态被唤醒!
    2019-09-21 22:33:29,Thread-ADD-B,进入add等待状态!
    Thread-fetch-C fetch result:apple
    2019-09-21 22:33:31,Thread-ADD-A,从add等待状态被唤醒!
    2019-09-21 22:33:31,Thread-ADD-A,进入add等待状态!
    Thread-fetch-C fetch result:banana
    2019-09-21 22:33:33,Thread-ADD-B,从add等待状态被唤醒!
    2019-09-21 22:33:33,Thread-ADD-B,进入add等待状态!
    Thread-fetch-C fetch result:apple
    2019-09-21 22:33:35,Thread-ADD-A,从add等待状态被唤醒!
    2019-09-21 22:33:35,Thread-ADD-A,进入add等待状态!
    Thread-fetch-C fetch result:banana
    2019-09-21 22:33:37,Thread-ADD-B,从add等待状态被唤醒!
    2019-09-21 22:33:37,Thread-ADD-B,进入add等待状态!
    ....
    

    5、Condition原理:

    5.1、newCondition()方法:
    public Condition newCondition() {
        return sync.newCondition();
    }
    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
        ...
    }
    
    5.2、ConditionObject对象:
    public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {
        ...
        public class ConditionObject implements Condition, java.io.Serializable {
            ...
            public ConditionObject() { }
            ...
        }
        ...
    }
    
    5.3、类关系图:
    类关系图 宏观看AQS
    • Condition接口定义了多个await方法和两个signal方法。
    • ConditionObject实现了Condition接口,Condition需要获取锁,所以它也是AQS的内部类。
    • Reentrantlock的newCondition方法实际上执行了AQS的子类Sync的newCondition方法,返回的是Condition的实现类ConditionObject。

    5.4、等待队列图解:

    等待队列结构

    eg. Thread-A& Thread-B 简单实现await和signal:(接下来源码分析会以此为例)

    /**
     * @program: jvmproject
     * @description: condition模拟wait方法
     * @create: 
     **/
    public class ConditionDemoWait implements Runnable {
    
    
        private Lock lock;
        private Condition condition;
    
        public ConditionDemoWait(Lock lock, Condition condition) {
            this.lock = lock;
            this.condition = condition;
        }
    
        @Override
        public void run() {
            System.out.println("begin -ConditionDemoWait");
            try {
                lock.lock();
                condition.await();
                System.out.println("end - ConditionDemoWait");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
    }
    
    /**
     * @program: jvmproject
     * @description: condition的signal方法模拟notify方法
     * @create: 
     **/
    public class ConditionDemoSignal implements Runnable {
    
        private Lock lock;
        private Condition condition;
    
        public ConditionDemoSignal(Lock lock, Condition condition) {
            this.lock = lock;
            this.condition = condition;
        }
    
        @Override
        public void run() {
            System.out.println("begin -ConditionDemoSignal");
            try {
                lock.lock();
                condition.signal();
                System.out.println("end - ConditionDemoSignal");
            } finally {
                lock.unlock();
            }
        }
    
    }
    
    /**
     * @program: jvmproject
     * @description: condition测试用例
     * @create: 
     **/
    public class ConditionMain {
    
        public static void main(String[] args) throws InterruptedException {
            Lock lock = new ReentrantLock();
            Condition condition = lock.newCondition();
            new Thread(new ConditionDemoWait(lock,condition)).start();
            TimeUnit.SECONDS.sleep(20);
    
    
            new Thread(new ConditionDemoSignal(lock,condition)).start();
        }
    
    }
    

    每个Condition对象都包含一个队列(等待队列),特点如下:

    • Condition底层采用单行链表,先进先出(FIFO)。
    • 一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。
    5.5、源码分析-等待-await():
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    

    当前调用顺序如下:
    addConditionWaiter() -> fullyRelease(Node) -> isOnSyncQueue(Node) -> checkInterruptWhileWaiting(Node)

    5.5.1、 addConditionWaiter() - 添加元素至Condition Queue

            /**
             * Adds a new waiter to wait queue.
             * @return its new wait node
             */
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
    //如果lastwaiter为null,或者等待状态是取消,触发整个单向等待链表清理CANCEL节点。
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
    // 清除cancelled后重新获取WaitQueue中的尾节点
                    t = lastWaiter;
                }
    // 构建WaitQueue的预节点,Node.nextWater的属性在sync queue表示节点类型mode,exclusive(独占)还是shared(共享);
    // 在condition queue中都是exclusive (独占),nextWaiter用作本意--下一个等待节点。
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
     // 如果尾节点引用为null则说明队列未被初始化
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    
    // unlinkCancelledWaiters() - 从前往后删除cancelled节点
           private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                while (t != null) {
                    Node next = t.nextWaiter;
                    if (t.waitStatus != Node.CONDITION) {
    // 等待状态是CANCEL将节点制空。
                        t.nextWaiter = null;
    // trail作为遍历单向链表的游标,灵活的操纵每一个节点。
                        if (trail == null)
                            firstWaiter = next;
                        else
                            trail.nextWaiter = next;
                        if (next == null)
                            lastWaiter = trail;
                    }
                    else
                        trail = t;
                    t = next;
                }
            }
    

    5.5.2、fullyRelease(Node) - 释放锁--从Sync Queue中出队

    final int fullyRelease(Node node) {
            boolean failed = true;
            try {
    // 获得重入的次数
                int savedState = getState();
    // 释放同步锁,并且唤醒同步队列中下一个节点
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    

    在未执行fullyRelease(Node)之前,该节点同时存在于Condition Queue与Sync Queue,所以此时要将Sync Queue中的head出队,如果在release期间有任何异常或者是release返回false则直接将Condition Queue中的节点置为cancelled,由unlinkCancelledWaiters()进行清理。

    5.5.3、isOnSyncQueue(Node) - 判断节点是否处于Sync Queue ,false表示不在,true表示在 (如果在Sync Queue中,就与调用signal()方法效果一致了)

    • 如果不在 AQS 同步队列,说明当前节点没有唤醒去争抢同步锁,需要把当前线程阻塞,直到其他的线程调用 signal 唤醒。
    • 如果在 AQS 同步队列中,意味着需要去竞争同步锁获得执行程序执行权限,为什么要做这个判断呢?
      原因是在 condition 队列中的节点会重新加入到 AQS 队列去竞争锁。也就是当调用 signal 的时候,会把当前节点从 condition 队列转移到 AQS 队列。
    final boolean isOnSyncQueue(Node node) {
    // 在sync queue中的节点有若干特点,waitStatus无condition,node.prev一定不为null(prev表示是双向链表)
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
    // 在condition queue中下一个节点是用nextWaiter表示,只有Sync queue用next表示。
            if (node.next != null) // If has successor, it must be on queue
                return true;
            /*
             * node.prev can be non-null, but not yet on queue because
             * the CAS to place it on queue can fail. So we have to
             * traverse from tail to make sure it actually made it.  It
             * will always be near the tail in calls to this method, and
             * unless the CAS failed (which is unlikely), it will be
             * there, so we hardly ever traverse much.
             */
    // 如果通过前面两个规则,还不能校验出是否处于Sync queue,可能还存在于Sync Queue中(还未完全执行enq的时刻),就只能采用遍历方式了(效率最低),从tail往前迭代以确保当前节点是否真的存在于Sync Queue中
            return findNodeFromTail(node);
        }
    
        /**
         * Returns true if node is on sync queue by searching backwards from tail.
         * Called only when needed by isOnSyncQueue.
         * @return true if present
         */
        private boolean findNodeFromTail(Node node) {
    // 从tail往前所有节点node
            Node t = tail;
            for (;;) {
                if (t == node)
                    return true;
                if (t == null)
                    return false;
                t = t.prev;
            }
        }
    

    处理中断:
    JSR133修订以后,就要求如果中断发生在signal操作之前,await方法必须在重新获取到锁后,抛出InterruptedException。但是,如果中断发生在signal后,await必须返回且不抛异常,同时设置线程的中断状态。
    上述规范我们再来看这个方法,整个方法分成两块:if块和while块+return语句。上半部分属于中断发生先于signal,此时需归还节点至Sync Queue。下半部分属于signal先于中断,此时只能等待signal的入队操作完成。如果是中断发生先于signal那么要告知调用方抛出InterruptedException,反之则仅需标记中断。

    在执行完isOnSyncQueue(Node)确认过后:

    • 如果Node仍未回到Sync Queue的话那么当前线程将会挂起,等待唤醒。
    • 如果Node已经回到了Sync Queue中,说明已经触发过了signal(),那就继续执行唤醒后的代码就可以了~

    5.6、源码分析-通知-signal():

    5.6.1、 signal() - 唤醒Condition Queue中的首节点
          public final void signal() {
    // 判断当前线程是否获得了锁
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
    // 拿到 Condition 等待队列上第一个节点
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    
           protected final boolean isHeldExclusively() {
                // While we must in general read state before owner,
                // we don't need to do so to check if current thread is owner
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
    
    5.6.2、 doSignal(Node) - 唤醒first节点
          private void doSignal(Node first) {
                do {
    // 将 next 节点设置成 null
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    
    5.6.3、transferForSignal(Node) - 迁移节点并唤醒await的线程
        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
    // Node节点要从Condition等待队列转移到AQS同步队列中,首先就要将waitStatus还原到0,如果更新失败,只有一种可能就是节点被 CANCELLED 了
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
    // 调用enq,把当前节点添加到AQS队列。并且返回按当前节点
    的上一个节点,也就是原 tail 节点
            Node p = enq(node);
            int ws = p.waitStatus;
    // 如果原tail节点的waitStatus是CANCEL状态,或者原tail节点更新waitStatus状态为SIGNAL失败(SIGNAL 表示: 他的 next 节点需要停止阻塞),唤醒阻塞的线程
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    // 唤醒节点上的线程.
                LockSupport.unpark(node.thread);
    //如果node的prev节点已经是signal状态,那么被阻塞的ThreadA的 唤醒工作由 AQS 队列来完成
            return true;
        }
    

    执行完 doSignal 以后,会把 condition 队列中的节点转移到 aqs 队列上,逻辑结构图如下 这个时候会判断 ThreadA 的 prev 节点也就是 head 节点的 waitStatus,

    • 如果大于 0 或者设 置 SIGNAL 失败,表示节点被设置成了 CANCELLED 状态。这个时候会唤醒 ThreadA 这个线程。
    • 否则就基于 AQS 队列的机制来唤醒,也就是等到 ThreadB 释放锁之后来唤醒 ThreadA。

    5.7 源码分析-回到-await()唤醒的位置:

           public final void await() throws InterruptedException {
    //           if (Thread.interrupted())
    //                throw new InterruptedException();
    //            Node node = addConditionWaiter();
    //            int savedState = fullyRelease(node);
    //            int interruptMode = 0;
    //            while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
    // 此处是await()唤醒
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
    //被唤醒后,重新开始正式竞争锁,同样,如果竞争不到还是会将自己沉睡,等待唤醒重新开始竞争。
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
    5.7.1、checkInterruptWhileWaiting(Node) - 唤醒后检查是否有被中断
    // 唤醒后检查在睡眠期间是否有被中断过:如果为没有中断过就返回0,如果被中断了就返回非0
    // 在确认被中断之后的行为也会有所不同,需要深入到transferAfterCancelledWait(Node)
        private int checkInterruptWhileWaiting(Node node) {
     // 线程被唤醒后检测中断状态,如果为被中断就返回0,如果被中断了就返回非0
                return Thread.interrupted() ?
                    (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                    0;
        }
    
    
    5.7.2、transferAfterCancelledWait(Node) - 判断后续的处理应该是抛出 InterruptedException 还是重新中断
    final boolean transferAfterCancelledWait(Node node) {
    //使用 cas 修改节点状态,如果还能修改成功,说明线程被中断时, signal() 还没有被调用。
    // 这里有一个知识点,就是线程被唤醒,并不一定是在 java 层面执行了locksupport.unpark,也可能是调用了线程的 interrupt()方法,这个方法会更新一个中断标识,并且会唤醒处于阻塞状态下的线程。
            if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
    //如果cas成功,则把node添加到AQS队列
                enq(node);
                return true;
            }
            /*
             * If we lost out to a signal(), then we can't proceed
             * until it finishes its enq().  Cancelling during an
             * incomplete transfer is both rare and transient, so just
             * spin.
             */
    //如果 cas 失败,则判断当前 node 是否已经在 AQS 队列上,如果不
    在,则让给其他线程执行
    //当 node 被触发了 signal 方法时,node 就会被加到 aqs 队列上
            while (!isOnSyncQueue(node))
                Thread.yield();
            return false;
        }
    
    5.7.3、acquireQueued(node, savedState) 当前被唤醒的节点 ThreadA 去抢占同步锁。并且要恢

    复到原本的重入次数状态

    5.7.4、reportInterruptAfterWait(interruptMode) 根据 checkInterruptWhileWaiting 方法返回的中断标识来进行中断上报
    // 如果是 THROW_IE,则抛出中断异常
    // 如果是 REINTERRUPT,则重新响应中断
            private void reportInterruptAfterWait(int interruptMode)
                throws InterruptedException {
                if (interruptMode == THROW_IE)
                    throw new InterruptedException();
                else if (interruptMode == REINTERRUPT)
                    selfInterrupt();
            }
    

    5.8、回顾await()

    public final void await() throws InterruptedException {
      if (Thread.interrupted())
        throw new InterruptedException();
    // 将当前线程包装成Node,添加到ConditionObject自己维护的一个单向等待链表中
      Node node = addConditionWaiter();
      // 在拥有exclusive锁的前提下,释放该锁,当后继节点唤醒时就算将当前拥有锁的线程逐出sync queue
      // 如果在sync queue中释放锁失败,就会将condition queue中的节点置为cancelled
      int savedState = fullyRelease(node);
      int interruptMode = 0;
    // 释放完毕后,遍历AQS的队列,看当前节点是否在队列中?
    // 不在 说明它还没有竞争锁的资格,所以继续将自己睡眠。
    // 等待新节点入队: 1.其他线程调用signal()会入队,2.当前线程被中断也会入队
      while (!isOnSyncQueue(node)) {
    // 挂起直至被signal
        LockSupport.park(this);
    // 如果是LockSupport唤醒的,会循环然后被挂起,等待进入到sync queue才退出循环
    // 但如果是强行中断,也会添加至sync queue,并且返回中断标识(非0)
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 检测在park期间有没有被执行过中断操作
          break;
      }
    // 节点自旋在获取锁成功后返回, 如果没有中断就返回false,如果被中断了就返回true
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
      // 如果在重新获得锁的期间有其他节点被添加到condition queue,就再整理一下
      if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
      if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    }
    

    5.8 Condition总结

    5.8.1、await 和 signal 的总结

    把整个分解的图再通过一张整体的结构图来表述,线程 awaitThread 先通过
    lock.lock()方法获取锁成功后调用了 condition.await 方法进入等待队列,而另一个线程
    signalThread 通过 lock.lock()方法获取锁成功后调用了 condition.signal 或者 signalAll 方
    法,使得线程 awaitThread 能够有机会移入到同步队列中,当其他线程释放 lock 后使得线
    程 awaitThread 能够有机会获取 lock,从而使得线程 awaitThread 能够从 await 方法中退出
    执行后续操作。如果 awaitThread 获取 lock 失败会直接进入到同步队列:


    await&signal完整图

    相关文章

      网友评论

        本文标题:并发编程-(7)-Condition原理

        本文链接:https://www.haomeiwen.com/subject/wopluctx.html