美文网首页
AQS源码分析总结

AQS源码分析总结

作者: 因你而在_caiyq | 来源:发表于2021-03-03 08:50 被阅读0次

原创文章,转载请注明原文章地址,谢谢!

可重入锁

同一个线程在外层获取锁的时候,再进入该线程的内层方法会自动获取锁,前提是锁对象是同一个对象,不会因为之前已经获取过还没释放而阻塞。

Java中ReentrantLock和synchronized都是可重入锁,可重入锁的一个优点是可一定程度避免死锁。

可重入锁种类
  • 隐式锁(synchronized)
  • 显式锁(Lock),ReentrantLock
synchronized重入锁的实现机制

每个锁对象拥有一个锁计数器和一个指向持有该锁的线程的指针。当执行monitorenter时,如果目标锁对象的计数器为零,那么说明它没有被其他线程所持有,Java虚拟机会将该锁对象的持有线程设置为当前线程,并且将其计数器加1。在目标锁对象的计数器不为零的情况下,如果锁对象的持有线程是当前线程,那么Java虚拟机可以将其计数器加1,否则需要等待,直至持有线程释放该锁。当执行monitorexit时,Java虚拟机则需将锁对象的计数器减1,计数器为零代表锁已被释放。

LockSupport

  • LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
  • LockSupport类使用了一种名为Permit许可的概念来做到阻塞和唤醒线程的功能,每个线程都有一个许可permit,permit只有两个值0和1,默认是0。
  • 可以把许可看成是一种(0,1)信号量Semaphore,但与Semaphore不同的是,许可的累加上限是1。
  • LockSupport是一个线程阻塞的工具类,所有的方法都是静态方法,可以让线程在任意位置阻塞,阻塞之后也有对应的唤醒方法。归根结底,LockSupport调用的Unsafe中的native代码。
LockSupport的主要方法
  • 阻塞当前线程/阻塞传入的具体线程,park/park(Object blocker)
public static void park() {
    UNSAFE.park(false, 0L);
}

permit默认是0,所以一开始调用park()方法,当前线程就会阻塞,直到别的线程将当前线程的permit设置为1时,park方法会被唤醒,然后会将permit再次设置为0并返回。

  • 唤醒处于阻塞状态的指定线程,unpark(Thread thread)
public static void unpark(Thread thread) {
    if (thread != null)
        UNSAFE.unpark(thread);
}

调用unpark(thread)方法后,就会将thread线程的许可permit设置为1(注意多次调用unpark方法,不会累加,permit值还是1),会自动唤醒thread线程,即之前阻塞中的LockSupport.park()方法会立即返回。

LockSupport提供park()和unpark()方法实现阻塞线程和解除线程阻塞的过程
  • LockSupport和每个使用它的线程都有一个许可permit关联。
  • permit相当于1,0的开关,默认是0,调用一次unpark就加1,调用一次park会消费permit,变成0,同时park立即返回。
  • 如再次调用park会变成阻塞(因为permit为0了会阻塞在这里,一直到permit变为1),这时调用unpark会把permit置为1。
  • 每个线程都有一个相关的permit,permit最多只有一个,重复调用unpark也不会累积凭证。
问题
  • 为什么可以先唤醒线程后阻塞线程?
    因为unpark获得了一个凭证,之后再调用park方法,就可以名正言顺的凭证消费,故不会阻塞。
  • 为什么唤醒两次后阻塞两次,但最终结果还是会阻塞线程?
    因为凭证的数量最多是1,连续调用两次unpark和调用一次unpark效果一样,只会增加一个凭证,而调用两次park却需要消费两个凭证,凭证不够,不能放行。

AQS

AQS是用来构建锁或其他同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类型变量表示持有锁的状态。
AQS加锁会导致阻塞
抢到资源的线程直接使用处理业务逻辑,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待,但是等候线程仍然保留获取锁的可能且获取锁流程仍在继续。既然是排队等候机制,那么一定会是某种队列,这样的队列是什么数据结构呢?如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的节点Node,通过CAS、自旋以及LockSupport.park()的方式,维护state变量的状态,使并发达到同步的控制效果。
AQS使用一个volatile的int类型的成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取的排队工作将每条要去抢占资源的线程封装成一个Node节点来实现锁的分配,通过CAS完成对state值的修改。
AQS内部体系架构

小结:AQS=state变量+CLH变种的双端队列

Node
static final class Node {
    /**
     * Marker to indicate a node is waiting in shared mode
     */
    static final Node SHARED = new Node();
    /**
     * Marker to indicate a node is waiting in exclusive mode
     */
    static final Node EXCLUSIVE = null;

    /**
     * waitStatus value to indicate thread has cancelled
     */
    static final int CANCELLED = 1;
    /**
     * waitStatus value to indicate successor's thread needs unparking
     */
    static final int SIGNAL = -1;
    /**
     * waitStatus value to indicate thread is waiting on condition
     */
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    static final int PROPAGATE = -3;

    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;

    /**
     * The thread that enqueued this node.  Initialized on
     * construction and nulled out after use.
     */
    volatile Thread thread;

    //省略......
}

小结:Node=waitStatus+前后指针指向

AQS同步队列的基本结构

AQS源码分析

公平锁与非公平锁
public static void main(String[] args) {
    ReentrantLock reentrantLock = new ReentrantLock();
    reentrantLock.lock();
    reentrantLock.unlock();
}
/**
 * Creates an instance of {@code ReentrantLock}.
 * This is equivalent to using {@code ReentrantLock(false)}.
 */
public ReentrantLock() {
    sync = new NonfairSync();
}

/**
 * Creates an instance of {@code ReentrantLock} with the
 * given fairness policy.
 *
 * @param fair {@code true} if this lock should use a fair ordering policy
 */
public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

ReentrantLock默认是非公平锁,创建ReentrantLock对象,可以传入一个boolean的值,默认是false。

//公平锁
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
//非公平锁
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

对比公平锁和非公平锁的tryAcquire()方法的实现,其差别就在于非公平锁获取锁时比公平锁中少了一个判断!hasQueuedPredecessors(),hasQueuedPredecessors()中判断了是否需要排队,导致公平锁和非公平锁的差异如下

  • 公平锁:公平锁将就先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中。
  • 非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象,也就是说队列的第一个排队线程在unpark(),之后还是需要竞争锁。
案例构建

三位用户去银行办理业务,分别是ThreadA,ThreadB,ThreadC,而银行只有一个窗口,每次最多只能有一位用户办理业务。如果一开始是ThreadA获取到锁,并开始办理业务,那么ThreadB和ThreadC就只能排队等待,当然了,B和C也会继续尝试去获取锁。前面我们说到,AQS通俗点说就是一个CLH队列和一个state状态。

  • 假设最初A先获取到锁,初始化时,AQS的state为0,队列为空。
final void lock() {
    //这里的compareAndSetState(0, 1)中0,就是state变量,是期望值,1就是要更新的值,通过CAS更新当前state值
    //因为当前state是0,所以这里的compareAndSetState(0, 1)是可以设置成功的
    if (compareAndSetState(0, 1))
        //设置独占线程,前面假设A用户先获取到锁,所以这里当前线程是ThreadA。
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
  • 此时state已经设置为1,队列中依然为空。A正在办理业务,B和C尝试获取锁,假设B尝试获取锁。
final void lock() {
    //因为当前state是1,与期望值0不符,所以这里的compareAndSetState(0, 1)是无法设置成功
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        //所以线程B会走此行逻辑
        acquire(1);
}
public final void acquire(int arg) {
    //先尝试获取锁
    if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • tryAcquire(arg)方法
//该方法直接抛出异常,意思是必须由子类去重写,否则直接抛出异常
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}
//在重写方法的时候,是走非公平锁逻辑
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
    //当前线程是ThreadB
    final Thread current = Thread.currentThread();
    //此时state是1
    int c = getState();
    //1不等于0,不走此逻辑
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //当前线程是B,而独占的线程是A(A正在办理业务),不走此逻辑
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //直接返回false
    return false;
}
public final void acquire(int arg) {
    //tryAcquire(arg)方法返回false,那么!tryAcquire(arg)就为true
    if (!tryAcquire(arg) &&
            //接下来进入addWaiter(Node.EXCLUSIVE)方法
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • addWaiter(Node.EXCLUSIVE)方法
private Node addWaiter(Node mode) {
    //mode是入参Node.EXCLUSIVE,该参数是null,当前线程是B,此处创建了节点B
    Node node = new Node(Thread.currentThread(), mode);
    // 此时队列为空,所以tail尾节点为空
    Node pred = tail;
    //判断为false,不走此逻辑
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //执行enq方法
    enq(node);
    return node;
}
  • enq(node)方法
private Node enq(final Node node) {
    //自旋
    for (;;) {
        //第一次进来,此处的tail节点为空
        Node t = tail;
        if (t == null) { // Must initialize
            //此时队列中没有节点,那么就创建一个空的节点,也称哨兵节点,利用CAS将当前节点设置为head头节点
            //新建的node,包含head,tail,waitStatus,thread,其作用就是占位
            if (compareAndSetHead(new Node()))
                //头节点赋值给尾节点,此时头节点和尾节点都是哨兵节点
                tail = head;
        } else {
            //第二次进来,哨兵节点为head,B节点为tail
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                //设置完成,队列中head为哨兵节点,tail为B节点
                return t;
            }
        }
    }
}
  • 到目前,state为1,队列中head为哨兵节点,tail为B节点。A正在办理业务,假设C尝试获取锁。
//C依然走tryAcquire方法到此,和B一样
final boolean nonfairTryAcquire(int acquires) {
    //当前线程是ThreadC
    final Thread current = Thread.currentThread();
    //此时state是1
    int c = getState();
    //1不等于0,不走此逻辑
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //当前线程是C,而独占的线程是A(A正在办理业务),不走此逻辑
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    //直接返回false
    return false;
}
private Node addWaiter(Node mode) {
    //mode是入参Node.EXCLUSIVE,该参数是null,当前线程是C,此处创建了节点C
    Node node = new Node(Thread.currentThread(), mode);
    // 此时队列中head为哨兵节点,tail为B节点
    Node pred = tail;
    //判断为true
    if (pred != null) {
        //node为C,C的前指针为pred,pred为tail,就是B节点,此时C节点为tail
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            //返回节点C
            return node;
        }
    }
    enq(node);
    return node;
}
  • 到此,state为1,队列中head为哨兵节点,然后是B节点,tail为C节点。A还是正在办理业务。
  • acquireQueued(addWaiter(Node.EXCLUSIVE), arg)方法
//队列中入队的时候分别是B和C,现在node节点是B节点,arg是1
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋
        for (;;) {
            //predecessor()方法返回node的前一个节点,即为哨兵节点
            final Node p = node.predecessor();
            //p是哨兵节点,是head。B通过tryAcquire(arg)再尝试获取锁,失败,返回false
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //B在抢占失败之后进行park,p为哨兵节点,node为B节点。
            //第一次shouldParkAfterFailedAcquire(p, node)返回false,哨兵节点的waitStatus设置为-1
            //第二次返回true
            if (shouldParkAfterFailedAcquire(p, node) &&
                    //此时B节点在此挂起中断,真正意义上入队
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  • shouldParkAfterFailedAcquire(p, node)方法中,如果前驱节点的waitStatus是SIGNAL状态,即shouldParkAfterFailedAcquire方法会返回true,程序会继续向下执行parkAndCheckInterrupt方法,用于将当前线程挂起。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
  • parkAndCheckInterrupt()方法
//检查当前线程中断
private final boolean parkAndCheckInterrupt() {
    //this是当前节点,在此处挂起,需要unpark()才可放行
    LockSupport.park(this);
    return Thread.interrupted();
}
  • C节点会和B节点一样,执行上述逻辑,在park()处挂起,等待unpark()唤醒。
  • 到目前,state为1,A占用窗口办理业务,对类中B和C排队。
  • 接着就是unlock()的过程,A占用当前窗口办理业务,办好了即将离开。执行unlock()方法。
//加锁一次,就要解锁一次,此处arg为1
public final boolean release(int arg) {
    //解锁
    if (tryRelease(arg)) {
        //h是哨兵节点,其waitStatus为-1
        Node h = head;
        if (h != null && h.waitStatus != 0)
            //唤醒
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  • tryRelease(arg)方法
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}
protected final boolean tryRelease(int releases) {
    //getState()获取到当前state为1,releases参数为1,所以c为0
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        //设置当前独占线程为null,即窗口不再由A占用
        setExclusiveOwnerThread(null);
    }
    //设置当前state为0
    setState(c);
    return free;
}
  • unparkSuccessor(h)方法
private void unparkSuccessor(Node node) {
    //此处node是哨兵节点,其waitStatus为-1
    int ws = node.waitStatus;
    if (ws < 0)
        //利用CAS将当前节点waitStatus设置为0
        compareAndSetWaitStatus(node, ws, 0);
    //s节点就是B节点,其waitStatus为0
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        //唤醒B节点
        LockSupport.unpark(s.thread);
}
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        //自旋
        for (;;) {
            //predecessor()方法返回node的前一个节点,即为哨兵节点
            final Node p = node.predecessor();
            //p是哨兵节点,是head。B再次通过tryAcquire(arg)再尝试获取锁,成功,返回true,state被设置为1,占用者是B
            if (p == head && tryAcquire(arg)) {
                //将原本为B节点设置为head,变为新的哨兵节点
                setHead(node);
                //回收原本的哨兵节点
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  • 自此,state为1,队列中head是新的哨兵节点,tail是C节点。B节点占用窗口办理业务。
  • 接着队列中的C节点,也会和上述B节点一样,以同样的方式出队列。
总结

其实关于AQS的源码分析,网上有很多文章,而且讲的很详细,图文并茂,但是只要是适合自己的,自己看完可以理解AQS,以及AQS相关的实现,都是可以学习和借鉴的。本文着重了通过案例进行了源码流程的分析,希望这篇文章,能对学习AQS的人有所帮助,文中如有疑问,或者有理解不当的地方,还请留言指出,一同进步,谢谢!

博客内容仅供自已学习以及学习过程的记录,如有侵权,请联系我删除,谢谢!

相关文章

网友评论

      本文标题:AQS源码分析总结

      本文链接:https://www.haomeiwen.com/subject/yjptqltx.html