8.深入分析AQS源码

作者: 西部小笼包 | 来源:发表于2018-09-14 22:39 被阅读18次

    Node解析

    AQS内部定义了一个static final的内部类Node,用于实现等待队列CLH,满足FIFO结构,其队列结构如下所示:

         +------+  prev +-----+       +-----+
    head |      | <---- |     | <---- |     |  tail
         |      | ----> |     | ----> |     |
         +------+  next +-----+       +-----+
    

    队列为一个双向链表结构,保存了head、tail两个指针,分别指向链表头部、尾部。当需要添加节点时,直接在tail位置添加,而dequeue操作直接对head节点进行。Node中定义如下常量:

    /** Marker to indicate a node is waiting in shared mode */
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    static final Node EXCLUSIVE = null;
    /** waitStatus value to indicate thread has cancelled */
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    static final int CONDITION = -2;
    /**
    * waitStatus value to indicate the next acquireShared should
    * unconditionally propagate
    */
    static final int PROPAGATE = -3;
    

    以上常量分别用于设置如下属性的值:

    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
    Node类型的常量SHARED、EXCLUSIVE用于设置nextWaiter,用于表示当前节点是共享的,还是互斥的,分别用于共享锁和独占锁。int类型的常量CANCELLED、SIGNAL、CONDITION、PROPAGATE用于设置waitStatus,用于在ConditionObject中使用,可以实现await/signal模型。

    Node有三个构造函数:

    //不存放任何线程,用于生成哨兵节点
    Node() ;
    //用于锁
    Node(Thread thread, Node mode) ;
    //用于Condition
    Node(Thread thread, int waitStatus) ;
    

    AQS属性

    AQS使用内部类Node,构造一个双向链表,用作FIFO队列;除此之外,AQS还存放一个int类型的属性state,用于表示当前的同步状态。

    //链表头节点
    private transient volatile Node head;
    //链表尾节点
    private transient volatile Node tail;
    //同步状态
    private volatile int state;
    

    head节点是一个哨兵节点,不存放实际的“线程”节点(使用Node的无参构造函数)。tail指向链表的最后一个节点,当新增节点时,将新节点作为当前tail的下一个节点,通过CAS设置成功后,将新节点设为新的tail节点即可。新增节点的源码如下:
    想偷懒的可以跳过,这就是一个队列的无锁入队实现。
    enq操作是一个无限循环的操作,最终总会成功,但根据代码可知,AQS应不是starvation free的,因为某个线程可能会持续的enq失败。AQS提供了形如doAcquireNanos方法,但其超时返回false操作是在addWaiter方法(内部调用enq)之后,也无法回避enq的starvation。在此顺便说一下,AQS也是无法保证fair的,也就是说先入队列的线程不一定先获取到锁。节点的CAS是通过Unsafe来实现的,在state中统一说明。

    private Node enq(final Node node) {
        for (;;) { //死循环
            Node t = tail;
            if (t == null) { // 空链表,head、tail都为空
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    接下来的STATE 是个VOLATILE + CAS的实现。保证了内存可见性和HAPPENS BEFORE语义。防止重排序。

    private volatile int state;
    
    //读取当前state
    protected final int getState() {
        return state;
    }
    //直接写入,不考虑当前值
    protected final void setState(int newState) {
        state = newState;
    }
    //保证读-写的原子性
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    

    AQS还有一个属性static final long spinForTimeoutThreshold = 1000L;,用于表示自旋的时间,小于1000纳秒的采用自旋锁,大于1000纳秒,使用LockSupport.park方法,将线程挂起。

    重要方法分析

    acquire

    cquire是独占锁的获取锁的方法,其源码如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    acquire方法非常简单,如果tryAcquire失败(返回false),则调用acquireQueued方法,将当前线程加入到等待队列中,并中断当前线程,等待唤醒。

    tryAcquire由子类实现,下面先分析acquireQueued方法。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //若当前节点为链表第一个节点
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //park当前线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    acquireQueued在addWaiter之后,再次尝试获取锁,与tryAcquire不同的是,返回值代表的是当前线程在等待时是否可中断,通过返回interrupted,true表示可中断,false表示不可中断。通过判断当前节点是否为队列第一个节点,来决定是否获取成功,acquireQueued方法相当于提供了一个默认方法,会被子类的tryAcquire方法屏蔽掉(若tryAcquire返回true的话)。acquireQueued使用了死循环来判断当前节点前一节点是否为head,是,则获取到锁。但这个方法真的是死循环吗?其实不是的,trick就在shouldParkAfterFailedAcquire方法中,其源码如下:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    由于在锁的使用情景内,Node的waitStatus初始必定是0,因此获取锁的时候shouldParkAfterFailedAcquire方法首次进入,前一节点的waitStatus必定为0,执行compareAndSet将其设置为Node.SIGNAL(-1),再次调用shouldParkAfterFailedAcquire时,必定返回true。因此acquireQueued方法并不是死循环,而是在调用两次shouldParkAfterFailedAcquire方法之后(第一次将waitStatus设为-1,第二次返回true),执行了parkAndCheckInterrupt方法,挂起了当前线程。parkAndCheckInterrupt内调用了LockSupport.park(this),查阅API可知,park方法挂起当前线程,并在以下三种情况下立即返回。

    其他线程调用unpark,唤醒了当前线程
    其他线程调用了interrupt,中断了当前线程
    方法虚假返回(for no reason)
    在AQS中,常见的为调用unpark(其他线程执行release释放锁时)唤醒了当前线程。当前线程唤醒之后,继续调用acquireQueue中的for循环,判断是否可以获取锁。

    Release

    Mutex的unlock方法调用了release方法,在AQS中定义,源码如下:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    还是同样的配方,release方法调用子类实现的tryRelease,返回true后,表示获取成功,之后判断头节点,由于锁的实现中,waitStatus必定为0或者-1,而当一个线程lock成功后,waitStatus必定为-1,所以执行unparkSuccessor方法.

    该方法首先将head节点的waitStatus设为0,之后判断head下一节点是否为空,若不为空且waitStatus不大于0(大于0表示线程被取消),则调用LockSupport.unpark唤醒下一节点对应的线程;若为空或线程被取消,从tail节点开始遍历队列,找到队列中距离head节点最近的、未被cancel(waitStatus小于0)的节点,调用LockSupport.unpark唤醒它。源码如下:

    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    这里简单介绍一下为什么循环是从tail往前遍历,这是因为CAS操作无法对双向链表进行原子插入,在enq中,具体的插入是,先将新节点指向tail,然后CAS将tail设为新节点,因此对于pred指针的设置时原子性的,可以保证从tail向前遍历可以找到所有的节点。而next指针只是一种优化路径,方便查找,并不能保证next为null,则该节点为队列的最后一个节点。

    相关文章

      网友评论

        本文标题:8.深入分析AQS源码

        本文链接:https://www.haomeiwen.com/subject/awbagftx.html