并发十一:条件队列Condition实现分析

作者: wangjie2016 | 来源:发表于2018-04-14 15:25 被阅读70次

    Condition

    Condition是J.U.C包中的一个接口,提供了三个主要方法await、signal和signalAll。和Object中的wait、notify、notifyAll三个监视器方法语义一致。

    监视器方法必须放到synchronized修饰的同步体内,Condition接口也是一样,要放到Lock的监视范围也就是lock和unlock之间的代码块内调用,否则会抛出IllegalMonitorStateException异常。

    一个小栗子:

    public class ConditionTest {
        public static class ResponseFuture {
            private final Lock lock = new ReentrantLock();
            private final Condition condition = lock.newCondition();
            private String response;
    
            public boolean isDone() {// 是否处理完成
                return response != null;
            }
    
            public String getResponse() throws InterruptedException{ // 获取响应
                if (!isDone()) {
                    lock.lock();
                    try {
                        while (!isDone()) {
                            condition.await();// 线程阻塞等待 
                            if (isDone()) {
                                break;
                            }
                        }
                    } finally {
                        lock.unlock();
                    }
                }
                return response;
            }
    
            public void done(String response) {// 处理完成
                lock.lock();
                try {
                    this.response = response;
                    condition.signal();// 唤醒阻塞等待的线程
                } finally {
                    lock.unlock();
                }
            }
        }
    
        public static void main(String[] args) {
            final ResponseFuture responseFuture = new ResponseFuture();
            new Thread(new Runnable() {// 请求线程
                public void run() {
                    System.out.println("发送一个同步请求");
                    try {
                        // 获取反馈内容,请求没有反馈就会一直等待
                        System.out.println(responseFuture.getResponse());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
            
            
            new Thread(new Runnable() {// 处理线程
                public void run() {
                    try {
                        Thread.sleep(10000);// 模拟处理一会
                    }catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    // 处理完成
                    responseFuture.done("请求处理完成");
                }
            }).start();
        }
    }
    

    ConditionObjec

    Condition接口的实现ConditionObject是AbstractQueuedSynchronizer的内部
    在AQS()中,:

    public class ConditionObject implements Condition, 
                    java.io.Serializable {
        private static final int REINTERRUPT = 1;
        private static final int THROW_IE = -1;
        /** 条件队列首节点 */
        private transient Node firstWaiter;
        /** 条件队列尾节点 */
        private transient Node lastWaiter;
        public ConditionObject() {}
        ... ...
    }
    

    原理

    维护一个链表Node队列,用来进行阻塞线程的排队和调度,称为条件队列。

    线程调用了condition.await()后将先释放该线程持有的锁,构造成Node加入条件队列,然后再挂起该线程。

    其他线程调用了condition.signal()后,将条件队列中第一个等待的Node节点转移到AQS同步队列,在AQS同步队列中参与锁获取的调度,如果获取到锁则该线程被唤醒。

    await流程

    // s1
    public final void await() throws InterruptedException {
                if (Thread.interrupted())
                    throw new InterruptedException();// 如果中断 抛出异常
                Node node = addConditionWaiter();// 入列 s2
                int savedState = fullyRelease(node);// 释放当前锁 s3
                int interruptMode = 0;
                while (!isOnSyncQueue(node)) {//  不在与同步队列
                    LockSupport.park(this);
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                        break;
                }
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                    interruptMode = REINTERRUPT;
                if (node.nextWaiter != null)
                    unlinkCancelledWaiters();
                if (interruptMode != 0)
                    reportInterruptAfterWait(interruptMode);
            }
    

    s1:线程未中断开始入列,转入s2,否则直接抛出异常。
    入列成功开始释放当前线程持有的锁进入s3。
    isOnSyncQueue(node)判断节点是否在AQS同步队列中,如果不在,说明该线程还没有资格参与锁的竞争,将当前线程挂起。
    直到node加入了AQS同步队列或者node对应的线程被中断,才退出while{}循环。
    checkInterruptWhileWaiting方法会检查中断发生的时机:
    signal之前发生的中断返回THROW_IE,表示抛出InterruptedException
    signal之后发生的中断返回REINTERRUPT,表示记录中断状态
    否则返回0,表示无中断
    即便是发生中断transferAfterCancelledWait方法依然会将node转移到AQS同步队列。
    调用acquireQueued()方法在AQS同步队列中竞争锁,拿到锁后开始处理中断,reportInterruptAfterWait方法中根据interruptMode选择是抛出异常还是记录状态。

    // s2
    private Node addConditionWaiter() {
                Node t = lastWaiter;
                // 如果尾节点状态不为CONDITION,清出队列
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                // 构造节点并入列
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    

    s2:如果尾节点t不为空或状态不为CONDITION,调用
    unlinkCancelledWaiters()方法,这是一个循环方法目的是从头节点开始摘掉所以状态为不为CONDITION的节点。
    t==null说明当前的队列还是空队列,将node赋给头节点firstWaiter,否则,将node链入尾部。将尾节点lastWaiter指向node,返回s1。

    // s3
    final int fullyRelease(Node node) {
            boolean failed = true;
            try {
                int savedState = getState();// 获取当前同步状态
                if (release(savedState)) {// 释放锁
                    failed = false;
                    return savedState;// 释放成功返回同步状态
                } else {
                    throw new IllegalMonitorStateException();
                }
            } finally {
                if (failed)// 释放失败将节点标识为取消
                    node.waitStatus = Node.CANCELLED;
            }
        }
    

    s3:用release()方法释放当前锁,这是独占锁的释放方法,所以Condition对象只支持在独占锁中使用。释放完毕返回s1。

    signal流程

    public final void signal() {
        if (!isHeldExclusively())//非持有线程 抛出异常
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;//头节点
        if (first != null)//队列非空
            doSignal(first);
    }
    private void doSignal(Node first) {
        do {
            if ((firstWaiter = first.nextWaiter) == null)
                lastWaiter = null;
            first.nextWaiter = null;
        } while (!transferForSignal(first) && (first = firstWaiter) != null);
    }
    

    s1: 如果first.nextWaiter==null说明队列为空,将尾节点lastWaiter也置空。
    first.nextWaiter = null将头节点从队列摘掉。
    转入s2进行节点转移。
    转移成功则退出循环,转移不成功并且队列中还有等待的节点继续转移下个节点。

    // s2
    final boolean transferForSignal(Node node) {
        //节点状态从Node.CONDITION 置换为0,
        //因为CONDITION是条件节点的状态
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        Node p = enq(node);//入同步队列
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    

    s2:CAS将节点状态由CONDITION设置为0。
    enq(node)为AQS中同步队列的入列方法,将节点加入同步队列。
    ws > 0说明节点被取消了(有可能中断)或者将节点状态置为SIGNAL失败,则直接唤醒线程。
    被唤醒的线程会在"await流程s1处"参与锁的竞争或者处理中断。

    signalAll和signal逻辑一样,只是在循环中执行transferForSignal(first),将条件队列中的节点依次全部转移到同步队列。

    小结:

    1:Condition只能使用在独占锁中使用,如ReentrantLock、ReentrantReadWriteLock.writerLock。在共享锁中则无法使用 ,如ReentrantReadWriteLock.readerLock。

    2:Condition中的方法,必须要放到Lock的监视范围内,也就是lock和unlock之间的代码块内调用。

    码字不易,转载请保留原文连接https://www.jianshu.com/p/e72b43ebd788

    相关文章

      网友评论

        本文标题:并发十一:条件队列Condition实现分析

        本文链接:https://www.haomeiwen.com/subject/uqxxkftx.html