美文网首页
2.Condition核心原理分析

2.Condition核心原理分析

作者: 致虑 | 来源:发表于2020-09-23 16:39 被阅读0次

    Condition核心原理分析

    java内置锁的wait/notify实现了等待唤醒机制,那么实现自定义的锁时自然是少不了同样的功能,那么借助AQS实现自定义锁时该如何做呢,或者JUC提供了什么样的机制。

    Condition就是实现同步等待通知的最佳利器了。看看具体是如何实现的。

    public interface Condition {
            void await();
            void awaitUninterruptibly();
            long awaitNanos(long nanosTimeout);
            boolean await(long time, TimeUnit unit);
            boolean awaitUntil(Date deadline);
            void signal();
            void signalAll();
    }
    
    方法名 描叙
    void await() 进入等待,直到被唤醒
    void awaitUninterruptibly() 进入等待,知道被唤醒,但可以被中断
    long awaitNanos(long nanosTimeout) 进入等待,超时或者被唤醒(默认单位为纳秒)
    boolean await(long time, TimeUnit unit) 进入等待,超时或者被唤醒
    boolean awaitUntil(Date deadline) 进入等待,如果到达最后期限或者被唤醒或中断则返回
    void signal() 唤醒一个等待的线程
    void signalAll() 唤醒所有等待的线程

    代码一目了然,无非就是await、signal,对应的就是wait、notify

    直接写一个demo,看看效果

    public static void main(String[] args) {
      Lock lock = new ReentrantLock();
      Condition conditionA = lock.newCondition();
      AtomicInteger target = new AtomicInteger(1);
    
      new Thread(() -> {
        try {
          lock.lock();
          while(true){
            if(target.get() % 2 != 0){
              System.out.println(Thread.currentThread().getName() + "执行,target:" + target.getAndIncrement());
              TimeUnit.SECONDS.sleep(1);
            }else{
              conditionA.signalAll();
              System.out.println(Thread.currentThread().getName() + "进入等待");
              conditionA.await();
            }
          }
        } catch (InterruptedException e) {
          e.printStackTrace();
        }finally {
          lock.unlock();
        }
      },"线程1").start();
    
      new Thread(() -> run(lock, conditionA, target, 2),"线程2").start();
      new Thread(() -> run(lock, conditionA, target, 3),"线程3").start();
    }
    
    private static void run(Lock lock, Condition conditionA, AtomicInteger target, int i) {
      try {
        lock.lock();
        while (true) {
          if (target.get() % i == 0) {
            log.log(Thread.currentThread().getName() + "执行,target:" + target.getAndIncrement());
            TimeUnit.SECONDS.sleep(1);
          } else {
            conditionA.signalAll();
            System.out.println(Thread.currentThread().getName() + "进入等待");
            conditionA.await();
          }
        }
      } catch (InterruptedException e) {
        e.printStackTrace();
      } finally {
        lock.unlock();
      }
    }
    

    分析下执行结果

    线程1执行,target:1
    线程1进入等待
    线程2执行,target:2
    线程2进入等待
    线程3执行,target:3
    线程3进入等待
    线程1进入等待
    线程2执行,target:4
    线程2进入等待
    线程3进入等待
    线程1执行,target:5
    线程1进入等待
    线程2执行,target:6
    线程2进入等待
    线程3进入等待
    线程1执行,target:7
    线程1进入等待
    线程2执行,target:8
    线程2进入等待
    线程3执行,target:9
    

    1)线程1*在 *target != 2 时获取锁,否则进行等待并唤醒所有其他线程,当target==1时,线程1获取锁,执行输出,担当target增长为2时,线程1进入等待,释放锁,并唤醒线程2线程3

    2)线程2target % 2==0 时获取锁,否则进行等待并唤醒所有其他线程,当target%2==0时,线程2获取锁,执行输出,担当target增长为3时,线程2进入等待,释放锁,并唤醒线程1线程3

    3)线程3target % 3==0 时获取锁,否则进行等待并唤醒所有其他线程,当target%3==0时,线程3获取锁,执行输出,担当target增长为4时,线程3进入等待,释放锁,并唤醒线程1线程2

    依照上面的demo,那么接下来我们一步步具体看看我们使用到的实现类:AbstractQueuedSynchronizer#ConditionObject

    1.定义Condition

    Lock lock = new ReentrantLock();
    Condition conditionA = lock.newCondition();
    
    // ...  进入 ReentrantLock 
    public Condition newCondition() {
            return sync.newCondition();
    }
    
    // ...  进入 ReentrantLock.Sync 
    final ConditionObject newCondition() {
        return new ConditionObject();
    }
    
    // ...  进入 AbstractQueuedSynchronizer 
    public ConditionObject() { }
    public class ConditionObject implements Condition, java.io.Serializable{ ... }
    

    可以看到 lock.newCondition()最终是通过sync.newCondition();进行构造,通过代码知道sync就是AbstractQueuedSynchronizer的实现类(即自定义的队列同步器)。由此可以看出,Condition其实是由AbstractQueuedSynchronizer的实现类new出来的,可以看到最终的实现类是AbstractQueuedSynchronizer#ConditionObject,ConditionObject实现至Condition,因此必然实现了具体的等待唤醒机制,那么接着结合demo进行进一步拆解。

    简单用图示表示就是如下结构:

    Condition

    这里有一点很重要,那就是通过指定ReentrantLock对象来new出Condition,可以让该Condition指向了一把具体的锁,那么线程调用该Condition进行await或者signal时,就明确知道操作的是哪把锁,因此就能与其他竞争该锁的线程进行通信了。

    那么分析一下这个<u>ConditionObject</u>对于等待唤醒机制是如何做的。

    先记住一点:一个condition也就是一个队列,就是存放竞争该condition而导致等待线程的队列

    public class ConditionObject implements Condition, java.io.Serializable {
      private transient Node firstWaiter;  // 头节点
      private transient Node lastWaiter;     // 尾节点
       ...
    }
    

    不错,ConditionObject是AQS的内部类,它内部维护了头尾节点,节点也是通过AQS中定义的Node构造而成,因此形成了一个同步队列。通过前面AQS的分析,Node中维护了线程及前后节点的指针,因此就很好理解,await就是将线程构造成Node加入队列,signal就是唤醒队列中指定的节点中的线程

    但是这里有一点与AQS中的node的区别一定要注意(上图中也明确表示):AQS中的队列时同步双向队列,通过next及pre维护前后节点;但是Condition中的队列时单向等待队列,通过nextWaiter维护下一个节点;这也是在前面介绍AQS中队列节点时,nextWaiter是空的原因。

    2.核心原理分析

    public final void await() throws InterruptedException {
        // 如果线程被中断,异常出去 
        if (Thread.interrupted()) throw new InterruptedException();
        // 根据当前线程构造node,并加入到等待队列
        Node node = addConditionWaiter();
      
        // 遍历等待队列,尝试释放锁,该逻辑在AQS中有讲
        int savedState = fullyRelease(node);
        int interruptMode = 0;
      
        // 判断是否是同步队列(这里主要是与AQS中的同步队列作比较)
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
      
        // 尝试再次获取同步状态,若成功则退出等待时重新中断,(返回true代表没有获取成功)
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT; // 退出等待时重新中断
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
      
        // 清空waitStatus != -2 的节点(waitStatus>0代表节点已经取消了)
        if (interruptMode != 0)
            // 进入等待吧
            reportInterruptAfterWait(interruptMode);
    }
    

    这里的逻辑主要就是将当前调用await()的线程包装成node节点,并加入到等待队列中去,当然在真正加入队列前还进行了一次尝试。同时设置waitStatus=-2,表示当前节点在等待队列中。

    // 唤醒
    public final void signalAll() {
      if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
      
      // 找到等待队列中的头节点
      Node first = firstWaiter;
      if (first != null)
            doSignalAll(first);
    }
    
    private void doSignalAll(Node first) {
      lastWaiter = firstWaiter = null;
      do {
            // 一直遍历队列中的节点
          Node next = first.nextWaiter;
          first.nextWaiter = null;
        
            // 唤醒
          transferForSignal(first);
          first = next;
      } while (first != null);
    }
    
    final boolean transferForSignal(Node node) {
      // 状态校验
      if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
      Node p = enq(node);
      int ws = p.waitStatus;
      if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // 执行唤醒
            LockSupport.unpark(node.thread);
      return true;
    }
    

    上面逻辑也很简单,无非就是遍历等待队列中的所有节点,进行一定的状态校验后,进行唤醒操作。

    • 2.1 按照demo中的逻辑,首先线程1(T1)、线程2(T2)、线程3(T3)同时竞争资源(AQS),因为原子变量target初始值是1,因此此刻只有T1获取到锁,那么刚开始队列是这样的:
    condition_1
    • 2.2 T2、T3尝试执行时,发现条件不满足,因此调用await()进入等待队列,此刻队列是这样的:
    condition_2
    • 2.3 随着target递增为2,此时T1判断条件不满足,调用await()进入等待队列,同时signalAll()唤醒所有在等待队列中的T2、T3,,同时T2根据条件成功获取到锁,此刻队列时这样的:
    condition_7
    • 2.4 同时由于T3条件不满足,自身调用await()进入等待,同时唤醒所有等待的线程(这个过程比较快):
    condition_9
    • 2.5 随着target递增为23,此时T2判断条件不满足,调用await()进入等待队列,同时signalAll()唤醒所有在等待队列中的T3,,同时T3根据条件成功获取到锁,此刻队列时这样的:
    condition_3
    • 2.6 随着原子变量target的递增,反复上叙同步队列与等待队列的来回操作,就实现了线程间的等待与唤醒。

    以上就是Condition的实现机制,主要是借助Node维护一个单向队列,实现线程的等待与唤醒。这里一定要区分等待队列与同步队列的区别,等待队列时Condition维护的,主要是实现显式锁的释放与唤醒,而同步队列时AQS维护,主要是实现资源竞争的同步等待。

    其实后面介绍ReentrantLock的时候。就比较简单了,同时AQS、Condition、ReentrantLock在后面要讲的同步阻塞队列中使用的非常多,可以特别关注下。

    相关文章

      网友评论

          本文标题:2.Condition核心原理分析

          本文链接:https://www.haomeiwen.com/subject/cfeoyktx.html