Condition

作者: 得力小泡泡 | 来源:发表于2021-01-09 19:14 被阅读0次

    “传统上,我们可以通过synchronized关键字 + wait + notify/notifyAll 来实现多个线程之间的协调与通信,整个过程都是由JVM来帮助我们实现的【native方法嘛】;开发者无需(也是无法)了解底层的实现细节。

    从JDK 5开始,并发包提供了Lock,Condition(里面有await()与signal()/signalAll())来实现多个线程之间的协调与通信,整个过程都是由开发者来控制的,而且相比于传统方式,更加灵活,功能也更加强大。

    Thread.sleep与Condition.await(或者Object.wait()方法)的本质区别:sleep方法本质上不会释放锁,而await会释放锁,并且在signal后,还需要重新获得锁才能继续执行(该行为与Object的wait方法完全一致)【关于Thread.sleep和Object.wait()的区别在之前学习过了,温故一下】。

    也就是Lock相当于原来的synchronized关键字,Condition.await()相当于原来Object.wait(),Condition.signal()或Condition.signalAll()相当于原来Object.notify()或Object.notifyAll(),而Lock和Condition是纯Java编写的,可以人为来控制

    1、官方文档

    Condition是将Object的监视器方法如wait,notify和notifyAll放置在不同的对象当中,达到的效果是每个对象会有多个等待集合(也就是说有了这个Condition之后比如有10个线程,可以让前2个线程进入同一个对象的第一个等待集合当中,让中间6个线程也进入同一个对象的第二个等待集合当中,让最后2个线程进入同一个对象的第三个等待集合当中。。貌似颠覆了对于Object.wait()的行为,我们知道对于Object.wait()只会存于当前对象的同一个等待集合中,也就是Condition要比传统的方式要灵活很多,这也是一个非常大的改进)通过任意Lock实现将其进行合并。Lock替换synchronized方法和语句的使用, Condition取代了对象监视器方法的使用

    Condition(也叫等待队列)针对一个线程提供了挂起执行(等待状态)的一种方式,直到被某些状态条件变为true的线程所通知。由于对于共享状态信息的访问是发生在不同的线程里,因为它必须要受到保护,所以它必须被保护,因此某种形式的锁与该条件相关联。等待条件的关键属性是它会自动的释放掉关联的lock,并且会挂起当前线程,就像Object.wait一样

    一个Condition实例会被天然地绑定在一个lock上面。想要获取一个特定的Lock实例所对应的Condition实例的话,需要使用Lock.nexCondition()方法

    举个例子(移动到下面了)

    注意:Condition实例仅仅是一个普通的对象,并且自身也能用作synchronized语句块的目标,而且还拥有自己的监视器wait() 和 notification方法被调用。在获取Condition实例本身的await()和wait()以及notify()和signal()方法对应的锁并没有任何的关系,需要避免混淆。

    实施注意事项

    当等待在一个Condition时,一个“假唤醒”(Object.wait()的情况类似,就是除了调用了notify()之后,还有一种可能就是wait()的线程由系统来唤醒了,但是此时我们并没有调用notify()方法,Condition也有假唤醒的情况。)是被允许发生的,一般来说,作为对底层平台语义的让步。 这对大多数应用程序几乎没有实际的影响,因为Condition应该始终在循环中等待,测试正在等待的状态谓词。 一个实现可以免除虚假唤醒的可能性,但建议应用程序员总是假定它们可以发生,因此总是等待循环。

    2、具体方法

    1、void await()

    它导致当前线程等待直到它被其他线程给通知唤醒了或者被中断了,与这个Condition关联的锁会自动地释放掉,并且当前线程会进入休眠状态无法进行调度直到以下4件事情发生之一
    1、当某个其他线程调动了这个Condition.signal()方法,然后碰巧当前线程成为被唤醒的线程
    2、其他的线程调用了Condition.signalAll()方法
    3、其他线程中断了当前线程,并且线程的挂起是中断是被支持的
    4、发生“假唤醒”

    2、void awaitUninterruptibly()

    (比void await()少了可以被中断的事情发生)
    它导致当前线程等待直到它被其他线程给通知唤醒了或者被中断了,与这个Condition关联的锁会自动地释放掉,并且当前线程会进入休眠状态无法进行调度直到以下3件事情发生之一
    1、当某个其他线程调动了这个Condition.signal()方法,然后碰巧当前线程成为被唤醒的线程
    2、其他的线程调用了Condition.signalAll()方法
    3、发生“假唤醒”

    3、long awaitNanos(long nanosTimeout)

    (比void await()多了等待时间过去了)
    它导致当前线程等待直到它被其他线程给通知唤醒了或者被中断了,与这个Condition关联的锁会自动地释放掉,并且当前线程会进入休眠状态无法进行调度直到以下5件事情发生之一
    1、当某个其他线程调动了这个Condition.signal()方法,然后碰巧当前线程成为被唤醒的线程
    2、其他的线程调用了Condition.signalAll()方法
    3、其他线程中断了当前线程,并且线程的挂起是中断是被支持的
    4、发生“假唤醒”
    5、等待的时间过了

    返回的是等待剩余的时间,例如指定500ms,等待了200ms,就会返回300ms

    4、boolean await(long time, TimeUnit unit)

    与3一样

    5、boolean awaitUntil(Date deadline)

    (比void await()多了规定的期限过去了)
    它导致当前线程等待直到它被其他线程给通知唤醒了或者被中断了,与这个Condition关联的锁会自动地释放掉,并且当前线程会进入休眠状态无法进行调度直到以下5件事情发生之一
    1、当某个其他线程调动了这个Condition.signal()方法,然后碰巧当前线程成为被唤醒的线程
    2、其他的线程调用了Condition.signalAll()方法
    3、其他线程中断了当前线程,并且线程的挂起是中断是被支持的
    4、发生“假唤醒”
    5、规定的期限过去了

    6、void signal()

    唤醒其中一个等待的线程。如果有多个线程等在在这个条件上则其中的一个线程会被选择唤醒,那么那个线程必须在await返回之前重新获取这个锁(即拿了锁,再跳出await执行其他)

    7、void signalAll()

    唤醒所有等待的线程,每一个程必须在await返回之前重新获取这个锁(即拿了锁,再跳出await执行其他)

    3、经典例子

    1、Condition写法(两个等待集合)

    假设我们有一个有限的缓冲区,它支持puttake方法。如果一个take尝试着从一个空的缓冲区中取元素,该线程则会进行阻塞直到里面生产出来新的元素。如果一个put尝试往一个满的缓冲区放,则线程也会进行阻塞直到有一个空闲空间为止。我们将让保持等待状态的put线程和take线程放在不同的wait等待集合(可以使用两个Condition实例来实现等待集合)当中以便于我们能够使用一种优化:在缓存中的项目或空间可用的时候,可以有选择性地进行线程通知,在调度线程上更加灵活(即需要唤醒put线程时,直接在对应的Condition等待集合进行通知,而不需要把put线程和take线程全部都通知)

    package com.concurrency2;
    
    import java.util.Arrays;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    import java.util.stream.IntStream;
    
    public class MyTest2 {
        public static void main(String[] args) {
            BoundedContainer boundedContainer = new BoundedContainer();
    
            IntStream.range(0, 10).forEach(i -> new Thread(() -> {
                try {
                    boundedContainer.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start());
    
            IntStream.range(0, 10).forEach(i -> new Thread(() -> {
                try {
                    boundedContainer.put("hello");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start());
        }
    }
    
    class BoundedContainer {
    
        private String[] elements = new String[10];
    
        private Lock lock = new ReentrantLock();
    
        private Condition notEmptyCondition = lock.newCondition();
    
        private Condition notFullCondition = lock.newCondition();
    
        private int elementCount; //elements数组中已有的元素数量
    
        private int putIndex;
    
        private int takeIndex;
    
        public void put(String element) throws InterruptedException {
            this.lock.lock();
    
            try {
                while(this.elementCount == this.elements.length) {
                    notFullCondition.await();
                }
    
                elements[putIndex] = element;
    
                if(++putIndex == this.elements.length) {
                    putIndex = 0;
                }
    
                ++elementCount;
    
                System.out.println("put method:" + Arrays.toString(elements));
    
                notEmptyCondition.signal();
            } finally {
                this.lock.unlock();
            }
        }
    
        public String take() throws InterruptedException {
            this.lock.lock();
    
            try {
                while(elementCount == 0) {
                    notEmptyCondition.await();
                }
    
                String element = elements[takeIndex];
    
                elements[takeIndex] = null;
    
                if(++takeIndex == this.elements.length) {
                    takeIndex = 0;
                }
    
                --elementCount;
    
                System.out.println("get method:" + Arrays.toString(elements));
    
                notFullCondition.signal();
    
                return element;
            } finally {
                this.lock.unlock();
            }
        }
    }
    

    输出

    put method:[hello, null, null, null, null, null, null, null, null, null]
    put method:[hello, hello, null, null, null, null, null, null, null, null]
    put method:[hello, hello, hello, null, null, null, null, null, null, null]
    get method:[null, hello, hello, null, null, null, null, null, null, null]
    get method:[null, null, hello, null, null, null, null, null, null, null]
    get method:[null, null, null, null, null, null, null, null, null, null]
    put method:[null, null, null, hello, null, null, null, null, null, null]
    get method:[null, null, null, null, null, null, null, null, null, null]
    put method:[null, null, null, null, hello, null, null, null, null, null]
    put method:[null, null, null, null, hello, hello, null, null, null, null]
    get method:[null, null, null, null, null, hello, null, null, null, null]
    get method:[null, null, null, null, null, null, null, null, null, null]
    put method:[null, null, null, null, null, null, hello, null, null, null]
    put method:[null, null, null, null, null, null, hello, hello, null, null]
    put method:[null, null, null, null, null, null, hello, hello, hello, null]
    get method:[null, null, null, null, null, null, null, hello, hello, null]
    get method:[null, null, null, null, null, null, null, null, hello, null]
    get method:[null, null, null, null, null, null, null, null, null, null]
    put method:[null, null, null, null, null, null, null, null, null, hello]
    get method:[null, null, null, null, null, null, null, null, null, null]
    
    2、传统写法(一个等待集合)
    package com.concurrency;
    
    public class MyObject {
        private int counter;
    
        public synchronized void increase() {
            while(counter != 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            counter ++;
    
            System.out.println(counter);
    
            notify();
        }
    
        public synchronized void decrease() {
            while(counter == 0) {
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            counter --;
    
            System.out.println(counter);
    
            notify();
        }
    }
    

    4、Condition的源码

    看完了AQS,ReentrantLock 和 ReentrantReadWriteLock后扩展出来的

    1、Condition的本质
    private Lock lock = new ReentrantLock();
    
    private Condition notEmptyCondition = lock.newCondition();
    
    private Condition notFullCondition = lock.newCondition();
    

    从代码中可以看到Condition是通过lock.newCondition()创建的,跟进源码

        public Condition newCondition() {
            return sync.newCondition();
        }
    
        final ConditionObject newCondition() {
            return new ConditionObject();
        }
    
    public class ConditionObject implements Condition, java.io.Serializable {
        /** First node of condition queue. */
        private transient AbstractQueuedSynchronizer.Node firstWaiter;
        /** Last node of condition queue. */
        private transient AbstractQueuedSynchronizer.Node lastWaiter;
    
        public ConditionObject() { }
    
        private void doSignal(AbstractQueuedSynchronizer.Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null);
        }
    
        private void doSignalAll(AbstractQueuedSynchronizer.Node first) {
            lastWaiter = firstWaiter = null;
            do {
                AbstractQueuedSynchronizer.Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }
    
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            AbstractQueuedSynchronizer.Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
    
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            AbstractQueuedSynchronizer.Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
    
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            AbstractQueuedSynchronizer.Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
    }
    

    可以知道lock.newCondition()创建出来的是一个ConditionObject,而ConditionObject实现了Condition接口的所有实现方法,其中ConditionObject是AQS的内部类

    2、await方法
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            //加入到Condition等待队列中
            AbstractQueuedSynchronizer.Node node = addConditionWaiter();
            //释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
    

    将当前线程的结点加入到相应的Condition的等待队列中,并释放相应的锁

    3、signal方法
        public final void signal() {
            if (!isHeldExclusively()) //是否获取了锁(重入锁中是直接 return  独占锁线程==当前线程)
                throw new IllegalMonitorStateException();
            Node first = firstWaiter; //等待队列头节点
            if (first != null)
                doSignal(first);
        }
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null) //等待队列首节点的后继节点为空,说明只有一个节点,那么尾节点也置空
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first)  && (first = firstWaiter) != null); // 等待队列首节点唤醒失败,则唤醒下个节点
        }
        final boolean transferForSignal(Node node) {// 将节点加入同步队列
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 设置节点状态为0----(等待队列状态只能为-2,-3,用-2进行cas设置失败,说明是-3)
                return false;
            Node p = enq(node); // 放入同步队列尾部
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    

    当我们调用某个条件队列的signal()方法后,会将某个或所有等待在这个条件队列中的线程唤醒,分公平锁与非公平锁两种情况

    • 公平锁:被唤醒的线程直接加入到FIFO阻塞队列中
    • 非公平锁:被唤醒的线程和普通线程一样需要去争锁,如果没有抢到,则同样加入到FIFO阻塞队列中

    在这里需要注意的是,Node节点是被一个一个转移过去的,哪怕我们调用的signalAll()方法,node节点也是一个一个转移过去的而不是将整个条件队列接在同步队列末尾,同时需要注意的我们在同步队列中只有使用prev,next来串联表而不是使用nextWaiter,我们在条件队列中只使用nextWaiter来串联链表,而不使用prev,next;就是使用了同样的Node数据结构而完全不同的链表,因此将节点从条件队列转移到同步队列中时,我们需要断开原来的链接(nextWaiter)建立新的链接(prev / next)在某种程度上也是需要将节点一个一个转移过去。


    image.png

    相关文章

      网友评论

          本文标题:Condition

          本文链接:https://www.haomeiwen.com/subject/pyrfaktx.html