美文网首页
AbstractQueuedSynchronizer源码解析

AbstractQueuedSynchronizer源码解析

作者: idolice24 | 来源:发表于2018-10-11 13:12 被阅读0次

    大家都亲切地称呼这玩意为AQS,作者写了注释哒:

    * Provides a framework for implementing blocking locks and related

    * synchronizers (semaphores, events, etc) that rely on

    * first-in-first-out (FIFO) wait queues.  This class is designed to

    * be a useful basis for most kinds of synchronizers that rely on a

    * single atomic {@code int} value to represent state. Subclasses

    * must define the protected methods that change this state, and which

    * define what that state means in terms of this object being acquired

    * or released.  Given these, the other methods in this class carry

    * out all queuing and blocking mechanics. Subclasses can maintain

    * other state fields, but only the atomically updated {@code int}

    * value manipulated using methods {@link #getState}, {@link

    * #setState} and {@link #compareAndSetState} is tracked with respect

    * to synchronization.

    我来翻译一下就是,提供了一个用来实现阻塞锁和相应的synchronizer(信号量,事件等等)的框架,这个框架基于先进先出的等待队列,这个类是被设计来作为大多数synchronizer的基础,它依靠一个具有原子性的int值去代表状态(这个状态我理解的就是当前被控制的对象的状态,比如你一个锁的状态),它的子类必须定义一个protected的方法来改变这个状态,这个状态它定义了几个,比如什么值代表它是正在被占有,什么值代表它已经被释放等等,子类也可以自己定义一些自己的状态。但是关于这个状态的改变还是很重要的,因为它的值也是应该受到保护的,不能让多个线程同时操作以免意外情况的发生。

    看看它有哪些字段:

    private transient volatile Node head;

    private transient volatile Node tail;

    private volatile int state;

    private static final Unsafe unsafe = Unsafe.getUnsafe();

    private static final long stateOffset;

    private static final long headOffset;

    private static final long tailOffset;

    private static final long waitStatusOffset;

    private static final long nextOffset;

    啊,就这些,其中Node这个数据结构也是定义在AQS里面的,这个Node就是用来存储等待的线程们的:

    static final classNode {

       static finalNodeSHARED=newNode();

       static finalNodeEXCLUSIVE=null;

       static final intCANCELLED=1;

       static final intSIGNAL   = -1;

       static final intCONDITION= -2;

       static final intPROPAGATE= -3;

       volatile intwaitStatus;

       volatileNodeprev;

       volatileNodenext;

       volatileThreadthread;

       NodenextWaiter;

       final booleanisShared() {

    returnnextWaiter==SHARED;

       }

       finalNodepredecessor()throwsNullPointerException {

    Node p =prev;

            if(p ==null)

    throw newNullPointerException();

            else

                returnp;

       }

    Node() {// Used to establish initial head or SHARED marker

       }

    Node(Thread thread,Node mode) {// Used by addWaiter

           this.nextWaiter= mode;

            this.thread= thread;

       }

    Node(Thread thread, intwaitStatus) {// Used by Condition

           this.waitStatus= waitStatus;

            this.thread= thread;

       }

    }

    很明显的一个双向链表结构,这里我们可以看到有两个特别定义的Node,一个是SHARED,一个是EXCLUSIVE,他们代表了线程对于资源控制的两种不同的模式,SHARED代表共享模式,EXCLUSIVE代表独占模式。

    首先要说一下为什么需要分这些模式,这要涉及到程序对于某些对象的操作其实分为读和写,读的话,多少个线程来读都OK,对吧,大家读的都是一样的东西,没必要只让一个线程读,其他线程还得等,性能多差啊,这就是共享模式的作用,让大家在执行读操作的时候一起~

    独占模式就更容易理解了,你的操作涉及到对对象的修改(并且这部分操作是原子操作,也就是它的执行逻辑上是得要么一起做完要么没开始做的),那么多线程是不是会破坏它的原子性?这也是我们使用锁的原因,而且这个时候只能让一个线程先做完这部分操作,再让其他线程接着做,这就是独占模式啦。

    第一部分 独占模式

    好,那么我们来分析一波独占模式是怎么独占到资源的,入口函数:

    public final voidacquire(intarg) {

    if(!tryAcquire(arg) &&

    acquireQueued(addWaiter(Node.EXCLUSIVE),arg))

    selfInterrupt();

    }

    这个tryAcquire():

    protected booleantryAcquire(intarg) {

    throw newUnsupportedOperationException();

    }

    可以看到是个只会抛异常的空的函数,里面什么也没有,因为我们AQS只是个基础类嘛,不会把所有东西都实现,这个定制化的任务交给实现它的子类,简而言之,它的功能是去尝试获取资源啦,然后是acquireQueued()方法:

    final booleanacquireQueued(finalNode node, intarg) {

    booleanfailed =true;

        try{

    booleaninterrupted =false;

            for(;;) {

    finalNode p = node.predecessor();

                if(p ==head&& tryAcquire(arg)) {

    setHead(node);

                   p.next=null;// help GC

                   failed =false;

                    returninterrupted;

               }

    if(shouldParkAfterFailedAcquire(p,node) &&

                    parkAndCheckInterrupt())

    interrupted =true;

           }

    }finally{

    if(failed)

    cancelAcquire(node);

       }

    }

    这个方法呢,做的就是当一个线程节点获取资源成功(它必须是在线程等待队列的第二个节点,因为有判断node.predecessor()==head),至于为什么,可能是为了公平吧,毕竟先进队列的等的更久, 成功后,就把当前线程节点node设置为头节点然后将之前的p也就是头结点的next设为null, 目的是让它不被任何其他对象引用,下次GC时就会被回收掉,并且返回interrupted,如果否,这样在acquire(intarg)方法中就不会去调用selfInterrupt()方法了。如果获取资源失败了,就会首先判断执行shouldParkAfterFailedAcquire(p,node)这个方法(用来确定是否是要中断线程):

    private static booleanshouldParkAfterFailedAcquire(Node pred,Node node) {

    intws = pred.waitStatus;

        if(ws == Node.SIGNAL)

           return true;

        if(ws >0) {

    /*

             * Predecessor was cancelled. Skip over predecessors and

             * indicate retry.

             */

           do{

    node.prev= pred = pred.prev;

           }while(pred.waitStatus>0);

           pred.next= node;

       }else{

    /*

             * waitStatus must be 0 or PROPAGATE.  Indicate that we

             * need a signal, but don't park yet.  Caller will need to

             * retry to make sure it cannot acquire before parking.

             */

           compareAndSetWaitStatus(pred,ws,Node.SIGNAL);

       }

    return false;

    }

    这个方法是去判断他的前驱节点的状态,如果是Signal的话,代表了它前面那个节点release的时候是会正常通知下个节点的所以它就能放心的阻塞自己。

    如果说前驱节点状态是Cancel那么就得忽略掉前面的节点,把前驱节点的前驱节点作为当前节点的前驱节点(好拗口,反正就是如果前驱节点是Cancel的,就跳过它再往前,这个连等是自右向左赋值的,大家自己看代码)

    然后如果前驱节点的状态不是以上两种(那有可能是CONDITION或PROPAGATE),就讲这个前驱节点的状态设置为Node.SIGNAL。

    如果这个方法返回的是true那么则会执行parkAndCheckInterrupt()方法:

    private final booleanparkAndCheckInterrupt() {

    LockSupport.park(this);

        returnThread.interrupted();

    }

    这个方法很简单,就是将线程先停止一下,具体呢?大家可以看我写的另一篇关于LockSupport类的文章哈(https://www.jianshu.com/p/6bffd19cb900不是我打广告,反正你们也不给钱,而是你不看你都不知道LockSupport.park(this)这个方法干了些什么,如果我在这来讲一遍,那就又跑题啦)~

    然后就返回这个线程是否被暂停,那么返回acquireQueued()方法,如果parkAndCheckInterrupt()返回的是true,那么在acquireQueued()方法中就会把interrupted这个变量赋值为true,在acquireQueued()中是一个循环,就是循环去tryAquire()获取资源,方法返回值就是这个interrupted, 如果是true代表线程已经被暂停了,如果是false代表没有被暂停。

    然后注意到还有

    finally{

    if(failed)

    cancelAcquire(node);

       }

    这部分代码,表示最终如果都没能获取这个资源的话,就执行cancelAcquire()方法:

    private voidcancelAcquire(Node node) {

    // 如果节点是null的话就忽略

       if(node ==null)

    return;

       node.thread=null;

       // 跳过已经是cancel状态的前置节点

       Node pred = node.prev;

        while(pred.waitStatus>0)

    node.prev= pred = pred.prev;

       Node predNext = pred.next;

       node.waitStatus= Node.CANCELLED;

       if(node ==tail&& compareAndSetTail(node,pred)) {

    compareAndSetNext(pred,predNext, null);

       }else{

           intws;

            if(pred !=head&&

    ((ws = pred.waitStatus) == Node.SIGNAL||

    (ws <=0&&compareAndSetWaitStatus(pred,ws,Node.SIGNAL))) &&

    pred.thread!=null) {

    Node next = node.next;

                if(next !=null&& next.waitStatus<=0)

    compareAndSetNext(pred,predNext,next);

           }else{

    unparkSuccessor(node);

           }

    node.next= node;// 帮助GC

       }

    }

    哎哟那么多行代码看得脑壳疼,看看它都做了什么,一句话概括,它完了,它会把它自己从这个Node链中移除,让自己等着被GC回收,大家可以看到这里面调用了两次compareAndSetNext()方法:

    private static final booleancompareAndSetNext(Node node,

                                                  Node expect,

                                                  Node update) {

    returnunsafe.compareAndSwapObject(node,nextOffset,expect,update);

    }

    其实就是调用Unsafe的方法通过CAS的方式来设置Node的next节点,就是把当前节点的前面节点指向当前节点的后面节点:

    这里注意还有一个方法unparkSuccessor(),这个方法激活它后面的节点:

    private voidunparkSuccessor(Node node) {

       intws = node.waitStatus;

        if(ws <0)

    compareAndSetWaitStatus(node,ws,0);

       Node s = node.next;

        if(s ==null|| s.waitStatus>0) {

    s =null;

            for(Node t =tail;t !=null&& t != node;t = t.prev)

    if(t.waitStatus<=0)

    s = t;

       }

    if(s !=null)

    LockSupport.unpark(s.thread);

    }

    可以看到,如果当前节点的状态不是CANCELLED,那就设置当前的状态为0,然后下一步,操作它的下一个节点,前提是这个节点不为null并且它的状态不为CANCELLED, 如果为null或者CANCELLED,又要进行下一波操作,将Node由尾向前遍历,找到最靠前的那个不为null并且不是当前节点本身并且状态不为CANCELLED的节点,这个由后向前遍历的操作是有点骚的啊,大家体会一下,因为它的next都为null啦,所以没法从next开始遍历,于是往前遍历了。

    言归正传,找到下一个满足条件的后节点后,就会通过LockSupport.unpark(s.thread)方法将其唤醒~也就是轮到下个节点的线程进行工作的时候啦。

    走到了这里,我们就可以回到acquire()方法啦, 我们是不是沿着acquire()的脚步走了一圈?咦,漏了addWaiter(Node.EXCLUSIVE)这个方法:

    privateNodeaddWaiter(Node mode) {

    Node node =newNode(Thread.currentThread(),mode);

       Node pred =tail;

        if(pred !=null) {

    node.prev= pred;

            if(compareAndSetTail(pred,node)) {

    pred.next= node;

                returnnode;

           }

        }

    enq(node);

        returnnode;

    }

    这个方法返回的值是一个Node节点,这个返回值是作为acquireQueued()参数传入的,它接受的参数也是个Node,我们来看看这个方法的输入和输出有啥区别~

    简而言之当这个AQS实例的tail这个节点不为空时,它会将参数这个节点加入到AQS实例的Node链表中(加到末尾),你可以看到通过compareAndSetTail()方法(我不再细讲这个方法了哈,其实就是调用的Unsafe的方法,本质是CAS来保证原子性)来重新设置了AQS实例的尾节点,并返回这个节点。

    如果这个AQS实例的tail是空的话,就会执行一个方法enq(node):

    privateNodeenq(finalNode node) {

    for(;;) {

    Node t =tail;

            if(t ==null) {// Must initialize

               if(compareAndSetHead(newNode()))

    tail=head;

           }else{

    node.prev= t;

                if(compareAndSetTail(t,node)) {

    t.next= node;

                    returnt;

               }

            }

        }

    }

    可以看到这个方法里有一个循环,在这里面又会去取到AQS实例的tail节点,再去判断它是否为空,如果是空的,新建一个Node节点赋给头结点,再把头节点赋给尾节点,其实就是初始化head和tail,然后tail就不为null了嘛,就会走进else这个分支,所以这里的for(;;)操作也很骚气~它会一直等到tail不是null的时候return。else做的操作就很浅显易懂了,就是讲要添加的node节点赋给tail作为整个链表的尾节点然后返回的就是当前节点的前一个节点(但这里调用enq()时并没有接这个返回值,只是调用而已)。

    然后是acquire()方法中的最后一个方法selfInterrupt():

    static voidselfInterrupt() {

    Thread.currentThread().interrupt();

    }

    其实就是设置中断的状态为true,因为acquire里有个判断acquireQueued(addWaiter(Node.EXCLUSIVE), arg),这里返回的是线程是否被中断了,如果被中断了才执行selfInterrupt,目的是将这个中断状态传递出去,关于线程的中断机制,我理解了老半天,感觉网上搜到文章都不太地道,所以我决定写一篇文章专门讲这个,敬请期待~

    那么独占模式下AQS获取资源的整个流程就走完了,是不是还晕晕的,是的!正常!那我们来总结一下流程(从http://www.javarticles.com/2012/10/abstractqueuedsynchronizer-aqs.html#prettyPhoto盗的一张图~懒得自己画了,谅解一下~):

    一句话总结就是:线程会去尝试独占资源,如果成功就是将threadowner设为它自己然后接着做想做的事情,如果不成功就将这个线程放入一个先入先出的队列等待,这是站在线程的角度看,它被放进了一个队列,站在队列的角度看,它还会让队列里满足条件的线程去持续尝试独占资源,如果占成功了那就行了呗,没有的话就看要不要把线程给暂停了,如果暂停了就等待被唤醒啦,如果没暂停的就再尝试占有资源,一直就这样~(这一句话貌似有点长=。=)

    好~获取资源部分终于差不多了,获取了之后总要释放呗,下面就讲独占资源的释放~

    资源的释放呢是调用的release方法:

    public final booleanrelease(intarg) {

    if(tryRelease(arg)) {

    Node h =head;

            if(h !=null&& h.waitStatus!=0)

    unparkSuccessor(h);

            return true;

       }

    return false;

    }

    其中tryRelease(arg)这个就跟tryAcquire()一样是由继承它的子类实现的(如果想看例子请看ReentrantLock中的tryRelease方法),想想,这个方法也是当前线程来执行的对吧,那就意味着,当前线程此刻是运行的状态,这个就会通过unparkSuccessor(h)方法唤醒头节点的下一个节点,这个unparkSuccessor(h)方法前面已经分析过了,反正就是唤醒下个节点。

    然后释放的工作就结束啦~

    说完独占模式后该说共享模式啦~期待得搓手手~

    第二部分 共享模式

    还是按照套路,将获取资源的方法走一遍~

    共享模式获取资源调用的是acquireShared方法:

    public final voidacquireShared(intarg) {

    if(tryAcquireShared(arg) <0)

    doAcquireShared(arg);

    }

    tryAcquireShared大家应该能猜到是由子类实现了,也就是我们自己写实现,反正只要它小于0,我们实现的时候就要注意了,小于0在这里就要代表没有获取成功,这样就会执行下面的操作 doAcquireShared(arg):

    private voiddoAcquireShared(intarg) {

    finalNode node = addWaiter(Node.SHARED);

        booleanfailed =true;

        try{

    booleaninterrupted =false;

            for(;;) {

    finalNode p = node.predecessor();

                if(p ==head) {

    intr = tryAcquireShared(arg);

                    if(r >=0) {

    setHeadAndPropagate(node,r);

                       p.next=null;// help GC

                       if(interrupted)

    selfInterrupt();

                       failed =false;

                        return;

                   }

                }

    if(shouldParkAfterFailedAcquire(p,node) &&

                    parkAndCheckInterrupt())

    interrupted =true;

           }

    }finally{

    if(failed)

    cancelAcquire(node);

       }

    }

    首先同样会执行addWaiter(Node.SHARED),这个方法之前讲了嘛,独占模式也有,就是讲当前线程所在节点加入到等待队列里面,唯一不同的是这里设置的nextWaiter是Node.SHARED而已。然后进入循环,如果当前节点的前置节点是头节点的话,会去接着尝试tryAcquireShared(arg)试图获取资源,如果成功,则执行setHeadAndPropagate方法:

    private voidsetHeadAndPropagate(Node node, intpropagate) {

    Node h =head;// 保存旧的head用于下面检查

       setHead(node);

       if(propagate >0|| h ==null|| h.waitStatus<0||

    (h =head) ==null|| h.waitStatus<0) {

    Node s = node.next;

            if(s ==null|| s.isShared())

    doReleaseShared();

       }

    }

    它这个方法里有一串判断条件:propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0

    翻译:以下条件满足其一:

              1. propagate也就是tryAcquireShared方法的返回值要大于0,也就是要获取资源成功

          2. 之前的头节点是空

          3. 之前的头节点的等待状态不是CANCELLED

          4. 当前头节点为空

          5. 当前头节点等待状态不是CANCELLED

    好,满足这几个要求中的一个就能走到if里面去啦,看看它做了什么呢?

    首先找到了当前节点的下一个节点,如果它是空的或者它的模式是共享模式,那么执行doReleaseShared():

    private voiddoReleaseShared() {

       for(;;) {

    Node h =head;

            if(h !=null&& h !=tail) {

    intws = h.waitStatus;

                if(ws == Node.SIGNAL) {

    if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))

    continue;            

                   unparkSuccessor(h);

               }

    else if(ws ==0&&

    !compareAndSetWaitStatus(h,0,Node.PROPAGATE))

    continue;               // CAS失败后继续循环

           }

    if(h ==head)// 如果头节点不一样了则循环继续,否则跳出循环

               break;

       }

    }

    哇哦,又是一个循环,我们可以看到,跳出循环的条件是h == head, 然鹅,h = head,那么我们就有两个猜想,第一,这个head可能是在多线程的环境下发生了变化,不然在单线程的条件下,h == head是永远成立的, 第二,在上面这段:

     if(h !=null&& h !=tail) {

           int ws = h.waitStatus;

           if(ws == Node.SIGNAL) {

    if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))

                   continue;            

                   unparkSuccessor(h);

               }

    else if(ws ==0&&!compareAndSetWaitStatus(h,0,Node.PROPAGATE))

                continue;               // CAS失败后继续循环

           }

    操作中可能有对head的修改,那么我们先来看看这段代码有没有修改head。

    当h不是空并且也不是最后一个节点的情况下,我们拿到他的等待状态,如果是SIGNAL(代表了它后面的节点需要被唤醒),然后我们就去将它的等待状态设为0(初始状态,表示后面没有节点等待被唤醒),并unparkSuccessor(唤醒后面的节点),这个方面前面也讲了,记不住翻上去看,然后如果节点的等待状态是0的话就回去设置它的状态是Node.PROPAGATE,但是这个方法也没有改变过head节点。

    那么就意味着我们第一个猜想是正确的,head可能在多个线程的执行过程中发生了变化,那么head没变就代表了已经没有多个线程在这同时执行这段代码了(多个线程试图获取资源),也就可以退出循环了。

    跳出之后,我们可以回到doAcquireShared方法了,我们已经唤醒了后面的共享模式的节点,然后我们就把p(头节点从等待队列中删除)

    if(shouldParkAfterFailedAcquire(p,node) &&

                    parkAndCheckInterrupt())

        interrupted =true;

    这段代码很熟悉了,独占模式下也有,就是判断当获取资源失败时是否将线程中断,并检查线程是否被中断,赋值给interrupted。

    如果是被中断了的话,就执行selfInterrupt方法重置线程的中断标志为true,然后就可以跳出循环了。反正不管跳不跳出循环,最终是要执行下面这段代码的:

    finally{

    if(failed)

    cancelAcquire(node);

    }

    cancelAcquire()这个方法我们前面也讲了,就是将当前线程给取消掉,一定条件下去唤醒后面的节点。然后就完事~是不是很简单,过程基本上和独占的差不多,只是它会执行setHeadAndPropagate这个方法,也就是如果它的下个节点也是SHARED模式的话,它会将它的下个节点也唤醒。

    好的大概讲一下流程,白话讲哈,比较容易理解,就是线程首先会去获取资源嘛,如果获取到了就占有它,如果没获取到就加入等待队列的末尾嘛,然后队列中的线程都会去判断一下,它前面那个节点是不是头节点,因为头节点就是占有着资源的那个节点嘛,头节点完了就可以轮到它了,所以它就不挂起自己,接着请求,重复请求等着前面节点释放资源嘛,释放了资源它就紧接着占有资源啦,占有后,共享模式会把自己线程的节点设为头节点了嘛,之前的头已经完成移出队列了,然后现在就它自己是头啦,它会去检查自己后面那个节点是不是也是SHARED共享模式的,如果是,它就把它唤醒!当然也有线程挂起的时候啦,比如它前面的节点并不是头节点的时候,那么这个时候要把它前面那个节点的状态设为Node.SIGNAL,这样子当他前面节点被唤醒,执行完成要将资源交出去的时候才会去唤醒它后面的节点,不然的话后面的节点就不能够被唤醒哦,所以Node.SIGNAL很重要啦~

    整个过程大概就是这样子的,共享模式的释放也很简单跟独占模式类似所以也不讲啦~

    呼~好累,但是还是要多说两句,这个AbstractQueuedSynchronizer所有方法我们都要考虑到是很多个线程同时在执行的,虽然有的阻塞,有的在执行,时刻这样想着有助于我们理解这个代码的一些设计~

    最后就是,我也是自己试图去把自己的理解写出来的,有什么问题和建议都请留言,批评就算了,写这个很累的,况且仙女不接受批评~

    相关文章

      网友评论

          本文标题:AbstractQueuedSynchronizer源码解析

          本文链接:https://www.haomeiwen.com/subject/iikbaftx.html