美文网首页JVM · Java虚拟机原理 · JVM上语言·框架· 生态系统
Java 多线程基础——使用Condition对象实现线程通信

Java 多线程基础——使用Condition对象实现线程通信

作者: 爱打乒乓的程序员 | 来源:发表于2020-04-05 00:59 被阅读0次

1.背景

在学习Condition接口之前,我认为十分有必要再回顾一下Java线程状态。
有的人说Java线程状态只有5种(NEW,RUNNABLE,RUNNING,BLOCKED,DEAD),也有一些人说有6种(NEW,RUNNABLE,BLOCKED,WAITING,TIMED_WAITING,TERMINATED),那究竟是谁对谁错呢?


实际上Java线程的状态有6种。但为什么有的人认为只有5种呢?
5种状态是指的是进程状态,很多人把进程状态与线程状态混淆,导致很多地方说法不一(参考文章:https://www.zhihu.com/question/56494969

我们还可以从另外一个角度说说为什么Java线程有6种状态,答案就在Thread类的源码里面!

    public enum State {
        NEW,
        RUNNABLE,
        BLOCKED,
        WAITING,
        TIMED_WAITING,
        TERMINATED;
    }

代码里面都明确说明了有6种状态,所以下次面试官问你Java线程有多少种状态,别再回答有5种了!
线程转变为WAITING或TIMED_WAITING可以通过Object类的wait()、 wait(long timeout)方法,那为什么还会存在同样可以使线程等待的Condition接口呢?那当然是因为Condition接口的实现类更加强大,更加实用啦!

图片引用自《Java并发编程的艺术》

有趣的是,J.U.C包下的锁、并发类、阻塞队列、线程池都在使用Condition接口中的方法,而不是使用原始Object类的监视器方法,也足以证明,Condition接口更加强大,能够更加精细的控制多线程的休眠与唤醒!

因此,我写此篇文章,希望可以加深自己对多线程编程的理解,也希望可以为你带来一些收获~

2.简介

Condition接口是在JDK1.5之后才出现(作者还是那个熟悉的名字——Doug Lea)。Condition可以理解为是一个多线程间协调通信的接口,Condition是依赖Lock对象,换句话说,可以通过Lock.newCondition()方法获取Condition对象。

废话不多说,先来看看Condition接口有哪些方法:

    // 当前线程进入等待状态直到被通知(signal)或中断
    void await() throws InterruptedException;

    // 当前线程进入等待状态直到被通知,该方法不响应中断
    void awaitUninterruptibly();

    // 当前线程进入等待状态直到被通知、中断或者超时,返回值表示剩余超时时间
    long awaitNanos(long nanosTimeout) throws InterruptedException;

    // 当前线程进入等待状态直到被通知、中断或者到指定时间,如果未超时返回true,否则返回false 
    boolean await(long time, TimeUnit unit) throws InterruptedException;

    // 当前线程进入等待状态直到被通知、中断或者到某个时间。如果未超时返回true,否则返回false 
    boolean awaitUntil(Date deadline) throws InterruptedException;

    // 唤醒一个等待在Condition上的线程,该线程从等待方法返回前必须获得与Condition相关联的锁
    void signal();

    // 唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得与Condition相关联的锁
    void signalAll();

实现Condition接口是ConditionObject类,其作为内部类,分别在java.util.concurrent.locks.AbstractQueuedSynchronizer和java.util.concurrent.locks.AbstractQueuedLongSynchronizer中声明。

深入源码之前需要了解的概念

  1. 当一个或多个线程调用同一个Condition对象的await方法的时候,就会将每个线程都封装成一个Node节点,然后将一个或多个线程的Node节点形成一个FIFO的队列,名为条件队列(Condition Queue)
  2. 当调用signal或signalAll方法的时候,就会将线程节点从条件队列移除,然后放进锁对象的同步队列

针对第2点,举个栗子:
如果Condition对象是由ReentrantLock对象生成的,则会调用ReentrantLock定义的公平锁或非公平锁的tryAcquire方法。(如果不熟悉ReentrantLock的朋友,可以参考我写的另外一篇拙作:多线程之ReentrantLock源码剖析)

3.源码剖析

以下的源码剖析是以AbstractQueuedSynchronizer的ConditionObject内部类为例,而AbstractQueuedLongSynchronizer实现Condition接口的内部类大同小异,就不再细看,有兴趣的读者可以私底下比较一下之间的异同。

本篇文章着重分析await()和signal()方法的源码。其它相似方法的源码都是基本一样,阻塞的方法如果有时间作为参数,当线程阻塞超时或在规定时间内被唤醒就会针对不同情况返回不同的结果;唤醒线程的方法更加简单,如果是signal()则唤醒一个线程节点后就退出唤醒线程的操作,而执行的是signalAll()方法则会循环条件队列,将遍历出来的线程节点都唤醒,直到条件队列为空。

3.1 将线程封装为Node节点

    static final class Node {
        // 节点的状态:SIGNAL、CANCELLED、CONDITION、PROPAGATE
        volatile int waitStatus;

        // 前节点
        volatile Node prev;

        // 后节点
        volatile Node next;

        // 被包装为Node节点的线程
        volatile Thread thread;

        // 指向条件队列下一个节点
        Node nextWaiter;

        Node() {
        }

        // addWaiter方法调用
        Node(Thread thread, Node mode) {
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) {
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
        
        // 为省篇幅,省略部分代码
    }

3.2 实现Condition接口的内部类——ConditionObject

    public class ConditionObject implements Condition, java.io.Serializable {
        // 条件队列的头节点
        private transient Node firstWaiter;
        // 条件队列的尾节点
        private transient Node lastWaiter;

        public ConditionObject() {
        }
        
       // 为省篇幅,省略部分代码
    }

3.3 await方法

以下罗列出了调用await方法及其相关方法的源码,虽然很长,但别慌,没想象中的那么难!
无非就是以下几点核心流程:

  1. 当前线程封装成节点添加到条件队列
  2. 当前线程释放占有的锁(释放锁的目的是为了让其它线程抢占,因为不能占着茅坑不拉屎嘛,如果当前线程一直阻塞,就会一直占着锁不让其它线程持有)
  3. 线程节点node是否进入AQS同步队列,如果是跳到第5步,否则执行第4步
  4. 调用LockSupport.park方法挂起线程,直到被唤醒后(线程中断或其它线程执行唤醒操作),重新执行第3步
  5. 进入AQS同步队列后会重新竞争锁


    // 如果当前线程被中断会抛出InterruptedException异常
    public final void await() throws InterruptedException {
        // 如果线程被中断,抛出中断异常
        if (Thread.interrupted())
            throw new InterruptedException();
        //当前线程封装成节点添加到条件队列
        Node node = addConditionWaiter();
        //释放当前线程占有锁
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        //当前线程挂起,直到被唤醒后进入同步队列
        while (!isOnSyncQueue(node)) {
            //调用park方法挂起线程
            LockSupport.park(this);
            //如果是因为中断被唤醒,break退出循环
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 线程节点进入同步队列后,执行acquireQueued方法将节点加入AQS的等待队列并由ReentrantLock的公平锁或非公平锁获取线程后重新竞争锁
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) 
            // 从条件队列中解除cencelled节点
            unlinkCancelledWaiters();
        //如果是因为中断而被唤醒
        if (interruptMode != 0)
            //抛出中断异常
            reportInterruptAfterWait(interruptMode);
    }
    //释放当前线程占有锁
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {//成功释放同步状态
                failed = false;
                return savedState;
            } else {//不成功释放同步状态抛出异常
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }    
    // 判断节点是否在同步队列中
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
    }    
    
    // 将当前线程包装成节点,添加到条件队列
    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // If lastWaiter is cancelled, clean out.
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        //将当前线程封装成节点
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        // 如果条件队列为空,当前线程节点成为头节点,否则成为新的尾节点
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    // 从条件队列中解除cencelled节点
    private void unlinkCancelledWaiters() {
        Node t = firstWaiter;
        Node trail = null;
        while (t != null) {
            Node next = t.nextWaiter;
            if (t.waitStatus != Node.CONDITION) {
                t.nextWaiter = null;
                if (trail == null)
                    firstWaiter = next;
                else
                    trail.nextWaiter = next;
                if (next == null)
                    lastWaiter = trail;
            } else
                trail = t;
            t = next;
        }
    }

3.4 signal方法

唤醒线程的操作没有阻塞线程的操作那么复杂,只不过就是将Condition中的条件队列头节点删去并加入到AQS同步队列中。唤醒后的线程什么时候可以被执行还需要看CPU的调度,这并不是signal方法能够处理的!所以一定要清晰的知道,调用signal或signalAll方法并不是直接让线程重新获取锁资源,而是将原先Condition中条件队列的节点移到AQS的同步队列,然后通过锁的实现,将同步队列中的线程重新持有锁。

    public final void signal() {
        // 当前线程是否是获取了锁的线程,如果不是抛出IllegalMonitorStateException
        // isHeldExclusively是在ReentrantLock内实现的
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        // 如果条件队列不为空则调用doSignal方法,因此doSignal是核心
        if (first != null)
            doSignal(first);
    }
    // 将条件队列的头节点从条件队列转移到同步队列,并且,将该节点从条件队列删除
    private void doSignal(Node first) {
        do {
            // 如果没有继续节点,证明条件队列就只有当前一个节点,唤醒节点后,条件队列为空
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            // 设置节点的后继节点为null,便于GC
            first.nextWaiter = null;
        } while (!transferForSignal(first) && (first = firstWaiter) != null);
    }
    final boolean transferForSignal(Node node) {
        //修改节点的状态,如果修改不成功,说明该节点指向CANCEL,直接返回false
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        //将节点放入同步队列,并返回它的前驱节点
        Node p = enq(node);
        int ws = p.waitStatus;
        // 如果前驱节点的状态为CANCEL或者修改waitStatus失败,则直接唤醒当前线程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    // 自旋将节点放入同步队列
    private Node enq(final Node node) {
        for (; ; ) {
            Node t = tail;
            // 如果同步队列为空则初始化
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

4.总结

以上通过源码剖析await和signal方法。至于Condition接口其它方法的实现,其实都是大同小异,也不必再花费太多篇幅描述。
最后总结以下几点内容作为本篇文章的结束:
1.Condition接口的实现类是ConditionObject类,其对象的创建依赖锁;
2.Condition对象内部维护着一个条件队列,当线程调用await方法的时候会先在条件队列中入队;被唤醒的时候条件队列头节点出队,然后入队AQS的同步队列,最后根据锁的实现将同步队列的线程重新持有锁;
3.执行await方法操作的时候,会将当前线程占有的锁释放(不能占着茅坑不拉屎);
4.await方法通过LockSupport.park()挂起线程;signal方法通过LockSupport.unpark()唤醒线程

参考资料:
《Java并发编程的艺术》
多线程之ReentrantLock源码剖析
java线程运行怎么有第六种状态?

相关文章

网友评论

    本文标题:Java 多线程基础——使用Condition对象实现线程通信

    本文链接:https://www.haomeiwen.com/subject/wppbyhtx.html