Condition

作者: 囧囧有神2号 | 来源:发表于2018-05-14 17:59 被阅读0次

    生产者消费者实例:

    public class ProductorCustom<T> {
    
        private final ReentrantLock lock = new ReentrantLock();
        private Condition putButFull = lock.newCondition();
        private Condition tackButEmpty = lock.newCondition();
        private int head, tail, count;
        private final T[] items;
    
        public ProductorCustom() {
            this(10);
        }
    
        public ProductorCustom(int maxSize) {
            items = (T[]) new Object[maxSize];
        }
    
        public void put(T item) throws InterruptedException {
            lock.lock();
            try {
                while(count == items.length) {
                    putButFull.await();
                }
                items[tail] = item;
                if (++tail == items.length) {
                    tail = 0;
                }
                ++count;
                tackButEmpty.signal();
            }
            finally {
                lock.unlock();
            }
        }
    
        public T take() throws InterruptedException {
            lock.lock();
            try {
                while (count == 0) {
                    tackButEmpty.await();
                }
                T item = items[head];
                if (++head == items.length) {
                    head = 0;
                }
                --count;
                putButFull.signal();
                return item;
            }
            finally {
                lock.unlock();
            }
        }
    }
    

    Condition是Object的wait/notify的替代,更灵活;Condition接口定义的方法,await对应于Object.wait,signal对应于Object.notify,signalAll对应于Object.notifyAll。
    ReentrantLock 的newCondition():

        public Condition newCondition() {
            return sync.newCondition();
        }
    

    最终调用的是AQS中的内部类ConditionObject,它实现了Condition接口。

    概述

    AQS维护了一个锁队列;ConditionObject同样维护了一个条件队列,该队列里的Node等待着signal信号;两个队列间的关系是这样的:

    1. 比如节点A是锁队列的head,acquire成功,B等待在A后;
    2. A线程执行await,会把A移到条件队列中,release唤醒B
    3. B执行signal,会将A放回到锁队列,但并没有被唤醒;

    大致上就是一个线程正在运行,但是因为某些条件未满足需要等待,我们就把他放到条件队列中等待,其他线程就有机会执行,当条件满足就从条件队列中取出一个或全部等待的线程,把他放回锁队列中等待获取执行资格;
    接下来看看实现细节:

    ConditionObject

    AQS里的内部类,实现了Condition,ReentrantLock 调用的就是它;
    所以才说AQS是JUC包之基;

    public class ConditionObject implements Condition, java.io.Serializable {
            private static final long serialVersionUID = 1173984872572414699L;
            /** 条件队列的头节点. */
            private transient Node firstWaiter;
            /**条件队列的尾节点. */
            private transient Node lastWaiter;
    
    await
            public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();
    //创建当前线程的Node节点,放入条件队列
                Node node = addConditionWaiter();
    //锁队列中的当前线程release,唤醒后面的线程
                int savedState = fullyRelease(node);
                int interruptMode = 0;
    //isOnSyncQueue检测节点是否在锁队列;
    //这里的逻辑是signal会将node放回到锁队列,如果node不在锁队列,
    //说明条件没满足即没有收到signal,需要继续等待
                while (!isOnSyncQueue(node)) {
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
    //唤醒后,执行acquireQueued,尝试获取锁即改变同步状态,失败就在锁队列中等待
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null) // clean up if cancelled
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters(); //清理waitStatus为CONDITION的节点
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    

    如上面注释,一个原本锁队列中正在运行的HEAD节点,执行await,会在条件队列中创建它的Node节点,锁队列中的它执行release操作,之后就LockSupport.park挂起等待signal唤醒

    signal
            public final void signal() {
    //返回true代表当前线程正式获取锁的线程
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                Node first = firstWaiter;
                if (first != null)
                    doSignal(first);
            }
    
            private void doSignal(Node first) {
                do {
                    if ( (firstWaiter = first.nextWaiter) == null)
                        lastWaiter = null;
                    first.nextWaiter = null;
    //这里利用循环直到将firstWaiter放回锁队列为止
                } while (!transferForSignal(first) &&  
                         (first = firstWaiter) != null);
            }
    
        final boolean transferForSignal(Node node) {
            /*
             * If cannot change waitStatus, the node has been cancelled.
             */
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
    
            /*
             * Splice onto queue and try to set waitStatus of predecessor to
             * indicate that thread is (probably) waiting. If cancelled or
             * attempt to set waitStatus fails, wake up to resync (in which
             * case the waitStatus can be transiently and harmlessly wrong).
             */
            Node p = enq(node); //将节点加入到锁队列
            int ws = p.waitStatus;
    //如果该结点的状态为cancel 或者修改waitStatus失败,则直接唤醒。
    //这里一般ws=0,所以会执行更改节点状态,改为SIGNAL,更改成功放回锁队列
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    

    signal主要逻辑就是将条件队列的第一个节点firstWaiter加入到锁队列。
    超时方法分析在我的CyclicBarrier文章里

    相关文章

      网友评论

          本文标题:Condition

          本文链接:https://www.haomeiwen.com/subject/tzxjdftx.html