美文网首页
2020-06-14_关于AQS条件队列与同步队列分析

2020-06-14_关于AQS条件队列与同步队列分析

作者: kikop | 来源:发表于2020-06-14 14:33 被阅读0次

    关于AQS条件队列与同步队列分析

    1 概述

    本文主要演示一下Condition中同步队列和条件队列是如何交互的。

    1.1 交互流程

    image.png

    图 1 条件同步队列(图片来源网上)

    2源码分析(ConditionObject)

    2.1 条件队列等待await

    1.将当前线程加入Condition锁队列。特别说明的是,这里不同于AQS的队列,这里进入的是Condition的FIFO队列。

    2.释放锁。这里可以看到将锁释放了,否则别的线程就无法拿到锁而发生死锁。

    3.自旋(while)挂起,直到被唤醒(signal把他重新放回到AQS的等待队列)或者超时或者CACELLED等。

    4.获取锁(acquireQueued)。并将自己从Condition的FIFO队列中释放,表明自己不再需要锁(我已经拿到锁了)。

    2.1.1 await源码分析

    public final void await() throws InterruptedException {
    
                if (Thread.interrupted())
    
                    throw new InterruptedException();
    
                Node node = addConditionWaiter();  //创建条件节点并加到条件队列
    
                int savedState = fullyRelease(node); // 释放锁,返回旧值
    
                int interruptMode = 0;
    
                while (!isOnSyncQueue(node)) { // 不在同步队列里等待signal
    
                    LockSupport.park(this); // 阻塞
    
                    if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 清除中断标志位
    
                        break;
    
                }
    
    // 被唤醒了,将节点移动到同步队列里面,自旋并阻塞等待信号unpark
    
                if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
    
                    interruptMode = REINTERRUPT;
    
                if (node.nextWaiter != null) // clean up if cancelled
    
                    unlinkCancelledWaiters();
    
                if (interruptMode != 0)
    
                    reportInterruptAfterWait(interruptMode);
    
            }
    
    
    
        public ReentrantLock() {
    
            sync = new NonfairSync();
    
    }
    
    

    2.2 条件队列通知signal

    2.2.1 signal源码分析

            private void doSignal(Node first) {
    
    do {
    
                    if ( (firstWaiter = first.nextWaiter) == null)
    
                        lastWaiter = null;
    
    //因为 first 马上要被移到阻塞队列了,和条件队列的链接关系在这里断掉
    
                    first.nextWaiter = null;
    
                } while (!transferForSignal(first) && // 加到同步队列,然后unpark
    
                         (first = firstWaiter) != null);
    
    }
    
     final boolean transferForSignal(Node node) {
    
            /*
    
             * If cannot change waitStatus, the node has been cancelled.
    
             */
    
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    
                return false;
    
            /*
    
             * Splice onto queue and try to set waitStatus of predecessor to
    
             * indicate that thread is (probably) waiting. If cancelled or
    
             * attempt to set waitStatus fails, wake up to resync (in which
    
             * case the waitStatus can be transiently and harmlessly wrong).
    
             */
    
            Node p = enq(node);
    
            int ws = p.waitStatus;
    
    //ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程。
    
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
    
                LockSupport.unpark(node.thread); // 唤醒node对应的线程
    
            return true; // 通知结束,
    
    }
    
    

    从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。

    对每个节点执行唤醒操作时,将节点加入同步队列,再等待同步队列被唤醒(什么时候,unlock时)。

    2.3 细节分析

    2.3.1 waitStatus几种值

    CANCELED:1

    0

    SIGNAL:-1

    CONDITION:-2

    PROPAGATE:-3

    2.3.2在同步队列的判断条件

    参数node waitStatus==-2或者前置节点null,不在,否则下一步

    Next != null,在,否则下一步

    从同步队列尾部查找node

    2.3.3条件队列转入同步队列

    同步队列的初始化节点:initNode(默认waitStatus:0)

    取出条件队列中的第一个等待节点(waitStatus:-2),cas操作将waitStatus修改为:0,然后如同步队列。

    取同步队列的前置节点。

    如果waitStatus>0(实际是CANCELED),或者 CAS 失败,会进到这里唤醒线程,直接唤醒当前node节点(否则等待lock.unlock操作)

    2.3.4时序(重点)

    这里:await LockSupport.park(this);t2:doSignal时,还是park在此

    直到t2:lock.unlock()才能被唤醒,在同步队列中自旋。

    3代码示例(ConditionObject)

    3.1 等待线程

    
    package com.kikop.demo.MyAqsConditionQueue;
    
    import java.util.concurrent.locks.Condition;
    
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
    
     * @author kikop
    
     * @version 1.0
    
     * @project Name: javainaction
    
     * @file Name: MyConditionAwaitRunnableTask
    
     * @desc 功能描述
    
     * @date 2020/6/13
    
     * @time 22:41
    
     * @by IDE: IntelliJ IDEA
    
     */
    
    public class MyConditionAwaitRunnableTask implements Runnable {
    
        private ReentrantLock reentrantLock;
    
        private Condition condition;
    
        public MyConditionAwaitRunnableTask(ReentrantLock reentrantLock, Condition condition) {
    
            this.reentrantLock = reentrantLock;
    
            this.condition = condition;
    
        }
    
        @Override
    
        public void run() {
    
            try {
    
                reentrantLock.lock(); // 当前t2节点加到同步队列
    
                System.out.println(Thread.currentThread().getName() + "lock,拿到锁了");
    
                System.out.println(Thread.currentThread().getName() + "等待信号");
    
                try {
    
                    condition.await();
    
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
    
                }
    
                System.out.println(Thread.currentThread().getName() + "拿到信号");
    
            } finally {
    
                reentrantLock.unlock();
    
                System.out.println(Thread.currentThread().getName() + "unlock结束");
    
            }
    
        }
    
    }
    
    

    3.2 唤醒线程

    package com.kikop.demo.MyAqsConditionQueue;
    
    import java.util.concurrent.locks.Condition;
    
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
    
     * @author kikop
    
     * @version 1.0
    
     * @project Name: javainaction
    
     * @file Name: MyConditionAwaitRunnableTask
    
     * @desc 功能描述
    
     * @date 2020/6/13
    
     * @time 22:41
    
     * @by IDE: IntelliJ IDEA
    
     */
    
    public class MyConditionSignalRunnableTask implements Runnable {
    
        private ReentrantLock reentrantLock;
    
        private Condition condition;
    
        public MyConditionSignalRunnableTask(ReentrantLock reentrantLock, Condition condition) {
    
            this.reentrantLock = reentrantLock;
    
            this.condition = condition;
    
        }
    
        @Override
    
        public void run() {
    
            try {
    
                reentrantLock.lock(); // 当前t2节点加到同步队列
    
                System.out.println(Thread.currentThread().getName() + "lock,拿到锁了");
    
                try {
    
                    Thread.sleep(20*1000);
    
                } catch (InterruptedException e) {
    
                    e.printStackTrace();
    
                }
    
                System.out.println(Thread.currentThread().getName() + "发出信号");
    
                condition.signal(); // 通知条件队列中第一个节点,此时条件队列-->加到同步队列,并仍然阻塞
    
            } finally {
    
                reentrantLock.unlock(); // 从同步队列中头节点开始找,并unpark
    
                System.out.println(Thread.currentThread().getName() + "unlock结束");
    
            }
    
        }
    
    }
    
    

    3.3 测试

    package com.kikop.demo.MyAqsConditionQueue;
    
    import java.util.concurrent.locks.Condition;
    
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
    
     * @author kikop
    
     * @version 1.0
    
     * @project Name: javainaction
    
     * @file Name: AqsConditionQueueTest
    
     * @desc 功能描述
    
     * @date 2020/6/13
    
     * @time 22:06
    
     * @by IDE: IntelliJ IDEA
    
     */
    
    public class AqsConditionQueueTest {
    
        public static void test() {
    
            ReentrantLock reentrantLock = new ReentrantLock();
    
            Condition condition = reentrantLock.newCondition();
    
            Thread t1 = new Thread(new MyConditionAwaitRunnableTask(reentrantLock, condition), "thread1");
    
            Thread t2 = new Thread(new MyConditionSignalRunnableTask(reentrantLock, condition), "thread2");
    
            t1.start();
    
            t2.start();
    
        }
    
        public static void main(String[] args) {
    
            test();
    
        }
    
    }
    

    thread1lock,拿到锁了step1

    thread1等待信号 step2

    thread2lock,拿到锁了 step3

    thread2发出信号 step4

    thread2unlock结束(等待中) step5

    thread1拿到信号 step6

    thread1unlock结束 step7

    3.4图解步骤

    image.png

    图 2 t1由于不在同步队列找中,Await一直park趴在这儿(unlock也不会执行)

    image.png

    图 3 t2发出信号

    image.png

    图 4 t2 unlock

    image.png

    图 5 进入同步队列中自旋

    image.png

    图 6移动同步队列的头结点指针,当前node作为头结点,前置节点位置null,让gc回收(首次即为始化的initNode

    image.png

    图 7 t1执行业务逻辑

    image.png

    图 8 t1释放当前节点,unpark后置等待节点,整个流程结束

    4 参考

    1. 不怕难之ReentrantLock及其扩展

    https://www.jianshu.com/p/a43a059b9c0a

    相关文章

      网友评论

          本文标题:2020-06-14_关于AQS条件队列与同步队列分析

          本文链接:https://www.haomeiwen.com/subject/xpvxxktx.html