美文网首页
Condition接口

Condition接口

作者: 菠萝丶丶 | 来源:发表于2020-07-24 12:45 被阅读0次

    任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

    通过对比Object的监视器方法和Condition接口,可以更详细地了解Condition的特性,如下表所示:

    对比项 Object监视器方法 Condition
    前置条件 获取对象的锁 调用Lock.lock()获取锁
    调用Lock.newCondition()获取Condition对象
    调用方式 直接调用
    如:object.wait()
    直接调用
    如condition.await()
    等待队列个数 一个 多个
    当前线程释放锁并进入等待状态 支持 支持
    当前线程释放锁并进入等待状态,
    在等待状态中不响应中断
    不支持 支持
    当前线程释放锁并进入超时等待状态 支持 支持
    当前线程释放锁并进入等待状态到将来的某个时间 不支持 支持
    唤醒等待队列中的一个线程 支持 支持
    唤醒等待队列中的全部线程 支持 支持

    Condition接口与示例

    Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的。

    下面我们来看一则代码示例:

    public class BoundedQueue<T> {
        private Object[] items;
        // 添加的下标,删除的下标和数组当前数量
        private int addIndex, removeIndex, count;
        private Lock lock = new ReentrantLock();
        private Condition notEmpty = lock.newCondition();
        private Condition notFull = lock.newCondition();
    
        public BoundedQueue(int size) {
            items = new Object[size];
        }
    
        // 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
        public void add(T t) throws InterruptedException {
            lock.lock();
            try {
                while (count == items.length) {
                    notFull.await();
                }
                items[addIndex] = t;
                if (++addIndex == items.length) {
                    addIndex = 0;
                }
                ++count;
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
    
        // 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
        @SuppressWarnings("unchecked")
        public T remove() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) {
                    notEmpty.await();
                }
                Object x = items[removeIndex];
                if (++removeIndex == items.length) {
                    removeIndex = 0;
                }
                --count;
                notFull.signal();
                return (T) x;
            } finally {
                lock.unlock();
            }
        }
    }
    

    上述代码中,BoundedQueue通过add(T t)方法添加一个元素,通过remove()方法移出一个元素。以添加方法为例。

    首先需要获得锁,目的是确保数组修改的可见性和排他性。当数组数量等于数组长度时,表示数组已满,则调用notFull.await(),当前线程随之释放锁并进入等待状态。如果数组数量不等于数组长度,表示数组未满,则添加元素到数组中,同时通知等待在notEmpty上的线程,数组中已经有新元素可以获取。

    在添加和删除方法中使用while循环而非if判断,目的是防止过早或意外的通知,只有条件符合才能够退出循环。回想之前提到的等待/通知的经典范式,二者是非常类似的。

    下面我们来一段代码对BoundedQueue进行测试:

        public static void main(String[] args) throws InterruptedException {
            BoundedQueue<Integer> boundedQueue = new BoundedQueue<>(2);
            // 步骤1
            for (int i = 0; i < 5; i++) {
                final int l = i;
                new Thread(() -> {
                    try {
                        Thread.sleep((l + 1) * 1000);
                        boundedQueue.add(l);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
            // 步骤2
            for (int i = 0; i < 5; i++) {
                new Thread(() -> {
                    try {
                        Integer remove = boundedQueue.remove();
                        System.out.println(remove + " get at: " + System.currentTimeMillis());
                    } catch (InterruptedException e) {
                    }
                }).start();
            }
            Thread.sleep(10000);
            System.out.println("-----------------------------------");
            // 步骤3
            for (int i = 0; i < 5; i++) {
                final int l = i;
                new Thread(() -> {
                    try {
                        boundedQueue.add(l);
                        System.out.println(l + " insert at: " + System.currentTimeMillis());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
            // 步骤4
            for (int i = 0; i < 5; i++) {
                final int l = i;
                new Thread(() -> {
                    try {
                        Thread.sleep((l + 1) * 1000);
                        boundedQueue.remove();
                    } catch (InterruptedException e) {
                    }
                }).start();
            }
        }
    

    输出结果如下:

    0 get at: 1595563736709
    1 get at: 1595563737709
    2 get at: 1595563738709
    3 get at: 1595563739709
    4 get at: 1595563740709
    -----------------------------------
    0 insert at: 1595563745712
    2 insert at: 1595563745712
    3 insert at: 1595563746713
    4 insert at: 1595563747713
    1 insert at: 1595563748713
    

    测试代码中先创建了5个线程每隔1秒对boundedQueue进行添加,再创建了5个线程直接对boundedQueue取元素并且移除,从结果可知步骤2中的get由于没有足够的元素,所以在get时进入了阻塞,直到下一秒步骤1中的线程醒来然后进行了add并唤醒,步骤2中阻塞的线程同时醒来。这里测试了notEmpty.await()功能。
    线程休眠10秒之后步骤3重新启动5个线程进行add,此时由于boundedQueue空间为2,前两个线程可以正常add,而后add的线程由于空间不足阻塞在notFull上,直到下一秒步骤4中的线程remove掉一个元素,此时唤醒一个等待在notFull上的线程进行add,直到所有步骤3中的线程执行结束。

    Condition的实现分析

    ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键,我们将主要分析Condition的等待队列、等待和通知。

    等待队列
    等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node。

    一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列,基本结构如下图所示:


    等待队列的基本结构

    如图所示,Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。

    在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列,其关系如下所示:


    同步队列与等待队列

    如图所示,Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。

    等待
    调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。
    如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。下面是ConditionObject的await方法:

    public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        // 当前线程加入等待队列
        Node node = addConditionWaiter();
        // 释放同步状态,也就是释放锁
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    

    调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。

    当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException。

    如果从队列的角度去看,当前线程加入Condition的等待队列,该过程如下图所示:


    当前线程加入等待队列

    如图所示,同步队列的首节点并不会直接加入等待队列,而是通过addConditionWaiter()方法把当前线程构造成一个新的节点并将其加入等待队列中。

    通知
    调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中,下面是Condition的signal()方法。

    public final void signal() {
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);
    }
    

    调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。
    节点从等待队列移动到同步队列的过程如下图所示:


    节点从等待队列移动到同步队列

    通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队列。当节点移动到同步队列后,当前线程再使用LockSupport唤醒该节点的线程。
    被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。

    成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁。

    Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。

    本文摘自《Java并发编程的艺术》

    相关文章

      网友评论

          本文标题:Condition接口

          本文链接:https://www.haomeiwen.com/subject/bfxjlktx.html