美文网首页
AQS(AbstractQueuedSynchronizer)源

AQS(AbstractQueuedSynchronizer)源

作者: 一点点简单 | 来源:发表于2018-04-06 11:56 被阅读191次

    前言

    java.util.concurrent包(之后简称JUC包)中,提供了大量的同步与并发的工具类,是多线程编程的“利器”。其中locks包下,包含了多种锁,如ReentrantLock独占锁、ReentrantReadWriteLock读写锁、Semaphore信号量(共享锁)等,而这些锁有一个共同的基础类:AbstractQueuedSynchronizer。

    原文地址:http://www.wangjialong.cc/2018/04/06/aqs_info/#more

    AQS简介

    AQS是一个抽象类,不可以被实例化,它的设计之初就是为了让子类通过继承来实现多样的功能的。它内部提供了一个FIFO的等待队列,用于多个线程等待一个事件(锁)。它有一个重要的状态标志——state,该属性是一个int值,表示对象的当前状态(如0表示lock,1表示unlock)。AQS提供了三个protected final的方法来改变state的值,分别是:getState、setState(int)、compareAndSetState(int, int)。根据修饰符,它们是不可以被子类重写的,但可以在子类中进行调用,这也就意味着子类可以根据自己的逻辑来决定如何使用state值。

    / 同一类中 同一包中 同包内的子类 不同包内的子类 全局
    public + + + + +
    protected + + + +
    default(无修饰符) + + +
    private +

    java的修饰符作用域如下:

    / 同一类中 同一包中 同包内的子类 不同包内的子类 全局
    public + + + + +
    protected + + + +
    default(无修饰符) + + +
    private +

    表中 + 表示可以访问, 空白表示无法访问

    AQS的子类应当被定义为内部类,作为内部的helper对象。事实上,这也是juc种锁的做法,如ReentrantLock,便是通过内部的Sync对象来继承AQS的。AQS中定义了一些未实现的方法(抛出UnsupportedOperationException异常)

    • tryAcquire(int) 尝试获取state
    • tryRelease(int) 尝试释放state
    • tryAcquireShared(int) 共享的方式尝试获取
    • tryReleaseShared(int) 共享的方式尝试释放
    • isHeldExclusively() 判断当前是否为独占锁

    这些方法是子类需要实现的,可以选择实现其中的一部分。根据实现方式的不同,可以分为两种:独占锁和共享锁。其中JUC中锁的分类为:

    • 独占锁:ReentrantLock、ReentrantReadWriteLock.WriteLock
    • 共享锁:ReentrantReadWriteLock.ReadLock、CountDownLatch、CyclicBarrier、Semaphore

    其实现方式为:

    • 独占锁实现的是tryAcquire(int)、tryRelease(int)
    • 共享锁实现的是tryAcquireShared(int)、tryReleaseShared(int)

    如独占锁的实现方式是:

    Acquire:
         while (!tryAcquire(arg)) {
            //将当前线程加入FIFO队列中;
            //自旋或阻塞当前线程;
         }
    
    Release:
         if (tryRelease(arg))
            //唤醒队列中的第一个线程(head);
    

    AQS中还提供了一个内部类ConditionObject,它实现了Condition接口,可以用于await/signal。采用CLH队列的算法,唤醒当前线程的下一个节点对应的线程,而signalAll唤醒所有线程。

    总的来说,AQS提供了三个功能:

    1. 实现独占锁
    2. 实现共享锁
    3. 实现Condition模型

    源码解析

    Node解析

    AQS内部定义了一个static final的内部类Node,用于实现等待队列CLH,满足FIFO结构,其队列结构如下所示:

    <pre>
    +------+ prev +-----+ +-----+
    head | | <---- | | <---- | | tail
    | | ----> | | ----> | |
    +------+ next +-----+ +-----+
    </pre>

    队列为一个双向链表结构,保存了head、tail两个指针,分别指向链表头部、尾部。当需要添加节点时,直接在tail位置添加,而dequeue操作直接对head节点进行。Node中定义如下常量:

    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;
    
    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    
    /**
    * waitStatus value to indicate the next acquireShared should
    * unconditionally propagate
    */
    static final int PROPAGATE = -3;
    

    以上常量分别用于设置如下属性的值:

    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    

    Node类型的常量SHARED、EXCLUSIVE用于设置nextWaiter,用于表示当前节点是共享的,还是互斥的,分别用于共享锁和独占锁。int类型的常量CANCELLED、SIGNAL、CONDITION、PROPAGATE用于设置waitStatus,用于在ConditionObject中使用,可以实现await/signal模型。

    Node有三个构造函数:

    //不存放任何线程,用于生成哨兵节点
    Node() ;
    //用于锁
    Node(Thread thread, Node mode) ;
    //用于Condition
    Node(Thread thread, int waitStatus) ;
    

    AQS属性

    AQS使用内部类Node,构造一个双向链表,用作FIFO队列;除此之外,AQS还存放一个int类型的属性state,用于表示当前的同步状态。

    //链表头节点
    private transient volatile Node head;
    //链表尾节点
    private transient volatile Node tail;
    //同步状态
    private volatile int state;
    

    head节点是一个哨兵节点,不存放实际的“线程”节点(使用Node的无参构造函数)。tail指向链表的最后一个节点,当新增节点时,将新节点作为当前tail的下一个节点,通过CAS设置成功后,将新节点设为新的tail节点即可。新增节点的源码如下:

        private Node enq(final Node node) {
            for (;;) { //死循环
                Node t = tail;
                if (t == null) { // 空链表,head、tail都为空
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    enq操作是一个无限循环的操作,最终总会成功,但根据代码可知,AQS应不是starvation free的,因为某个线程可能会持续的enq失败。AQS提供了形如doAcquireNanos方法,但其超时返回false操作是在addWaiter方法(内部调用enq)之后,也无法回避enq的starvation。在此顺便说一下,AQS也是无法保证fair的,也就是说先入队列的线程不一定先获取到锁。节点的CAS是通过Unsafe来实现的,在state中统一说明。

    state表示AQS当前的同步状态,如0表示lock,1表示unlock状态。对state的操作,提供了三个方法。

        //读取当前state
        protected final int getState() {
            return state;
        }
    
        //直接写入,不考虑当前值
        protected final void setState(int newState) {
            state = newState;
        }
    
        //保证读-写的原子性
        protected final boolean compareAndSetState(int expect, int update) {
            // See below for intrinsics setup to support this
            return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
        }
    

    可以看到compareAndSetState使用的是unsafe对象的compareAndSwapInt方法,传入this指针,state属性的偏移地址,期待值expect,更新值update,可以实现CAS操作。state属性的偏移地址获取方式如下:

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    static {
        try {
            stateOffset = unsafe.objectFieldOffset
                            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        } catch (Exception ex) { throw new Error(ex); }
    }
    

    实际上,AQS的head、tail节点,内部类Node的waitStatus、next属性均使用unsafe对象,通过偏移地址来进行CAS操作。Unsafe是sun.misc包下的类,在Java API中没有官方文档,因为它是用于实现Java库的,Java中有一个功能类似的类,可以实现对象属性的CAS操作,可以参考我的另一篇博客AtomicXFieldUpdater,属性原子修改的外部工具类,关于Unsafe的使用,可以参考Guide to Unsafe

    AQS还有一个属性static final long spinForTimeoutThreshold = 1000L;,用于表示自旋的时间,小于1000纳秒的采用自旋锁,大于1000纳秒,使用LockSupport.park方法,将线程挂起。

    重要方法分析

    AQS是用于实现独占锁或共享锁的,对于一个锁来说,最重要的就是lock和unlock操作了,对应到AQS中,为acquire、release方法,由于AQS需要和子类进行“合作”,因此需要子类的定义来进行联合分析,为简单起见,使用AQS官方文档中的示例,定义独占锁如下:

    class Mutex implements Lock, java.io.Serializable {
    
       // Our internal helper class
       private static class Sync extends AbstractQueuedSynchronizer {
         // Reports whether in locked state
         protected boolean isHeldExclusively() {
           return getState() == 1;
         }
    
         // Acquires the lock if state is zero
         public boolean tryAcquire(int acquires) {
           assert acquires == 1; // Otherwise unused
           if (compareAndSetState(0, 1)) {
             setExclusiveOwnerThread(Thread.currentThread());
             return true;
           }
           return false;
         }
    
         // Releases the lock by setting state to zero
         protected boolean tryRelease(int releases) {
           assert releases == 1; // Otherwise unused
           if (getState() == 0) throw new IllegalMonitorStateException();
           setExclusiveOwnerThread(null);
           setState(0);
           return true;
         }
    
         // Provides a Condition
         Condition newCondition() { return new ConditionObject(); }
    
         // Deserializes properly
         private void readObject(ObjectInputStream s)
             throws IOException, ClassNotFoundException {
           s.defaultReadObject();
           setState(0); // reset to unlocked state
         }
       }
    
       // The sync object does all the hard work. We just forward to it.
       private final Sync sync = new Sync();
    
       public void lock()                { sync.acquire(1); }
       public boolean tryLock()          { return sync.tryAcquire(1); }
       public void unlock()              { sync.release(1); }
       public Condition newCondition()   { return sync.newCondition(); }
       public boolean isLocked()         { return sync.isHeldExclusively(); }
       public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
       public void lockInterruptibly() throws InterruptedException {
         sync.acquireInterruptibly(1);
       }
       public boolean tryLock(long timeout, TimeUnit unit)
           throws InterruptedException {
         return sync.tryAcquireNanos(1, unit.toNanos(timeout));
       }
     }
    

    可以看到,lock方法调用内部类的acquire方法,也就是AQS的acquire方法。unlock方法调用release方法。
    下面对两个流程进行分析

    acquire

    acquire是独占锁的获取锁的方法,其源码如下:

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    acquire方法非常简单,如果tryAcquire失败(返回false),则调用acquireQueued方法,将当前线程加入到等待队列中,并中断当前线程,等待唤醒。

    tryAcquire由子类实现,下面先分析acquireQueued方法。

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    //若当前节点为链表第一个节点
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    //park当前线程
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    acquireQueued在addWaiter之后,再次尝试获取锁,与tryAcquire不同的是,返回true表示未获取成功,false表示获取成功。通过判断当前节点是否为队列第一个节点,来决定是否获取成功,acquireQueued方法相当于提供了一个默认方法,会被子类的tryAcquire方法屏蔽掉(若tryAcquire返回true的话)。

    tryAcquire会调用子类Mutex.Sync的实现,其代码如下:

        // 如果state为0,则获取到锁
        public boolean tryAcquire(int acquires) {
           assert acquires == 1; // Otherwise unused
           if (compareAndSetState(0, 1)) {
             setExclusiveOwnerThread(Thread.currentThread());
             return true;
           }
           return false;
        }
    

    由此可见,AQS提供了一个模板,子类需要实现其tryAcquire方法,实现具体的获取锁逻辑(通过对state的读、写),子类lock方法直接调用AQS的acquire方法即可。

    release方法

    Mutex的unlock方法调用了release方法,在AQS中定义,源码如下:

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    还是同样的配方,release方法调用子类实现的tryRelease,返回true后,表示获取成功,之后判断头节点,由于锁的实现中,waitStatus必定为0,所以不会执行unpark操作,unpark用于Condition模型中。tryRelease方法的源码如下:

        // 将state设置为0,解锁
        protected boolean tryRelease(int releases) {
           assert releases == 1; // Otherwise unused
           if (getState() == 0) throw new IllegalMonitorStateException();
           setExclusiveOwnerThread(null);
           setState(0);
           return true;
         }
    

    由源码可知,tryRelease只需要将state设置为0即可,因为调用unlock方法的必定是之前调用lock成功的,因此当前state必定为1,为安全起见,使用getState判断是否为0,若为0,说明线程出错。state设置时,不需要调用CAS方法,只需要setState即可,保证write对于其他线程可见即可(通过volatile内存屏障保证)。

    总结

    AQS提供了一个框架,在其上可以构建丰富的线程同步工具类,JUC包中ReadWriteLock、CountDownLatch都是基于AQS实现的,AQS在JUC包中的地位相当重要。其类图如下:

    image

    盗图使用,详见“JUC锁”01之 框架

    AQS提供了三大功能:独占锁、共享锁、ConditionObject。子类在实现中,可以实现其一部分方法。其编程思想值得借鉴,通过超类实现基本的处理流程,将其中部分抽成未实现方法,默认抛出异常,由子类实现,这种解耦方式,最大化的减少了代码的重复,且便于子类在实现中个性化自己的处理逻辑。

    很久没写博客了,准备以AQS入手,深入分析一下JUC包,flag就这么立起来了,希望可以实现吧~~

    参考资料

    相关文章

      网友评论

          本文标题:AQS(AbstractQueuedSynchronizer)源

          本文链接:https://www.haomeiwen.com/subject/fczjhftx.html