美文网首页
AQS源码浅析

AQS源码浅析

作者: zZeroZz | 来源:发表于2020-05-11 11:05 被阅读0次

简述

记录下自己的学习过程

抽象方法(模板模式)

子类需要按照自己的需求,去重写一些抽象方法;

  • tryAcquire
  • tryRelease
  • tryAcquireShared
  • tryReleaseShared
  • isHeldExclusively

CLH队列

  • 是一个虚拟的队列,没有实际的数组,主要是通过节点之间的指针联系
  • 当获取不到锁的时候,进入CLH,等待满足获取资源成功;
  • 获取成功的,成为头节
  • 释放锁成功的时候,再唤醒其最近的非取消态的后继节点,其再自旋尝试竞争锁,成功成为新的头节点,并移除原头节点及被取消的节点。

入队方法

入队主要从队尾

private Node enq(final Node node) {
    // CAS自旋
    for (;;) {
        Node t = tail;
        // 判断队列是否为空
        if (t == null) { // Must initialize
            // 根据headOffset偏移量CAS设置头节点
            if (compareAndSetHead(new Node()))
                // 尾节点指向头节点
                tail = head;
                // 此时node没有放入队尾,只有等下一次循环
        } else {
            // 当前节点前指针指向原尾节点
            node.prev = t;
             // 根据tailOffset偏移量CAS设置头节点
            if (compareAndSetTail(t, node)) {
                // 原尾节点后指针指向新节点
                t.next = node;
                return t;
            }
        }
    }
}

新线程入队

主要就是新建一个节点,然后放到队尾

 /**
 * Creates and enqueues node for current thread and given mode.
 *
 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 独占锁,共享锁
 * @return the new node
 */
private Node addWaiter(Node mode) {
    // 将当前线程放入新节点,然后nextWaiter=mode
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        // 新节点的前指针指向原尾节点,新节点成为新的尾节点
        node.prev = pred;
        // 根据tailOffset偏移量CAS设置尾节点
        if (compareAndSetTail(pred, node)) {
            // 原尾节点的后指针指向新节点
            pred.next = node;
            return node;
        }
    }
    // CAS自旋,放入CLH队列
    enq(node);
    return node;
}

获取独占锁

  • acquire
    1.首先会尝试tryAcquire获取锁,如果返回true,则能获取到,!tryAcquire(arg) &&短路与,则直接执行业务代码;
    2.如果获取不到,则执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    3.如果acquireQueued成功,则执行selfInterrupt()
    4.此时,只有头节点是park的,其他的都是unpark的,且除了tail指向的尾节点,其他的waitStatus都是SIGNAL。
/**
 * Acquires in exclusive mode, ignoring interrupts.  Implemented
 * by invoking at least once {@link #tryAcquire},
 * returning on success.  Otherwise the thread is queued, possibly
 * repeatedly blocking and unblocking, invoking {@link
 * #tryAcquire} until success.  This method can be used
 * to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument.  This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    //  调用子类实现的获取锁方法,成功则获取锁成功。
    if (!tryAcquire(arg) &&
        // 失败,则入队,且自旋获取锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
       
        // 重新设置线程中断状态
        selfInterrupt();
}
  • tryAcquire
    AQS抽象类,自己不实现此方法,交由相应的子类去自行实现
protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

1.ReentrantLock.FairSync.tryAcquire
2.ReentrantLock.NonfairSync.tryAcquire
3.ReentrantReadWriteLock.Sync.tryAcquire
4.ThreadPoolExecutor.Worker.tryAcquire

  • acquireQueued

1.ReentrantLock.FairSync.tryAcquire
2.ReentrantLock.NonfairSync.tryAcquire
3.ReentrantReadWriteLock.Sync.tryAcquire
4.ThreadPoolExecutor.Worker.tryAcquire

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        
        // 自旋
        for (;;) {
            // CLH采用前一个节点来标记状态,可能是考虑到当前结点可以取消之类的因素
            final Node p = node.predecessor();
            
            // 如果前一个节点是头结点,则当前节点才能尝试获取锁
            if (p == head && tryAcquire(arg)) {
            
                // 如果获取成功,则设置当前节点为头节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            
            // 如果未获取到锁,则需要判断是否要挂起当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
            
                // 如果需要挂起线程,则调用LockSuport.park挂起线程
                parkAndCheckInterrupt())
                // 如果走到这一步,代表调用了unpark,并且线程中断状态被清除为false,此处应该加个标记,让acquireQueue调用selfInterrupt设置true回去
                interrupted = true;
        }
    } finally {
    
        // 如果因为其他原因导失败,则需要取消当前节点
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire

确定是否需要挂起当前节点

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    
    // 如果前一个节点状态为SIGNAL,则代表当前节点已经被挂起了,则直接返回
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
        
    // 如果前一个节点已经被取消,则需要往前找一个非取消的节点,并移除已经取消的那些节点
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
        
    // 将前一个节点设置为SIGNAL
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    
    // 返回false,表明当前线程不可以挂起,需要继续自旋
    return false;
}
/**
 * Convenience method to interrupt current thread.
 */
static void selfInterrupt() {
    // 给当前线程设置中断标志
    Thread.currentThread().interrupt();
}

/**
 * Convenience method to park and then check if interrupted
 *
 * @return {@code true} if interrupted
 */
private final boolean parkAndCheckInterrupt() {
    // 挂起当前线程
    LockSupport.park(this);
    
    // 阻塞处,等待unpark,返回线程状态,并且清除中断状态
    return Thread.interrupted();
}

release

释放锁

public final boolean release(int arg) {

    // 调用子类释放锁,成功则唤醒后继节点
    if (tryRelease(arg)) {
        Node h = head;
        
        /*
         * 1.如果头节点为空,则代表没有线程竞争
         * 2.如果头节点非空,且waitStatus为初始状态0,则不需要唤醒,
         *   因为后继节点在shouldParkAfterFailedAcquire里面会二次重试,不会被挂起
         */
        if (h != null && h.waitStatus != 0)
            // 唤醒后继节点,acquireQueued继续执行。
            unparkSuccessor(h);
        return true;
    }
    return false;
}

unparkSuccessor

释放锁后,此时后继的都是park的,只有头节点是unpark,唤醒后继非取消的节点,来竞争锁。

private void unparkSuccessor(Node node) {
   /*
    * If status is negative (i.e., possibly needing signal) try
    * to clear in anticipation of signalling.  It is OK if this
    * fails or if status is changed by waiting thread.
    */
   int ws = node.waitStatus;
   if (ws < 0)
       compareAndSetWaitStatus(node, ws, 0);

   /*
    * Thread to unpark is held in successor, which is normally
    * just the next node.  But if cancelled or apparently null,
    * traverse backwards from tail to find the actual
    * non-cancelled successor.
    */
   // 唤醒头节点的后继节点
   Node s = node.next;
   
   // enq或者addWaiter过程中的时候,旧的next未指向新节点:s==null
   // 移除cancel节点的过程中,中间节点指向被移除的cancel节点:s.waitStatus
   if (s == null || s.waitStatus > 0) {
       s = null;
       
       // 找出离node最近的那一个未取消的后继节点
       for (Node t = tail; t != null && t != node; t = t.prev)
           if (t.waitStatus <= 0)
               s = t;
   }

   // 唤醒阻塞的线程
   if (s != null)
       LockSupport.unpark(s.thread);
}

相关文章

网友评论

      本文标题:AQS源码浅析

      本文链接:https://www.haomeiwen.com/subject/vfnynhtx.html