美文网首页
ReentrantLock源码剖析三(Condition)

ReentrantLock源码剖析三(Condition)

作者: 袁小象 | 来源:发表于2017-08-02 21:33 被阅读0次

一、简介

       JUC中的ReentrantLock给我们提供了方便的加锁解锁操作,但是我们有时候会需要有条件的对线程进行挂起和唤醒,此时另一个工具就排上了用场。下面是Condition变量的常用用法。

public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        Thread thread1 = new Thread(() -> {
            try {
                lock.lock();
                System.out.println("我要等一个新信号");
                condition.await();
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("拿到一个信号!!");
            lock.unlock();
        }, "waitThread");

        Thread thread2 = new Thread(() -> {
            lock.lock();
            System.out.println("我拿到锁了");
            try {
                Thread.sleep(3000);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            condition.signal();
            System.out.println("我发了一个信号!!");
            lock.unlock();
        }, "signalThread");

        thread1.start();
        thread2.start();
    }

运行完之后的结果:

我要等一个新信号
我拿到锁了
我发了一个信号!!
拿到一个信号!!

       线程1获取到锁之后调用Condition的await()方法,该方法会释放锁,并将当前线程挂起。随后线程2会拿到锁,并执行signal()方法,该方法会唤起线程1,并释放锁,然后线程1拿到锁,执行后续的流程。
       所以说Condition是一个多线程间协调通信的工具,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备( signal 或者 signalAll方法被调用)时 ,这些等待线程才会被唤醒,从而重新争夺锁。
       那么,这些逻辑具体是怎么实现的呢?
       首先,必须要明确,在线程调用await()或者signal()/signalAll()方法时,必须首先获取锁,否则会出现java.lang.IllegalMonitorStateException异常。其次,Condition(其实是ConditionObject,Condition接口的实现)维护了一个所有等待Condition条件变量的线程的队列,每个线程构成一个Node结点,也就是AQS中的Node结点:

/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;

二、await()

       以Condition condition = lock.newCondition();为例,newCondition()会调用Sync的newCondition()方法:

public Condition newCondition() {
        return sync.newCondition();
}

Sync的newCondition()方法:

final ConditionObject newCondition() {
            return new ConditionObject();
}
//ConditionObject是AQS中的内部类,实例属性只有上面的firstWaiter和lastWaiter
public ConditionObject() { }

       对于await()方法,可以抛出中断异常,

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //将当前线程封装成Node结点,添加到Condition的等待队列中
            Node node = addConditionWaiter();  
            int savedState = fullyRelease(node);   //释放所占的锁,因为在调用await()方法时,是占有锁的
            int interruptMode = 0;
            //释放了锁之后,判断当前线程的node结点是不是在syncQueue中,
            //什么是syncQueue呢?就是获取锁不成功而被挂起的线程所在的那个队列
            //如果不在syncQueue中,说明当前线程还不具备获取锁的资格,就将当前线程挂起,直到被添加到阻塞队列中,
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);  //线程被挂起,等待被signal唤醒,此时可以直接跳到下面的signal方法解析
                //挂起的过程中,如果被中断了,线程被唤醒,跳出while循环
                //如果没被中断,则此时node已经在阻塞队列中了,也会跳出循环
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //此时线程已经被唤醒,node结点已经被添加到阻塞队列中准备获取锁,被谁添加到阻塞队列了呢,是signal
            //acquireQueued尝试获取锁,被中断返回true,否则返回false
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
}

       其他版本的await()方法和这个类似,读者可以自行尝试分析。

三.signal()

//ConditionObject
public final void signal() {
            //如果在没有获取锁的情况下调用signal,会抛出IllegalMonitorStateException异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;      //拿到队列中的第一个node,此队列是Condition队列
            if (first != null)
                doSignal(first);
}

private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;   //将第一个队列移出队列
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
}

//该方法是将结点移到阻塞队列中,使得当前node节点的线程可以有资格获取锁
final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        //在加入阻塞队列之前,将node的waitStatus设置为0,如果失败,说明该节点已经被取消,
        //返回false,此时上面的doSignal方法会继续遍历Condition队列,
        //找到第一个还在等待Condition变量的结点
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        //将结点加入到阻塞队列,返回的p结点是node的前置节点
        Node p = enq(node);
        int ws = p.waitStatus;
        //如果前一个节点已经被取消等待(ws>0),或者修改waitStatus失败,则直接唤醒。
        //正常情况 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)这个判断
        //是不会为true的,所以,不会在这个时候唤醒该线程。
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;   //成功转移到阻塞队列,返回true
}

       那么什么时候才会唤醒呢?当前线程调用完 signal()之后,执行lock.unlock()方法之后,释放锁的时候,会唤醒其他线程,这个就是unlock()的逻辑了。
       signalAll()方法的逻辑与signal()类似,读者可以自己分析一遍。

四.总结

       现在我们再来理一遍:
       一个线程Alock.lock()获取锁成功之后,调用condition.await()方法,该方法会首先将线程封装成node加入Condition队列,然后释放锁,将线程移出阻塞队列,然后挂起;另一个线程B因为之前线程释放锁,获取锁成功,调用signal()方法,将Condition队列的第一个node转移到阻塞队列,这个时候还没完,执行完signal之后会释放锁,此时会唤醒后面的线程,此时线程A有机会竞争到锁。

相关文章

网友评论

      本文标题:ReentrantLock源码剖析三(Condition)

      本文链接:https://www.haomeiwen.com/subject/jgwylxtx.html