美文网首页Java整理JUCjava
深入浅出java同步器AQS

深入浅出java同步器AQS

作者: 美团Java | 来源:发表于2016-07-11 16:37 被阅读21205次

简书 占小狼
转载请注明原创出处,谢谢!

前言

在java.util.concurrent.locks包中有很多Lock的实现类,常用的有ReentrantLock、ReadWriteLock(实现类ReentrantReadWriteLock),内部实现都依赖AbstractQueuedSynchronizer类,接下去让我们看看Doug Lea大神是如何使用一个普通类就完成了代码块的并发访问控制。为了方便,本文中使用AQS代替AbstractQueuedSynchronizer。

定义

public abstract class AbstractQueuedSynchronizer extends
    AbstractOwnableSynchronizer implements java.io.Serializable { 
    //等待队列的头节点
    private transient volatile Node head;
    //等待队列的尾节点
    private transient volatile Node tail;
    //同步状态
    private volatile int state;
    protected final int getState() { return state;}
    protected final void setState(int newState) { state = newState;}
    ...
}

队列同步器AQS是用来构建锁或其他同步组件的基础框架,内部使用一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,其中内部状态state,等待队列的头节点head和尾节点head,都是通过volatile修饰,保证了多线程之间的可见。

在深入实现原理之前,我们先看看内部的FIFO队列是如何实现的。

static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
        ...
    }

先来一张形象的图(该图其实是网上找的)


FIFO.png

黄色节点是默认head节点,其实是一个空节点,我觉得可以理解成代表当前持有锁的线程,每当有线程竞争失败,都是插入到队列的尾节点,tail节点始终指向队列中的最后一个元素。

每个节点中, 除了存储了当前线程,前后节点的引用以外,还有一个waitStatus变量,用于描述节点当前的状态。多线程并发执行时,队列中会有多个节点存在,这个waitStatus其实代表对应线程的状态:有的线程可能获取锁因为某些原因放弃竞争;有的线程在等待满足条件,满足之后才能执行等等。一共有4中状态:

  1. CANCELLED 取消状态
  2. SIGNAL 等待触发状态
  3. CONDITION 等待条件状态
  4. PROPAGATE 状态需要向后传播

等待队列是FIFO先进先出,只有前一个节点的状态为SIGNAL时,当前节点的线程才能被挂起。

实现原理

子类重写tryAcquire和tryRelease方法通过CAS指令修改状态变量state。

public final void acquire(int arg) {   
 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))    
    selfInterrupt();
}
线程获取锁过程

下列步骤中线程A和B进行竞争。

  1. 线程A执行CAS执行成功,state值被修改并返回true,线程A继续执行。
  2. 线程A执行CAS指令失败,说明线程B也在执行CAS指令且成功,这种情况下线程A会执行步骤3。
  3. 生成新Node节点node,并通过CAS指令插入到等待队列的队尾(同一时刻可能会有多个Node节点插入到等待队列中),如果tail节点为空,则将head节点指向一个空节点(代表线程B),具体实现如下:
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
  1. node插入到队尾后,该线程不会立马挂起,会进行自旋操作。因为在node的插入过程,线程B(即之前没有阻塞的线程)可能已经执行完成,所以要判断该node的前一个节点pred是否为head节点(代表线程B),如果pred == head,表明当前节点是队列中第一个“有效的”节点,因此再次尝试tryAcquire获取锁,
    1、如果成功获取到锁,表明线程B已经执行完成,线程A不需要挂起。
    2、如果获取失败,表示线程B还未完成,至少还未修改state值。进行步骤5。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  1. 前面我们已经说过只有前一个节点pred的线程状态为SIGNAL时,当前节点的线程才能被挂起。
    1、如果pred的waitStatus == 0,则通过CAS指令修改waitStatus为Node.SIGNAL。
    2、如果pred的waitStatus > 0,表明pred的线程状态CANCELLED,需从队列中删除。
    3、如果pred的waitStatus为Node.SIGNAL,则通过LockSupport.park()方法把线程A挂起,并等待被唤醒,被唤醒后进入步骤6。
    具体实现如下:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
  1. 线程每次被唤醒时,都要进行中断检测,如果发现当前线程被中断,那么抛出InterruptedException并退出循环。从无限循环的代码可以看出,并不是被唤醒的线程一定能获得锁,必须调用tryAccquire重新竞争,因为锁是非公平的,有可能被新加入的线程获得,从而导致刚被唤醒的线程再次被阻塞,这个细节充分体现了“非公平”的精髓。

线程释放锁过程:
  1. 如果头结点head的waitStatus值为-1,则用CAS指令重置为0;
  2. 找到waitStatus值小于0的节点s,通过LockSupport.unpark(s.thread)唤醒线程。
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

总结

Doug Lea大神的思路跳跃的太快,把CAS指令玩的出神入化,以至于有些逻辑反反复复debug很多次才明白。

END。
我是占小狼。
在魔都艰苦奋斗,白天是上班族,晚上是知识服务工作者。
读完我的文章有收获,记得关注和点赞哦,如果非要打赏,我也是不会拒绝的啦!

相关文章

网友评论

  • SHAN某人:非常好
  • 一滴水的坚持:我只点赞,不说话
    美团Java:@一滴水的坚持 很好
  • df352b76bbaf:你好,打扰了。
    你的第六步骤:
    线程每次被唤醒时,都要进行中断检测,如果发现当前线程被中断,那么抛出InterruptedException并退出循环。从无限循环的代码可以看出,并不是被唤醒的线程一定能获得锁,必须调用tryAccquire重新竞争,因为锁是非公平的,有可能被新加入的线程获得,从而导致刚被唤醒的线程再次被阻塞,这个细节充分体现了“非公平”的精髓。

    我与你的看法不太一致,你所说的是体现了非公平性,在我看来正是体现公平性。
    容我讲述一下我得观点:

    首先再final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

    代码中, 如果当前节点的前驱不是头结点,则会进入第二个if判断体
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    parkAndCheckInterrupt中
    private final boolean parkAndCheckInterrupt() {
    // 通过LockSupport的park()阻塞“当前线程”。
    LockSupport.park(this);
    // 返回线程的中断状态。
    return Thread.interrupted();
    }

    首先要知道 LockSupport.park() 会响应中断,但不会抛出 InterruptedException,并且Thread.interrupted()返回线程的中断状态时会清空中断状态,当前线程会因为自旋再次进入阻塞状态。

    所以当执行park线程进入阻塞状态,当unpark唤醒之后,返回中断状态之后,并且清除线程的中断状态。

    但是如果前驱节点不是头结点,并不会进入第一个循环体,所以并不会返回中断状态。 而是重新自旋进入阻塞状态。
    体现了FIFO的公平性。
    hongrm:确实是非公平的,公平性是由子类去实现的,以下这段代码取自ReentrantLock公平同步队列器的实现
    static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
    acquire(1);
    }

    /**
    * Fair version of tryAcquire. Don't grant access unless
    * recursive call or no waiters or is first.
    */
    protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
    //hasQueuedPredecessors这个方法首先检验的就是head != tail 如果这为真,这下面的代码就不能执行,则会执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    if (!hasQueuedPredecessors() &&
    compareAndSetState(0, acquires)) {
    setExclusiveOwnerThread(current);
    return true;
    }
    }
    else if (current == getExclusiveOwnerThread()) {
    int nextc = c + acquires;
    if (nextc < 0)
    throw new Error("Maximum lock count exceeded");
    setState(nextc);
    return true;
    }
    return false;
    }
    }
  • 一枝花算不算浪漫:写的很好,看了好几遍,感谢博主的分享。:+1:
  • 喜欢敲代码的猫:想看一些数据库优化的详细能看的懂的,小狼哥能写一篇么
  • 3a79c57eee54:您好有个问题问下:
    final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
    boolean interrupted = false;
    for (;;) {
    final Node p = node.predecessor();
    if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
    }
    if (shouldParkAfterFailedAcquire(p, node) &&
    parkAndCheckInterrupt())
    interrupted = true;
    }
    } finally {
    if (failed)
    cancelAcquire(node);
    }
    }

    就是这一段代码 if (p == head && tryAcquire(arg)) 如果线程被挂起,然后唤醒后,对于已经在队列中的线程是不是只能等待前一个线程执行完才能执行?那么对于已经在队列中的线程就一定不是公平竞争了(因为必须等待自己的前节点为head),或者说队列中的线程就一定是顺序执行了,是这样的吗?
    美团Java:@猪迪傻迪 如果已经在队列中了,就必须按照顺序了,不公平是针对那种还没进队列的线程可以和队列中的第一个节点抢占资源
  • 4559fe085a2d:请问你怎么debug的?
    美团Java:@benyVip1 有什么不一样么?
    4559fe085a2d:@占小狼 好奇是自己写单元测试debug进去?
    美团Java:@benyVip1 在ide中打断点
  • hongrm:predecessor()方法上的注释这样写着:
    Returns previous node, or throws NullPointerException if null.
    Use when predecessor cannot be null. The null check could
    be elided, but is present to help the VM.
    请问您知道为什么这里的空检查可以帮助到VM吗?
  • 7a52f0b40d3d:shouldParkAfterFailedAcquire(),如果pred的waitStatus == 0,则通过CAS指令修改waitStatus为Node.SIGNAL。为什么要修改为Node.SIGNAL,可以解释下吗
    70342eb43d60:@浮生一梦_fe1f 我的理解是前一节点加入队列但是状态位仅仅表示它只是在队列中,把它标示为SIGNAL后就表示它的后面一个节点需要运行,此时返回false,回到shouldparkafteracquirefail方法,继续循环因为原节点的前一节点已经置为signal 所以它返回true 执行park挂起。每个节点的signal都是后面节点设置的。原线程挂在原地,等待unparksuccessor。这是我的看法,欢迎指正
  • 409e1e2bdfd8:老铁,文章能看懂,但是好像没有细讲共享锁怎么处理。AQS最难得地方是共享锁的处理,我认为老铁应该重点讲一下这个。我这一块不是很懂!
    一剑霜寒aa:@jianandgui 不是这样的,共享锁表面的意思大家都懂,但jdk里面代码是实现的逻辑很复杂,他考虑了并发下很多情况,我们看不懂,就是因为某些场景没有想到
    339133f33fc6:共享锁和独占锁最大的区别就在于AQS中state字段所代表的含义,这里理解了其实二者都差不多的
    70342eb43d60:@无望的色彩 共享锁我觉得最大的区别就是唤醒线程时,如果资源足够就可以连续唤醒后面的线程,这样就实现了并发,但是队列唤醒顺序也是先进先出,只是要考虑新加入的线程,这又涉及到公平与非公平获取了
  • 4c78ab23aaa3:这句应该是调用acquireInterruptibly才会产生InterruptedException。
    “线程每次被唤醒时,都要进行中断检测,如果发现当前线程被中断,那么抛出InterruptedException并退出循环。”
    但前面贴的都是acqure的代码。。。看了半天才发现
  • trgaaaaa:还有,1步骤改变了状态是改成了什么状态,原来是什么状态呢
    a3b65c108416: @trgaaaaa AQS类里有个字段state,当state等于0的时候,表示没有线程占有锁,那就用cas把state变成1,这样下个线程进来的时候就知道,锁已经被占有了,就会执行排队的逻辑
  • trgaaaaa:3步有点不明白,感觉tail永远都加不进来,因为开始tail就是空,每次调用这个方法,tail都是空,那怎样才使得tail不为空呢
    a3b65c108416: @trgaaaaa 当tail=null的时候,说明队列还未初始化,于是新建node,用cas将该node设置为head,并把tail也指向该node。然后tail就不是空的啦。
  • light先生:你是怎么debug JDK源码的?用的openJDK吗?直接用JDK可以进行debug吗?
    美团Java:@LeeVIP 看源码需要的是耐心,多看几篇,自己动手调试一下
    流氓兔之夫:这篇是看不懂:smiley:
    美团Java:@light先生 直接jdk的源码就可以
  • 风洛洛:您好,对于你本文中的
    线程获取锁过程这部分的2的线程A执行CAS指令失败,说明线程B也在执行CAS指令且成功。
    其实我觉的可以把说明线程B也在执行CAS指令且成功这句话去掉,这种情况其实是基于了解RreentrantLock中的非公平锁的Lock情况下出现的场景,有些特殊,之前不了解这个非公平锁的时候,读这句话总是无法想到是什么场景,后来看了那里才知道这的场景。
    仅仅提下自己的一些见解,你的文章写得都很好,哈哈!在一点点读你的文章学习。:grin:
    风洛洛:@占小狼 :grin: , 好文章得认真读
    美团Java:@风洛洛 好的,读的好认真
  • 禾禾斗斗:等待队列是FIFO先进先出,只有前一个节点的状态为SIGNAL时,当前节点的线程才能被挂起。
    ------------
    这一句怎么理解?
    70342eb43d60:@awkejiang 前一节点被置为signal说明后面有线程需要执行,但是轮不到它后面的线程执行,但是后面线程一定要找一个前驱节点不为cancel的节点,然后把它设置为siganl然后原地挂起,等待唤醒。 因为siganl执行完了会唤醒紧接着的后面一个
    Terminalist:也就是前一个节点获取了锁,等这个获取锁的节点执行完后,就轮到这个状态为signal的节点获取锁了!
    美团Java:@awkejiang 要结合着代码看
  • jianshuhua:您好,有个问题:这篇文章里您写到:
    线程获取锁过程:第6点:线程每次被唤醒时,都要进行中断检测,如果发现当前线程被中断,那么抛出InterruptedException并退出循环。
    我觉得不对,parkAndCheckInterrupt()里通过LockSupport.park(this)挂起线程,如果被中断,是不会抛出InterruptedException的,所以也不会退出这个循环。
    所以我也不是很明白为什么会在acquire(int arg)函数里调用selfInterrupt()方法。
    拙拙者: @jianshuhua 我的理解是保证线程的整个状态的一致性
    jianshuhua:@yl_a52b 假设线程是因为被中断返回,那么恢复线程的Interrupt状态又有什么作用?
    拙拙者:这里的selfInterrupt方法是为了恢复线程的Interrupt状态
    因为线程被park方法阻塞后,可能因为以下原因返回:

    Some other thread invokes unpark with the current thread as the target; or
    Some other thread interrupts the current thread; or
    The call spuriously (that is, for no reason) returns.
  • 588fd0c11960:学到很多东西,谢谢
    美团Java:@渣渣高 :smiley:
  • icyage:这部分的分析很棒
    美团Java:@icyage 一起学习,一起进步

本文标题:深入浅出java同步器AQS

本文链接:https://www.haomeiwen.com/subject/erfrjttx.html