美文网首页
AQS研究系列(三)--AbstractQueuedSynchr

AQS研究系列(三)--AbstractQueuedSynchr

作者: 爱编程的凯哥 | 来源:发表于2019-08-26 08:20 被阅读0次

    目标

    分析aqs中顶层方法实现,主要包括方法:

    1. acquire 外层获取锁方法
    2. acquireQueued 节点以自旋的方式获取同步状态,如果获取同步状态失败,要挂起线程
    3. addWaiter 将当前线程包装成node节点添加到等待队列
    4. shouldParkAfterFailedAcquire 处理当前线程是否应该挂起
    5. cancelAcquire 对于最终未成功获取到同步锁状态的情况进行移除等待队列处理

    主流程时序图

    aqs1.jpg

    分析下

    aqs顶层方法的入口为acquire,看下源码:

       public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    寻着流程图,看到首先调用的tryAcquire方法(但此方法是其具体锁实现类实现的,如我们常用的ReentrantLock类,但此方法我们先略过不管),此方法含义为当前线程去尝试获取资源,当获取失败时调用acquireQueued方法进入等待队列,相当于进入锁池,看下acquireQueued方法:

      final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
              //1.不断自旋重试或线程等待,直到结果出来
                for (;;) {
                    final Node p = node.predecessor();
              //2. 当前节点为头节点,调用tryAcquire尝试获取资源,如果成功直接返回,并把当前节点置为头节点
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                //3. 上一步未成功,判断当前阶段是否应该进入线程等待状态,如果线程进入等待,进入等待池.
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                //4. 等待被唤醒后标注此线程被中断过
                        interrupted = true;
                }
            } finally {
                if (failed)
                  //5. 不断试验后获取锁失败,进行放弃等待逻辑,将此节点从队列中移除
                    cancelAcquire(node);
            }
        }
    
    

    看下shouldParkAfterFailedAcquire方法,什么情况下满足线程等待:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            //1. 前节点状态为Node.SIGNAL,代表此节点已经标记,当他资源释放后,唤起后面的节点,所以本节点可以安全park
            if (ws == Node.SIGNAL)
                /*
                 * This node has already set status asking a release
                 * to signal it, so it can safely park.
                 */
                return true;
            if (ws > 0) {
        //2. 当状态>0,表述前节点已经放弃等待,所以递归前移,找到可用节点
                /*
                 * Predecessor was cancelled. Skip over predecessors and
                 * indicate retry.
                 */
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                /*
                 * waitStatus must be 0 or PROPAGATE.  Indicate that we
                 * need a signal, but don't park yet.  Caller will need to
                 * retry to make sure it cannot acquire before parking.
                 */
            //3. 其他情况,cas尝试将前节点状态设置为Node.SIGNAL状态,表示前节点结束后唤醒当前节点,但本次并不阻塞.
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    

    上面就是尝试获取队列,失败进行阻塞的逻辑.其中还有个addWaiter(Node.EXCLUSIVE)方法,此方法用于将当前线程包装成一个Node,并放入队列中:

       private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
              // 1. 尝试将当前节点设为尾节点,成功后,返回此节点
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
          //2. 如果设置失败,此方法进行不断自旋(不断循环到原有逻辑进行重试),尝试加入尾几点
            enq(node);
            return node;
        }
    

    对应自旋逻辑 enq(node);

     private Node enq(final Node node) {
      //不断循环自旋重试
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                  //当队列为空时,必须有个首节点,进行初始化
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                  //尝试再次设置尾节点,失败后将会循环重试,成功将返回此节点
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    好了,上面就是顶层aqs抽象类定义的方法.

    引申出tryAcquire方法:

    tryAcquire方法在顶层并未实现,具体实现是各个具体锁实现的,如ReentrantLock中通过两个内部类NonfairSync(非公平锁)和FairSync(公平锁)来实现,下面对比下这两个类的此方法:

    1. 公平锁获取方法
     protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                //1. 获取当前资源状态
                int c = getState();
              
                if (c == 0) {
                //2.通过hasQueuedPredecessors方法判断是否有其他线程排在前边等待队列 
                    if (!hasQueuedPredecessors() &&
                //3. 如果当前此线程排在最前边,则将当前队列状态值赋值, acquires在独占锁中一直是1,所以就是0到1点变化
                        compareAndSetState(0, acquires)) {
              //4.如果状态修改成功,则将当前资源拥有者改成当前线程,返回true
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
              //5. 上一步判断c!=0代表已有线程拥有资源,判断当前线程是否为此线程
                else if (current == getExclusiveOwnerThread()) {
              //6.如果是当前线程获取的资源,此处重新获取锁,因为为可重入锁,所以锁次数加1
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    上面的分析,看出了公平锁的实现,其中还有个重点,hasQueuedPredecessors方法是怎么判断是否有其他队列排在前边:

        public final boolean hasQueuedPredecessors() {
            // The correctness of this depends on head being initialized
            // before tail and on head.next being accurate if the current
            // thread is first in queue.
            
            Node t = tail; // Read fields in reverse initialization order
            Node h = head;
            Node s;
          //1. 此方法需要返回false才代表没有排在前边的线程等待
          //2. 如果h!=t表示等待队列有其他节点存在,否则h==t表示只有一个节点,直接返回false,表示当前线程可以获取资源
          //3. 如果h!=t,则表示队列中有线程在使用资源,继续判断(s = h.next) == null表示第二个节点为null,说明h的下一个节点异常,可能是在其他线程处理线程队列过程中,直接返回true
          //4.最后一个判断s.thread != Thread.currentThread(),在上面所有可能后,判断s线程不是当前线程,则说明已有其他线程索取到锁了
            return h != t &&
                ((s = h.next) == null || s.thread != Thread.currentThread());
        }
    
    

    2. 好了,公平锁完了,下面看下非公平锁怎么实现:

     final boolean nonfairTryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
            //.非公平锁,不用判断队列中是否有其他节点,直接尝试修改资源状态,如果成功,代表获取到资源,直接返回
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    我们现在非公平锁实现和公平锁逻辑类似,只是少了一个hasQueuedPredecessors方法,不在去判断是否前面有线程排队而已.

    引申出参数(int acquires)

    在独占锁中, acquires一直为1,表示占有锁到永远只有一个,看下ReentrantLock的写法:

      public boolean tryLock() {
            return sync.nonfairTryAcquire(1);
        }
    
    

    而在CountDownLatch等共享锁到实现中,此值也是1,但其实现类中,通过构造器添加共享锁中的资源可用数.

    
       public CountDownLatch(int count) {
            if (count < 0) throw new IllegalArgumentException("count < 0");
            this.sync = new Sync(count);
        }
    
                    ...........................................
            //初始设值,选择共享锁容量
       private static final class Sync extends AbstractQueuedSynchronizer {
            private static final long serialVersionUID = 4982264981922014374L;
    
            Sync(int count) {
                setState(count);
            }
    
            int getCount() {
                return getState();
            }
    
            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    

    好了,上面就分析完了aqs实现的重点逻辑.

    AQS研究系列(一)--Unsafe使用
    AQS研究系列(二)--线程状态和interrupt()、interrupted()、isInterrupted等方法学习

    相关文章

      网友评论

          本文标题:AQS研究系列(三)--AbstractQueuedSynchr

          本文链接:https://www.haomeiwen.com/subject/eptsjctx.html