美文网首页其他零散知识点
JAVA-Lock解析-二-Condition

JAVA-Lock解析-二-Condition

作者: AlanSun2 | 来源:发表于2019-10-10 17:43 被阅读0次

    以下内容是 Condition jdk8 版本的官方翻译

    Condition 将对象监视方法(wait,notify 和 notifyAll)分解为不同的对象,从而通过与任意 Lock 实现结合使用,从而使每个对象具有多个等待集。如果 Lock 替换了 synchronized,而 Condition 替换了Object监视器方法的使用。

    Condition (也称为条件队列或条件变量)为一个线程暂停执行(“等待”)直到另一线程通知某些状态条件现在可能为真提供了一种方法。由于对该共享信息的访问发生在不同的线程中,因此必须对其进行保护,因此某种形式的锁与该 Condition 相关联。等待条件提供的关键属性是它自动释放关联的锁并挂起当前线程,就像Object.wait一样。

    Condition 实例从本质上绑定到锁。要获取特定 Lock 实例的 Condition 实例,请使用其 newCondition() 方法。

    例如,假设我们有一个有界缓冲区,它支持 put 和 take 方法。如果尝试在空的缓冲区上执行提取操作,则线程将阻塞,直到有可用项为止。如果尝试在完整的缓冲区上进行放置,则线程将阻塞,直到有可用空间为止。我们希望将 put 和 take 放在单独的等待集中,以便我们可以使用仅当缓冲区中的 item 或空间可用时才通知单个线程的优化。这可以使用两个Condition 实例来实现。(画外音:这个好像也是阻塞队列(BlockQueue)的基础)

     class BoundedBuffer {
       final Lock lock = new ReentrantLock();
       final Condition notFull  = lock.newCondition(); 
       final Condition notEmpty = lock.newCondition(); 
    
       final Object[] items = new Object[100];
       int putptr, takeptr, count;
    
       public void put(Object x) throws InterruptedException {
         lock.lock();
         try {
           while (count == items.length)
             notFull.await();
           items[putptr] = x;
           if (++putptr == items.length) putptr = 0;
           ++count;
           notEmpty.signal();
         } finally {
           lock.unlock();
         }
       }
    
       public Object take() throws InterruptedException {
         lock.lock();
         try {
           while (count == 0)
             notEmpty.await();
           Object x = items[takeptr];
           if (++takeptr == items.length) takeptr = 0;
           --count;
           notFull.signal();
           return x;
         } finally {
           lock.unlock();
         }
       }
     }
    

    Condition 实现类可以提供与对象监视器方法不同的行为和语义,例如,保证通知的顺序,或者在执行通知时不需要锁定。如果实现提供了这种特殊的语义,则实现必须记录这些语义。

    请注意,Condition 实例只是普通对象,它们本身可以用作 synchronized 的目标,并且可以调用自己的监视器等待和通知方法。获取 Condition 实例的监视器锁或使用​​其监视器方法与获取与该 Condition 相关联的锁或使用其等待和信令方法没有指定的关系。建议避免混淆,除非在 Condition 的实现类中,否则不要以这种方式使用 Condition 实例。

    除非另有说明,否则为任何参数传递null值都会导致引发NullPointerException。

    实现类注意事项

    当等待条件时,通常会允许“虚假唤醒”,作为对底层平台语义的让步。这对大多数应用程序几乎没有实际影响,因为应该始终在循环中等待一个条件,测试等待状态 state 是否满足条件。一个实现类可以自由地消除虚假唤醒的可能性,但是建议应用程序程序员始终假定它们会发生,因此总是在循环中等待。

    Condition 等待的三种形式(可中断,不可中断和定时)在它们在某些平台上的实现容易程度和性能特征上可能有所不同。特别是,可能很难提供这些功能并维护特定的语义,例如排序保证。此外,中断挂起线程的能力可能并不总是在所有平台上都可行。

    因此,不需要实现为所有三种等待形式定义完全相同的保证或语义,也不需要支持挂起线程的中断。

    需要一个实现类来清楚地记录每种等待方法提供的语义和保证。当实现类确实支持挂起线程中断时,它必须遵守此接口中定义的中断语义。

    由于中断通常意味着取消,并且通常不经常进行中断检查,因此与正常方法返回相比,实现类可能更喜欢对中断做出响应。即使可以证明中断是在另一个可能解除线程阻塞的操作之后发生的,也是如此。实现类应记录此行为。


    Condition 定义

    public interface Condition {
        //使当前线程等待,直到发出信号或被中断为止。
        void await() throws InterruptedException;
        //使当前线程等待,直到发出信号。中断不抛出异常
        void awaitUninterruptibly();
        //使当前线程等待,直到发出信号或中断它,或者经过指定的等待时间。
        long awaitNanos(long nanosTimeout) throws InterruptedException;
        //使当前线程等待,直到发出信号或中断它,或者经过指定的等待时间。
        //和 awaitNanos 一样,只是可以指定时间单位
        boolean await(long time, TimeUnit unit) throws InterruptedException;
        //使当前线程等待,直到发出信号或中断它,或者经过指定的期限。
        boolean awaitUntil(Date deadline) throws InterruptedException;
        //唤醒一个等待线程。
        //如果有任何线程在这种情况下等待,则选择一个线程进行唤醒。然后,该线程必须重新获取锁,然后才能从等待返回。
        void signal();
        //唤醒所有等待的线程
        void signalAll();
    }
    

    它的主要实现类是 ConditionObject in AbstractQueuedLongSynchronizer。

    首先我们看下 ConditionObject 的结构。

     public class ConditionObject implements Condition, java.io.Serializable {
            private static final long serialVersionUID = 1173984872572414699L;
            //此条件队列的第一个 Node
            private transient Node firstWaiter;
            //此条件队列的最后一个 Node
            private transient Node lastWaiter;
    
            /**
             * Creates a new {@code ConditionObject} instance.
             */
            public ConditionObject() { }
            ...
            ...
    }
    

    以上是 ConditionObject 的构造方法和两个成员变量,也容易理解。

    接下来一个个的看下各个实现方法:

    await:使当前线程等待,直到发出信号或被中断为止。

    public final void await() throws InterruptedException {
        if (Thread.interrupted())//如果有中断信号,则抛出中断异常
            throw new InterruptedException();
        Node node = addConditionWaiter();//把线程放入条件等待队列的最后,并返回 Node
        long savedState = fullyRelease(node);//释放锁
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {//判断 Node 是否被放入同步队列
            LockSupport.park(this);//阻塞,等待被唤醒
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                //判断是否被中断,这里需要判断是否是singal 引起的唤醒还是中断引起的唤醒,所以 checkInterruptWhileWaiting 会返回3中状态。
                //0:没有中断
                //1:唤醒后被中断
                //-1:唤醒前被中断
                //需要注意的是:这里的中断后,Node 还是会被放入同步队列
                break;
        }
        //获取锁
        //情况1:acquireQueued(node, savedState) == false,interruptMode  不变
        //情况2:acquireQueued(node, savedState) == true,interruptMode == 0 变成 1
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        //如果不是最后一个 Node,则整理整个条件等待队列
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        if (interruptMode != 0)//中断处理
            reportInterruptAfterWait(interruptMode);
    }
    
    
    //0:没有中断
    //1:唤醒后被中断
    //-1:唤醒前被中断
    private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
            (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
            0;
    }
    
    //这个是 AQS 的方法
    //取消节点后,如有必要,转移节点至同步队列。如果线程在发出信号之前被中断,则返回 true。
    final boolean transferAfterCancelledWait(Node node) {
        //线程在发出信号之前被中断,则返回 true。此时可能线程也被唤醒,如果 CAS 竞争成功,则入队
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);//加入同步队列
            return true;
        }
        /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
        //线程在发出信号之后被中断,此时可能正在入队
        while (!isOnSyncQueue(node))
            Thread.yield();//让出CPU
        return false;
    }
    

    awaitUninterruptibly:使当前线程等待,直到发出信号。中断也不抛出异常,继续等待。

    public final void awaitUninterruptibly() {
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        boolean interrupted = false;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if (Thread.interrupted())
                interrupted = true;
        }
        if (acquireQueued(node, savedState) || interrupted)
            selfInterrupt();
    }
    

    可以看到 awaitUninterruptibly 比 await 简单很多,因为它会忽略中断抛出异常这个选项,无论时唤醒前还是唤醒后中断做相同处理。

    awaitNanos:使当前线程等待,直到发出信号或中断它,或者经过指定的等待时间。

    public final long awaitNanos(long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())//如果中断,抛出异常
            throw new InterruptedException();
        Node node = addConditionWaiter();//把当前线程加入到条件等待队列
        int savedState = fullyRelease(node);//释放锁
        final long deadline = System.nanoTime() + nanosTimeout;//计算出等待结束的时间点
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {//Node 是否转移到同步队列
            if (nanosTimeout <= 0L) {//nanosTimeout <= 0,直接放入同步队列,不需要唤醒,并退出循环
                transferAfterCancelledWait(node);
                break;
            }
            //传入的时间 >= 1ms,则线程挂起 nanosTimeout 。
            //如果 nanosTimeout 小于1ms,线程不会阻塞,直到 nanosTimeout <= 0,退出循环
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)//如果被中断,则退出循环。
                break;
            nanosTimeout = deadline - System.nanoTime();//唤醒后,重新计算时间差
        }
        //以下和 await 方法一模一样
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime();
    }
    

    从代码分析看,等待时间结束和正常被别的线程唤醒都有可能转移到同步队列。但一般超时等待不要使用唤醒,唤醒后还是会等待,直到时间超时。

    await(long time, TimeUnit unit):和 awaitNanos 一模一样,只是多了个时间单位,还有会返回一个 boolean 值,false:时间超时退出循环,true:其他原因获取退出循环,例如中断,被别的唤醒。

    public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
        long nanosTimeout = unit.toNanos(time);//转换成纳秒
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (nanosTimeout <= 0L) {
                timedout = transferAfterCancelledWait(node);//就算执行到这里也有可能被别的线程唤醒,此时 timeout = false
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;//返回 boolean 值
    }
    

    awaitUntil:使当前线程等待,直到发出信号或中断它,或者经过指定的期限。

    public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
        long abstime = deadline.getTime();
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        boolean timedout = false;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            if (System.currentTimeMillis() > abstime) {
                timedout = transferAfterCancelledWait(node);
                break;
            }
            LockSupport.parkUntil(this, abstime);//使用 parkUntil
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return !timedout;
    }
    
    1. 和 awaitNanos 和 await(long time, TimeUnit unit) 不同的是 awaitUntil 使用了 LockSupport 的 parkUntil 而不是 parkNanos,其他操作和另外两个方法一样。
    2. 返回一个 boolean 值。

    signal:唤醒一个等待线程。采用的唤醒方式是,按队列排序唤醒第一个。FIFO

    public final void signal() {
        if (!isHeldExclusively())//判断当前线程是否获取锁
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first);//遍历节点
    }
    
    private void doSignal(Node first) {
        do {
            //把第一个的下一个置为 第一个,如果整个条件队列只剩下一个,则把 lastWaiter  置为 null
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
            //如果 transferForSignal 返回 false 且下一个 Node 不为 null,则继续唤醒下一个 Node,直到成功唤醒或没有 Node 为止
        } while (!transferForSignal(first) &&
                 (first = firstWaiter) != null);
    }
    
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        //翻译:如果CAS失败,说明线程被中断,Node 由自己的线程放入同步队列
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);//放入同步队列,返回前继 Node
        int ws = p.waitStatus;获取前继 Node 的 waitStatus
        //如果前继 Node 被取消或 CAS 设置前继 Node 的 waitStatus = SIGNAL 失败则立刻唤醒 Node
        //其实我觉得直接唤醒也没问题,只是这样做更节省CPU资源吧
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    

    为什么前继 Node 被取消或 CAS 设置前继 Node 的 waitStatus = SIGNAL 失败,需要立刻唤醒 Node?
    我的理解是出现这种情况说明同步队列发生了一些错误,唤醒后让线程尝试获取锁,也可以在shouldParkAfterFailedAcquire 方法种修复这种错误,比如前继 Node 被取消,shouldParkAfterFailedAcquire 可以把 Node 前面已取消的节点全部剔除掉。

    signalAll:唤醒全部。按队列顺序逐一唤醒。

    public final void signalAll() {
        if (!isHeldExclusively())//判断当前线程是否获取锁
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignalAll(first);
    }
    
    private void doSignalAll(Node first) {
        lastWaiter = firstWaiter = null;
        do {
            Node next = first.nextWaiter;
            first.nextWaiter = null;//去除引用,帮助GC
            transferForSignal(first);//加入同步队列
            first = next;//把下一个当作第一个
        } while (first != null);
    }
    

    好了,到这里 Condition 基本讲完了。以上都是个人理解,如果由不对的地方,请提醒我纠正。

    相关文章

      网友评论

        本文标题:JAVA-Lock解析-二-Condition

        本文链接:https://www.haomeiwen.com/subject/rueguctx.html