美文网首页
Condition源码分析

Condition源码分析

作者: 竖起大拇指 | 来源:发表于2020-10-19 14:24 被阅读0次

我们先来看看Condtion类。

public interface Condition {

    //当前线程在接收到信号或被中断之前一直处于等待状态
    void await() throws InterruptedException;

  //当前线程在接到信号之前一直处于等待状态
   void awaitUninterruptibly();

  //当前线程在接收到信号,被中断或到达指定等待时间之前一直处于等待状态
   long awaitNanos(long nanosTimeout) throws InterruptedException;

  //当前线程在接收到信号,被中断或到达指定等待时间之前一直处于等待状态
   boolean await(long time, TimeUnit unit) throws InterruptedException;

    //当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态
   boolean awaitUntil(Date deadline) throws InterruptedException;

    //唤醒一个等待线程
    void signal();

    //唤醒所有等待线程
    void signalAll();

}

我们先来看个例子。

class ConditionDemo {

    private static Lock mLock=new ReentrantLock();
    private static Condition condition=mLock.newCondition();


    public static void main(String[] args) {

        Thread threadWait=new Thread(() -> {
            mLock.lock();

            try{
                System.out.println(Thread.currentThread().getName()+"正在运行");
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName()+"停止运行");
                condition.await();
            }catch (Exception ex){
                ex.printStackTrace();
            }

            System.out.println(Thread.currentThread().getName()+"获得一个Signal信号继续执行");

            mLock.unlock();

        },"WaitThread");

        threadWait.start();


        try {
            Thread.sleep(1000); //保证线程threadWait先执行
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


        Thread threadSignal=new Thread(() -> {
            mLock.lock();
            try {
                System.out.println(Thread.currentThread().getName()+"正在运行");
                condition.signal();//发送信息 唤醒其他线程
                System.out.println(Thread.currentThread().getName()+"发送一个signal");
                System.out.println(Thread.currentThread().getName()+"发送一个signal后,结束");
            }catch (Exception e){
                e.printStackTrace();
            }
            mLock.unlock();
        },"SignalThread");

        threadSignal.start();

    }
}

运行结果:

WaitThread正在运行
WaitThread停止运行
SignalThread正在运行
SignalThread发送一个signal
SignalThread发送一个signal后,结束
WaitThread获得一个Signal信号继续执行

我们来分析下:

  • 当WaitThread拿到锁之后,开始执行,当调用condition.await()方法之后,WaitThread开始睡眠并释放锁
  • WaitThread开始睡眠并释放锁之后,SignalThread拿到锁,拿到锁之后开始运行,并调用condition.signal()发射一个信号来唤醒正在等待此条件condition的线程。发射信号之后 SignalThread会继续执行,执行完毕后SignalThread释放锁。
  • 当SignalThread释放锁之后,WaitThread拿到锁开始继续运行直到结束

从上面的分析可以得知:Condition是一个多线程协调通信的一个工具类。使得某个或者某些线程一起等待某个条件,只有当该条件具备(signal或者SignalAll方法被调用)时,这些等待线程才会被唤醒,从而重新争夺锁。

看了上面的例子,你可能会又这样的疑问:当WaitThread拿到锁之后开始工作,然后调用condition.await()方法开始睡眠等待信号的到达。但是没有看见此线程释放锁啊,当SignalThread发出signal信号且释放锁之后也没有看见它重新获取锁啊??

下面我们来分析下:
我们都知道,ReetrantLock是独占锁,一个线程拿到锁之后如果不释放,那么另外一个线程肯定是拿不到锁的,所以在lock.lock()和lock.unlock()之间可能又一次释放锁的操作(同样也必然会有一个获取锁的操作)。我们回头看下代码,WaitThread在进入lock.lock()后唯一可能释放锁的操作就是await()了。也就是说await()操作实际上就是释放锁,然后挂起线程,一旦条件满足就被唤醒,再次获取锁。
下面我们从源码来分析下:
ReentrantLock类中的newCondtion方法的代码如下:

 public Condition newCondition() {
        return sync.newCondition();
    }

此方法直接调用了AQS的实现类Sync中的newCondition()方法.
Sync类中的newCondition()方法的代码如下:

 final ConditionObject newCondition() {
            return new ConditionObject();
        }

直接new了一个ConditionObject类的对象。ConditionObject类是Conditon的实现类,ConditionObject是AQS同步器中的一个内部类。
因此,在前面的例子中当调用condition.await方法时,就是调用的ConditionObject类中的await()方法。
下面我们就开始分析这个await方法的内部实现。

 public final void await() throws InterruptedException {
            if (Thread.interrupted())//判断当前线程时否被中断
                throw new InterruptedException();
            //将当前线程作为内容构造的节点node放入到条件队列中并返回此节点
            Node node = addConditionWaiter();
          //释放当前线程所拥有的锁,返回值为AQS的状态位
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //检测此节点是否在同步队列上,如果不在,说明此线程还没有资格竞争锁,此线程就继续挂起睡觉。
//直到检测到此节点在同步队列上(在有线程发出signal信号的时候)

            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                //并检测此线程有没有被中断
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //此线程尝试获取锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //清理条件队列中不是在等待条件的节点
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            //报告异常
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }


 /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
          //CONDITION 值为-2 表示当前节点在等待condition,
          //也就是 在condition队列中,如果此节点的状态不是CONDITION,
            //则需要将此节点在条件队列中移除
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter; //获取最后一个在等待的节点
            }

            //将此线程作为内容构造一个节点加入到条件队列末尾
            Node node = new Node(Node.CONDITION);

            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

         final int fullyRelease(Node node) {
        try {
            int savedState = getState();
            if (release(savedState))//释放锁
                return savedState;
            throw new IllegalMonitorStateException();
        } catch (Throwable t) {
            node.waitStatus = Node.CANCELLED;
            throw t;
        }
    }

        public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

         protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }


  protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }



 /**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if is reacquiring
        如果一个节点刚开始在条件队列上,现在在同步队列上获取锁则返回true
     */
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }


  /**
    将waitStatus不是CONDITION的节点全部删除
      **/
 private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }

await方法大概意思就是:首先将此代表该当前线程的节点加入到条件队列中去,然后释放该线程所有的锁并开始睡眠,最后不停的检测AQS队列中是否出现了此线程节点,如果收到signal信号之后就会在AQS队列中检测到,检测到之后,说明此线程又参与了竞争锁。

回到上面的例子,锁被释放后,线程WaitThread开始睡眠,这个时候线程因为线程WaitThread沉睡时调用fullyRelease方法释放锁,接着会唤醒AQS队列中的头节点,所以线程SignalThread开始竞争锁,并获取到锁,然后开始工作,线程SignalThread调用signal方法,发送signal信号。

我们来看下signal方法

 /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            //检测当前线程是否为拥有锁的独占线程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //firstWait为condition自己维护的一个链表头节点
            //取出第一个节点后开始唤醒操作
            Node first = firstWaiter;
            if (first != null)
                doSignal(first); //开始唤醒
        }

说明下,其实Condition内部维护了等待队列的头节点和尾节点,该队列的作用时存放等待signal信号的线程,该线程被封装为Node节点后存放于此。
下面为ConditonObject类中维护等待队列的头节点和尾节点的声明。

 public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;

        ..............
}

这里又出现了一个条件队列,可能我们就有点晕了,了解AQS同步器的都知道,这个类中还维护着一个队列,AQS自己维护的队列时当前等待资源(这里的资源就是锁)的队列,AQS会在资源被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行,直到队列为空。

而Condition自己也维护了一个队列,该队列的作用时维护一个等待signal信号的队列,两个队列的作用时不同的,事实上,每个线程也仅仅会同时存在以上个队列中的一个,流程时这样:
用上面的例子的两个线程来描述

1、首先,线程WaitThread调用lock.lock()时,由于此时锁并没有被其它线程占用,因此线程WaitThread直接获得锁并不会进入AQS同步队列中进行等待。

2、在线程WaitThread执行期间,线程SignalThread调用lock.lock()时由于锁已经被线程WaitThread占用,因此,线程SignalThread进入AQS同步队列中进行等待。

3、在线程WaitThread中执行condition.await()方法后,线程WaitThread释放锁并进入条件队列Condition中等待signal信号的到来。

4、线程SignalThread,因为线程WaitThread释放锁的关系,会唤醒AQS队列中的头结点,所以线程SignalThread会获取到锁。

5、线程SignalThread调用signal方法,这个时候Condition的等待队列中只有线程WaitThread一个节点,于是它被取出来,并被加入到AQS的等待队列中。注意,这个时候,线程WaitThread并没有被唤醒。只是加入到了AQS等待队列中去了

6、待线程SignalThread执行完成之后并调用lock.unlock()释放锁之后,会唤醒此时在AQS队列中的头结点.所以线程WaitThread开始争夺锁(由于此时只有线程WaitThread在AQS队列中,因此没人与其争夺),如果获得锁继续执行。

直到线程WaitThread释放锁整个过程执行完毕。
可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。

然后我们继续看doSignal方法

 /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                  //修改头节点,完成旧头节点的移出工作
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

doSignal(Node first)方法干了两件事:第一件事为修改条件队列中的头节点,第二件事为完成旧的头节点的移出工作,即从condition队列中移出到AQS同步队列中去。
节点的移出工作是调用transferForSignal(Node node)来完成的。

 /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
      从条件队列中转移一个节点到同步队列中去
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
            如果不能改变waitStatus的值 则说明此节点已经被取消了
         */
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);//将节点加入到同步队列中去
        int ws = p.waitStatus;
          //如果节点p的状态为cancel 或者修改waitStatus失败 则直接唤醒
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

可以看到,正常情况 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这个判断是不会为true的,所以,不会在这个时候唤醒该线程。

只有到发送signal信号的线程调用reentrantLock.unlock()后,因为它已经被加到AQS的等待队列中,所以才可能会被唤醒。

以上就是关于Condition的相关知识。

相关文章

  • java并发-Condition接口

    Condition的简单使用 使用Condition实现的有界队列 核心方法 await()方法源码分析 sign...

  • Condition源码分析

    Java对象都有一组监视器方法:wait,notify,notifyAll,而synchronized本身就是利用...

  • Condition源码分析

    并发源码分析篇: ReentrantLock源码分析 ReentrantReadWriteLock源码分析 Con...

  • Condition源码分析

    我们先来看看Condtion类。 我们先来看个例子。 运行结果: 我们来分析下: 当WaitThread拿到锁之后...

  • J.U.C:Condition

    Condition源码分析   调用 Condition,需要获得 Lock 锁,所以意味着会存在一个 AQS 同...

  • Java 源码分析-Condition

      前面对Java中的锁进行了简单的分析,锁的使用和原理整体来说还是比较简单。今天我们来分析一下Condition...

  • ReentrantLock condition 源码分析

    本篇主要介绍ReentrantLock 中 condition的await/signal方法的实现原理。 想忽略整...

  • 深入解析AbstractQueuedSynchronizer源码

    前面分析了AbstractQueuedSynchronizer实现的其他两部分:Condition源码解析独占模式...

  • JUC之Condition源码分析

    原文出处:https://www.zzwzdx.cn Condition接口定义了类似Object的监视器方法,它...

  • 并发编程之 Condition 源码分析

    前言 Condition 是 Lock 的伴侣,至于如何使用,我们之前也写了一些文章来说,例如 使用 Reen...

网友评论

      本文标题:Condition源码分析

      本文链接:https://www.haomeiwen.com/subject/zylkmktx.html