美文网首页
(六)Java并发编程之AQS

(六)Java并发编程之AQS

作者: 陪安东尼的漫长岁月 | 来源:发表于2019-11-21 21:29 被阅读0次

    AQS提供了一套多线程访问共享资源的同步器框架,是一个依赖状态(state)的同步器,用于实现依赖于先进先出(FIFO)等待队列的阻塞锁和相关的同步器(信号量,事件等)。其具体子类实现AQS定义的protected方法,原子更新实际状态对应的 state 值;AQS中的其他方法实现所有的等待和阻塞机制。

    AQS层级结构

    AQS.png AQS框架实现类.png

    AQS具备特性

    • 阻塞等待队列
    • 共享/独占
    • 公平/非公平
    • 可重入
    • 允许中断

    AQS同步框架原理

    // acquire操作如下:
    while (synchronization state does not allow acquire) {  //当同步状态获取失败
        enqueue current thread if not already queued;  //如果当前线程未进入等待队列,将其放入等待队列;
        possibly block current thread; //如果当前线程在等待队列,尝试阻塞改线程
    }
    dequeue current thread if it was queued;    //将当前线程移出等待队列
      
    // release操作如下:    
    if (state may permit a blocked thread to acquire){  // 如果当前状态允许一个阻塞的线程申请获取
        unblock one or more queued threads;    // 唤醒一个或多个阻塞队列中等待的线程
    }
    

    为了实现上述操作,需要下面三个基本组件的相互协作:

    1. 同步状态的原子性管理。
    2. 等待队列的管理。
    3. 线程的阻塞与解除阻塞。

    AQS源码初探(JDK11)

    • AbstractQueuedSynchronizer 之父类 AbstractOwnableSynchronizer
    /** AbstractQueuedSynchronizer 的父类,为当前涉及所有权概念的同步器提供了基础。**/
    public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    
        private static final long serialVersionUID = 3737899427754241961L;
    
        protected AbstractOwnableSynchronizer() { }
        // 当前访问独占锁的线程
        private transient Thread exclusiveOwnerThread;
    
        protected final void setExclusiveOwnerThread(Thread thread) {
            exclusiveOwnerThread = thread;
        }
    
        protected final Thread getExclusiveOwnerThread() {
            return exclusiveOwnerThread;
        }
    }
    
    • AbstractQueuedSynchronizer 之主要属性
    // 由head 和 tail 节点即可组成一个等待队列,其具体节点为内部类Node
    
    /** 等待队列的头,延迟初始化。
    除初始化外,只能通过setHead方法进行修改。
    注意:如果head存在,则保证其waitStatus不被取消。*/
    private transient volatile Node head;
    /** 等待队列的尾部,延迟初始化;
    仅通过 enq(Node node) 方法进行修改以添加新的等待节点。*/
    private transient volatile Node tail;
    
    /** 同步状态 */
    private volatile int state;
     // VarHandle mechanics 变量句柄,对属性进行原子操作
    private static final VarHandle STATE;
    private static final VarHandle HEAD;
    private static final VarHandle TAIL;
    
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
            HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class);
            TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
        Class<?> ensureLoaded = LockSupport.class;
    }
    

    AQS的功能主要分为两类:独占锁和共享锁。在它的所有子类中,要么实现了它的独占功能的API,要么实现了共享功能的API。
    当AQS的子类实现独占功能时,如ReentrantLock,只要AQS的state变量不为0,并且持有锁的线程不是当前线程,那么代表资源不可访问。
    当AQS的子类实现共享功能时,如CountDownLatch,只要AQS的state变量不为0,那么代表资源不可以为访问。

    • AbstractQueuedSynchronizer 之内部类 Node
    //
    static final class Node {
            /** 标记节点正在共享模式下持有锁 */
            static final Node SHARED = new Node();
            /** 标记节点正在独占模式下持有锁 */
            static final Node EXCLUSIVE = null;
    
            /** waitStatus状态:标记线程撤销获取锁状态,以便被清理出队列*/
            static final int CANCELLED =  1;
            /** waitStatus状态:标记当前节点线程离开队列时要去唤醒下一个节点线程执行*/
            static final int SIGNAL    = -1;
            /** waitStatus状态:标记当前等待队列中排队*/
            static final int CONDITION = -2;
            /** waitStatus状态:标记后继结点可以直接获取锁*/
            static final int PROPAGATE = -3;
            /** 当前节点的等待状态;非负数表示节点不用发出信号;对于普通同步节点,该字段初始化为0, */
            volatile int waitStatus;
    
            /** 当前节点的前任节点,在入队时分配,出队时清空;
                在前任节点取消后,会短路并且重新设置该节点值;
                线程只会取消自身,不会取消其他任何节点 */
            volatile Node prev;
            /** 当前节点的后继节点,在排队期间分配,在出队时清零,在对应的后继节点取消时调整;
                如果当前的后继节点为空,则从尾部扫描上一个节点来再次检查确定是不是尾部;
                被取消的下一节点会设置为节点本身而不是null,以使 isOnSyncQueue 更轻松判断;*/
            volatile Node next;
            /** 此节点对应的线程 */
            volatile Thread thread;
            /** 链接到等待条件的下一个节点,或者链接到特殊值SHARED;*/
            Node nextWaiter;
    
            /** 节点在共享模式下等待,则返回true */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
            /** 返回上一个节点 */
            final Node predecessor() {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            /** 构造头部或者共享标记 */
            Node() {}
            /** 构造要添加的下一个节点 */
            Node(Node nextWaiter) {
                this.nextWaiter = nextWaiter;
                THREAD.set(this, Thread.currentThread());
            }
            /** 构造条件队列节点 */
            Node(int waitStatus) {
                WAITSTATUS.set(this, waitStatus);
                THREAD.set(this, Thread.currentThread());
            }
    
            /**  CAS设置信号量 */
            final boolean compareAndSetWaitStatus(int expect, int update) {
                return WAITSTATUS.compareAndSet(this, expect, update);
            }
            /**  CAS设置下一个节点 */
            final boolean compareAndSetNext(Node expect, Node update) {
                return NEXT.compareAndSet(this, expect, update);
            }
            final void setPrevRelaxed(Node p) {
                PREV.set(this, p);
            }
    
            // VarHandle mechanics
            /** 变量句柄:原子下操作当前节点属性;JDK9以前是用Unsafe类来加内存屏障*/
            private static final VarHandle NEXT;
            private static final VarHandle PREV;
            private static final VarHandle THREAD;
            private static final VarHandle WAITSTATUS;
            static {
                try {
                    MethodHandles.Lookup l = MethodHandles.lookup();
                    NEXT = l.findVarHandle(Node.class, "next", Node.class);
                    PREV = l.findVarHandle(Node.class, "prev", Node.class);
                    THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
                    WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
                } catch (ReflectiveOperationException e) {
                    throw new ExceptionInInitializerError(e);
                }
            }
        }
    

    Node代表同步队列和条件队列中的一个结点,它是AbstractQueuedSynchronizer的内部类。Node有很多属性,比如持有模式,等待状态,同步队列中的前继和后继,以及条件队列中的后继引用等等。可以把同步队列和条件队列看成是排队区,每个结点看成是排队区的座位,将线程看成是排队的客人。客人刚来时会先去敲敲门,看看锁有没有开,如果锁没开它就会去排队区领取一个号码牌,声明自己想要以什么样的方式来持有锁,最后再到队列的末尾进行排队。

    CLH队列

    CLH队列其实就是AQS的 head 和 tail 节点构成的一个双向队列

    CLH.png

    其入队出队方式后续具体分析。

    总结:
    其实 AQS 本质上是就是一个当前资源的状态和CLH等待队列,资源在空闲则进行分配,同时修改资源状态;资源被充分占用则添加到等待队列排队即可。其具体的互斥锁;公平锁;可重入等效果由锁的实现类对AQS锁实现的方法按需使用达到其效果,具体的实现过程后续进行分析。

    参考:

    相关文章

      网友评论

          本文标题:(六)Java并发编程之AQS

          本文链接:https://www.haomeiwen.com/subject/hjawictx.html