美文网首页
Java8-显示的Condition对象

Java8-显示的Condition对象

作者: 多喝水JS | 来源:发表于2017-09-10 11:01 被阅读166次

    先来看下例子

    public class ConditionTest implements Runnable {
    
        private Lock lock;
        private Condition con;
    
        public ConditionTest(Lock lock, Condition condition) {
            this.lock = lock;
            this.con = condition;
        }
    
        public void run() {
            if ("thread1".equals(Thread.currentThread().getName()))
                testThread1Waiter();
            if ("thread2".equals(Thread.currentThread().getName()))
                testThread2Signal();
        }
    
        public void testThread1Waiter() {
            lock.lock();
            try {
                try {
                    System.out.println("thead1被阻塞");
                    con.await();
                    System.out.println("thead1被唤醒");
                } catch (InterruptedException e) {
                }
            } finally {
                lock.unlock();
            }
        }
    
        public void testThread2Signal() {
            lock.lock();
            try {
                con.signal();
                System.out.println("thead2唤醒等待线程");
            } finally {
                lock.unlock();
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            Lock lock = new ReentrantLock();
            Condition condition = lock.newCondition();
            new Thread(new ConditionTest(lock, condition), "thread1").start();
            new Thread(new ConditionTest(lock, condition), "thread2").start();
        }
    }
    

    运行结果

    thead1被阻塞
    thead2唤醒等待线程
    thead1被唤醒
    

    Condition的await()实现原理

            /**
             * Implements interruptible condition wait.
             * <ol>
             * <li> If current thread is interrupted, throw InterruptedException.
             * <li> Save lock state returned by {@link #getState}.
             * <li> Invoke {@link #release} with saved state as argument,
             *      throwing IllegalMonitorStateException if it fails.
             * <li> Block until signalled or interrupted.
             * <li> Reacquire by invoking specialized version of
             *      {@link #acquire} with saved state as argument.
             * <li> If interrupted while blocked in step 4, throw InterruptedException.
             * </ol>
             */
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
                Node node = addConditionWaiter();
                int savedState = fullyRelease(node);
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    

    ①首先判断当前线程是否被中断了,如果已经被中断了,则直接抛InterruptedException给上层调用者,否则进入②

    if (Thread.interrupted())
                    throw new InterruptedException();
    

    ②把当前线程所对应的节点放入condition队列中

           Node node = addConditionWaiter();
           /**
             * Adds a new waiter to wait queue.
             * @return its new wait node
             */
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    

    首先拿到队列(注意数据结构,Condition构建出来的也是一个队列)中最后一个等待者,紧接着判断,判断最后一个等待者的waitStatus不是CONDITION的话,

     if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
    

    解绑取消的等待者,因为通过这句代码

     Node node = new Node(Thread.currentThread(), Node.CONDITION);
    

    我们看到,new出来的Node的状态都是CONDITION的。
    那么unlinkCancelledWaiters做了什么?就是一些指针遍历并判断状态的操作,总结一下就是:从头到尾遍历每一个Node,遇到Node的waitStatus不是CONDITION的就从队列中踢掉,该节点的前后节点相连。

            private void unlinkCancelledWaiters() {
                Node t = firstWaiter;
                Node trail = null;
                while (t != null) {
                    Node next = t.nextWaiter;
                    //把不是condition的都踢掉
                   //1、如果首节点不是condition
                    if (t.waitStatus != Node.CONDITION) {
                        //2、因为上面 Node next = t.nextWaiter;这里已经记录了t.nextWaiter;的值,所以可以把t.nextWaiter设置为null;
                        t.nextWaiter = null;
                        //3、一开始trail是NULL的,所以进入4,否则进入5
                        if (trail == null)
                            //4、把下个节点当作首节点,进入6
                            firstWaiter = next;
                        else
                            //5、把首节点指向trail.nextWaiter,进入6
                            trail.nextWaiter = next;
                      //6、如果next为NULL,说明conition队列就一个节点,trail指向lastWaiter ,然后进入9
                        if (next == null)
                              lastWaiter = trail;
                    }
                    else
                        //8、如果首节点是condition状态,首节点指向trail,然后进入9
                        trail = t;
                    //9、把下个节点设置为首节点,继续循环判断是否为condition节点,当然如果下个节点为null。也就是等待队列只有一个节点的话,那就退出循环了
                    t = next;
                }
            }
    
    等待队列的基本结构如下图所示: 等待队列

    插入节点只需要将原有尾节点的nextWaiter指向当前节点,并且更新尾节点。更新节点并没有像AQS更新同步队列使用CAS是因为调用await()方法的线程必定是获取了锁的线程,锁保证了操作的线程安全。

    AQS实质上拥有一个同步队列和多个等待队列,具体对应关系如下图所示:

    AQS同步队列与等待队列

    ③完全释放Node的状态

        int savedState = fullyRelease(node);
    
        final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();
                if (release(savedState)) {
                    failed = false;
                    return savedState;
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)
                    node.waitStatus = Node.CANCELLED;
            }
        }
    

    首先获取state,release的时候将整个state传过去,理由是某线程可能多次调用了lock()方法,比如调用了10次lock,那么此线程就将state加到了10,所以这里要将10传过去,将状态全部释放,这样后面的线程才能重新从state=0开始竞争锁,这也是方法被命名为fullyRelease的原因,因为要完全释放锁,释放锁之后,如果有竞争锁的线程,那么就唤醒第一个,这都是release方法的逻辑了

    ④判断Node是否在AbstractQueuedSynchronizer构建的队列中而不是Node是否在Condition构建的队列(waitstatus == condition状态)中,如果Node不在AbstractQueuedSynchronizer构建的队列中,那么调用LockSupport的park方法阻塞。

     while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
      }
    
        final boolean isOnSyncQueue(Node node) {
            if (node.waitStatus == Node.CONDITION || node.prev == null)
                return false;
            if (node.next != null) // If has successor, it must be on queue
                return true;
            /*
             * node.prev can be non-null, but not yet on queue because
             * the CAS to place it on queue can fail. So we have to
             * traverse from tail to make sure it actually made it.  It
             * will always be near the tail in calls to this method, and
             * unless the CAS failed (which is unlikely), it will be
             * there, so we hardly ever traverse much.
             */
            return findNodeFromTail(node);
        }
    
        private boolean findNodeFromTail(Node node) {
            Node t = tail;
            for (;;) {
                //循环当前节点是否是AQS尾节点
                if (t == node)
                    return true;
                if (t == null)
                    return false;
                t = t.prev;
            }
        }
    

    ⑤唤醒后重新参与竞争,如果获取不到锁,将再次睡眠等待唤醒

      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
    
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    ⑥如果节点被取消了,清除condition队列中被取消的节点

    if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
    if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
    

    Condition的signal()实现原理

    ①要能signal(),当前线程必须持有独占锁,否则抛出异常IllegalMonitorStateException。
    那么真正操作的时候,获取第一个waiter,如果有waiter,调用doSignal方法:

        /**
             * Moves the longest-waiting thread, if one exists, from the
             * wait queue for this condition to the wait queue for the
             * owning lock.
             *
             * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
             *         returns {@code false}
             */
            public final void signal() {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    

    ② 1. 重新设置firstWaiter,指向第一个waiter的nextWaiter,也就是把首节点下个节点当作首节点

    1. 如果第一个waiter的nextWaiter为null,说明当前队列中只有一个waiter,lastWaiter置空
    2. 因为firstWaiter = first.nextWaiter,已经赋值给firstWaiter了,所以把first.nextWaiter置NULL,方便gc回收
            /**
             * Removes and transfers nodes until hit non-cancelled one or
             * null. Split out from signal in part to encourage compilers
             * to inline the case of no waiters.
             * @param first (non-null) the first node on condition queue
             */
            private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            }
    

    ③方法本意是将一个节点从Condition队列转换为AbstractQueuedSynchronizer队列,

    /**
         * Transfers a node from a condition queue onto sync queue.
         * Returns true if successful.
         * @param node the node
         * @return true if successfully transferred (else the node was
         * cancelled before signal)
         */
        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    
    

    总结一下方法的实现:

    1. 尝试将Node的waitStatus从CONDITION置为0,这一步失败直接返回false
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
    1. 当前节点进入调用enq方法进入AbstractQueuedSynchronizer队列
         Node p = enq(node);
    
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    
    1. 当前节点通过CAS机制将waitStatus置为SIGNAL
     int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
    return true;
    

    最后返回true。再次回到这里的代码

           private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
                } while (!transferForSignal(first) &&
                         (first = firstWaiter) != null);
            
    

    while循环里的!transferForSignal(first) 变为false了,直接退出循环,所以signal()是唤醒其中一个等待唤醒的线程

    从唤醒的代码我们可以得出一个重要结论:某个await()的节点被唤醒之后并不意味着它后面的代码会立即执行,它会被加入到AbstractQueuedSynchronizer队列的尾部,只有前面等待的节点获取锁全部完毕才能轮到它。
    不过正常情况 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这个判断是不会为true的,所以,不会在这个时候唤醒该线程。
    只有到发送signal信号的线程调用reentrantLock.unlock()后因为它已经被加到AQS的等待队列中,所以才会被唤醒。

    signalAll方法的作用就是将Condition队列中所有等待的节点逐一队列中从移除,由CONDITION状态变为SIGNAL状态并加入AbstractQueuedSynchronizer队列的尾部。signal()则是唤醒其中一个等待唤醒的线程

    signalAll实现,唤醒等待队列中所有的等待节点

          private void doSignalAll(Node first) {
                lastWaiter = firstWaiter = null;
                do {
                    Node next = first.nextWaiter;
                    first.nextWaiter = null;
                    transferForSignal(first);
                    first = next;
                } while (first != null);
            }
    

    相关文章

      网友评论

          本文标题:Java8-显示的Condition对象

          本文链接:https://www.haomeiwen.com/subject/skuzjxtx.html