美文网首页
AbstractQueuedSynchronizer源码解析

AbstractQueuedSynchronizer源码解析

作者: idolice24 | 来源:发表于2018-10-11 13:12 被阅读0次

大家都亲切地称呼这玩意为AQS,作者写了注释哒:

* Provides a framework for implementing blocking locks and related

* synchronizers (semaphores, events, etc) that rely on

* first-in-first-out (FIFO) wait queues.  This class is designed to

* be a useful basis for most kinds of synchronizers that rely on a

* single atomic {@code int} value to represent state. Subclasses

* must define the protected methods that change this state, and which

* define what that state means in terms of this object being acquired

* or released.  Given these, the other methods in this class carry

* out all queuing and blocking mechanics. Subclasses can maintain

* other state fields, but only the atomically updated {@code int}

* value manipulated using methods {@link #getState}, {@link

* #setState} and {@link #compareAndSetState} is tracked with respect

* to synchronization.

我来翻译一下就是,提供了一个用来实现阻塞锁和相应的synchronizer(信号量,事件等等)的框架,这个框架基于先进先出的等待队列,这个类是被设计来作为大多数synchronizer的基础,它依靠一个具有原子性的int值去代表状态(这个状态我理解的就是当前被控制的对象的状态,比如你一个锁的状态),它的子类必须定义一个protected的方法来改变这个状态,这个状态它定义了几个,比如什么值代表它是正在被占有,什么值代表它已经被释放等等,子类也可以自己定义一些自己的状态。但是关于这个状态的改变还是很重要的,因为它的值也是应该受到保护的,不能让多个线程同时操作以免意外情况的发生。

看看它有哪些字段:

private transient volatile Node head;

private transient volatile Node tail;

private volatile int state;

private static final Unsafe unsafe = Unsafe.getUnsafe();

private static final long stateOffset;

private static final long headOffset;

private static final long tailOffset;

private static final long waitStatusOffset;

private static final long nextOffset;

啊,就这些,其中Node这个数据结构也是定义在AQS里面的,这个Node就是用来存储等待的线程们的:

static final classNode {

   static finalNodeSHARED=newNode();

   static finalNodeEXCLUSIVE=null;

   static final intCANCELLED=1;

   static final intSIGNAL   = -1;

   static final intCONDITION= -2;

   static final intPROPAGATE= -3;

   volatile intwaitStatus;

   volatileNodeprev;

   volatileNodenext;

   volatileThreadthread;

   NodenextWaiter;

   final booleanisShared() {

returnnextWaiter==SHARED;

   }

   finalNodepredecessor()throwsNullPointerException {

Node p =prev;

        if(p ==null)

throw newNullPointerException();

        else

            returnp;

   }

Node() {// Used to establish initial head or SHARED marker

   }

Node(Thread thread,Node mode) {// Used by addWaiter

       this.nextWaiter= mode;

        this.thread= thread;

   }

Node(Thread thread, intwaitStatus) {// Used by Condition

       this.waitStatus= waitStatus;

        this.thread= thread;

   }

}

很明显的一个双向链表结构,这里我们可以看到有两个特别定义的Node,一个是SHARED,一个是EXCLUSIVE,他们代表了线程对于资源控制的两种不同的模式,SHARED代表共享模式,EXCLUSIVE代表独占模式。

首先要说一下为什么需要分这些模式,这要涉及到程序对于某些对象的操作其实分为读和写,读的话,多少个线程来读都OK,对吧,大家读的都是一样的东西,没必要只让一个线程读,其他线程还得等,性能多差啊,这就是共享模式的作用,让大家在执行读操作的时候一起~

独占模式就更容易理解了,你的操作涉及到对对象的修改(并且这部分操作是原子操作,也就是它的执行逻辑上是得要么一起做完要么没开始做的),那么多线程是不是会破坏它的原子性?这也是我们使用锁的原因,而且这个时候只能让一个线程先做完这部分操作,再让其他线程接着做,这就是独占模式啦。

第一部分 独占模式

好,那么我们来分析一波独占模式是怎么独占到资源的,入口函数:

public final voidacquire(intarg) {

if(!tryAcquire(arg) &&

acquireQueued(addWaiter(Node.EXCLUSIVE),arg))

selfInterrupt();

}

这个tryAcquire():

protected booleantryAcquire(intarg) {

throw newUnsupportedOperationException();

}

可以看到是个只会抛异常的空的函数,里面什么也没有,因为我们AQS只是个基础类嘛,不会把所有东西都实现,这个定制化的任务交给实现它的子类,简而言之,它的功能是去尝试获取资源啦,然后是acquireQueued()方法:

final booleanacquireQueued(finalNode node, intarg) {

booleanfailed =true;

    try{

booleaninterrupted =false;

        for(;;) {

finalNode p = node.predecessor();

            if(p ==head&& tryAcquire(arg)) {

setHead(node);

               p.next=null;// help GC

               failed =false;

                returninterrupted;

           }

if(shouldParkAfterFailedAcquire(p,node) &&

                parkAndCheckInterrupt())

interrupted =true;

       }

}finally{

if(failed)

cancelAcquire(node);

   }

}

这个方法呢,做的就是当一个线程节点获取资源成功(它必须是在线程等待队列的第二个节点,因为有判断node.predecessor()==head),至于为什么,可能是为了公平吧,毕竟先进队列的等的更久, 成功后,就把当前线程节点node设置为头节点然后将之前的p也就是头结点的next设为null, 目的是让它不被任何其他对象引用,下次GC时就会被回收掉,并且返回interrupted,如果否,这样在acquire(intarg)方法中就不会去调用selfInterrupt()方法了。如果获取资源失败了,就会首先判断执行shouldParkAfterFailedAcquire(p,node)这个方法(用来确定是否是要中断线程):

private static booleanshouldParkAfterFailedAcquire(Node pred,Node node) {

intws = pred.waitStatus;

    if(ws == Node.SIGNAL)

       return true;

    if(ws >0) {

/*

         * Predecessor was cancelled. Skip over predecessors and

         * indicate retry.

         */

       do{

node.prev= pred = pred.prev;

       }while(pred.waitStatus>0);

       pred.next= node;

   }else{

/*

         * waitStatus must be 0 or PROPAGATE.  Indicate that we

         * need a signal, but don't park yet.  Caller will need to

         * retry to make sure it cannot acquire before parking.

         */

       compareAndSetWaitStatus(pred,ws,Node.SIGNAL);

   }

return false;

}

这个方法是去判断他的前驱节点的状态,如果是Signal的话,代表了它前面那个节点release的时候是会正常通知下个节点的所以它就能放心的阻塞自己。

如果说前驱节点状态是Cancel那么就得忽略掉前面的节点,把前驱节点的前驱节点作为当前节点的前驱节点(好拗口,反正就是如果前驱节点是Cancel的,就跳过它再往前,这个连等是自右向左赋值的,大家自己看代码)

然后如果前驱节点的状态不是以上两种(那有可能是CONDITION或PROPAGATE),就讲这个前驱节点的状态设置为Node.SIGNAL。

如果这个方法返回的是true那么则会执行parkAndCheckInterrupt()方法:

private final booleanparkAndCheckInterrupt() {

LockSupport.park(this);

    returnThread.interrupted();

}

这个方法很简单,就是将线程先停止一下,具体呢?大家可以看我写的另一篇关于LockSupport类的文章哈(https://www.jianshu.com/p/6bffd19cb900不是我打广告,反正你们也不给钱,而是你不看你都不知道LockSupport.park(this)这个方法干了些什么,如果我在这来讲一遍,那就又跑题啦)~

然后就返回这个线程是否被暂停,那么返回acquireQueued()方法,如果parkAndCheckInterrupt()返回的是true,那么在acquireQueued()方法中就会把interrupted这个变量赋值为true,在acquireQueued()中是一个循环,就是循环去tryAquire()获取资源,方法返回值就是这个interrupted, 如果是true代表线程已经被暂停了,如果是false代表没有被暂停。

然后注意到还有

finally{

if(failed)

cancelAcquire(node);

   }

这部分代码,表示最终如果都没能获取这个资源的话,就执行cancelAcquire()方法:

private voidcancelAcquire(Node node) {

// 如果节点是null的话就忽略

   if(node ==null)

return;

   node.thread=null;

   // 跳过已经是cancel状态的前置节点

   Node pred = node.prev;

    while(pred.waitStatus>0)

node.prev= pred = pred.prev;

   Node predNext = pred.next;

   node.waitStatus= Node.CANCELLED;

   if(node ==tail&& compareAndSetTail(node,pred)) {

compareAndSetNext(pred,predNext, null);

   }else{

       intws;

        if(pred !=head&&

((ws = pred.waitStatus) == Node.SIGNAL||

(ws <=0&&compareAndSetWaitStatus(pred,ws,Node.SIGNAL))) &&

pred.thread!=null) {

Node next = node.next;

            if(next !=null&& next.waitStatus<=0)

compareAndSetNext(pred,predNext,next);

       }else{

unparkSuccessor(node);

       }

node.next= node;// 帮助GC

   }

}

哎哟那么多行代码看得脑壳疼,看看它都做了什么,一句话概括,它完了,它会把它自己从这个Node链中移除,让自己等着被GC回收,大家可以看到这里面调用了两次compareAndSetNext()方法:

private static final booleancompareAndSetNext(Node node,

                                              Node expect,

                                              Node update) {

returnunsafe.compareAndSwapObject(node,nextOffset,expect,update);

}

其实就是调用Unsafe的方法通过CAS的方式来设置Node的next节点,就是把当前节点的前面节点指向当前节点的后面节点:

这里注意还有一个方法unparkSuccessor(),这个方法激活它后面的节点:

private voidunparkSuccessor(Node node) {

   intws = node.waitStatus;

    if(ws <0)

compareAndSetWaitStatus(node,ws,0);

   Node s = node.next;

    if(s ==null|| s.waitStatus>0) {

s =null;

        for(Node t =tail;t !=null&& t != node;t = t.prev)

if(t.waitStatus<=0)

s = t;

   }

if(s !=null)

LockSupport.unpark(s.thread);

}

可以看到,如果当前节点的状态不是CANCELLED,那就设置当前的状态为0,然后下一步,操作它的下一个节点,前提是这个节点不为null并且它的状态不为CANCELLED, 如果为null或者CANCELLED,又要进行下一波操作,将Node由尾向前遍历,找到最靠前的那个不为null并且不是当前节点本身并且状态不为CANCELLED的节点,这个由后向前遍历的操作是有点骚的啊,大家体会一下,因为它的next都为null啦,所以没法从next开始遍历,于是往前遍历了。

言归正传,找到下一个满足条件的后节点后,就会通过LockSupport.unpark(s.thread)方法将其唤醒~也就是轮到下个节点的线程进行工作的时候啦。

走到了这里,我们就可以回到acquire()方法啦, 我们是不是沿着acquire()的脚步走了一圈?咦,漏了addWaiter(Node.EXCLUSIVE)这个方法:

privateNodeaddWaiter(Node mode) {

Node node =newNode(Thread.currentThread(),mode);

   Node pred =tail;

    if(pred !=null) {

node.prev= pred;

        if(compareAndSetTail(pred,node)) {

pred.next= node;

            returnnode;

       }

    }

enq(node);

    returnnode;

}

这个方法返回的值是一个Node节点,这个返回值是作为acquireQueued()参数传入的,它接受的参数也是个Node,我们来看看这个方法的输入和输出有啥区别~

简而言之当这个AQS实例的tail这个节点不为空时,它会将参数这个节点加入到AQS实例的Node链表中(加到末尾),你可以看到通过compareAndSetTail()方法(我不再细讲这个方法了哈,其实就是调用的Unsafe的方法,本质是CAS来保证原子性)来重新设置了AQS实例的尾节点,并返回这个节点。

如果这个AQS实例的tail是空的话,就会执行一个方法enq(node):

privateNodeenq(finalNode node) {

for(;;) {

Node t =tail;

        if(t ==null) {// Must initialize

           if(compareAndSetHead(newNode()))

tail=head;

       }else{

node.prev= t;

            if(compareAndSetTail(t,node)) {

t.next= node;

                returnt;

           }

        }

    }

}

可以看到这个方法里有一个循环,在这里面又会去取到AQS实例的tail节点,再去判断它是否为空,如果是空的,新建一个Node节点赋给头结点,再把头节点赋给尾节点,其实就是初始化head和tail,然后tail就不为null了嘛,就会走进else这个分支,所以这里的for(;;)操作也很骚气~它会一直等到tail不是null的时候return。else做的操作就很浅显易懂了,就是讲要添加的node节点赋给tail作为整个链表的尾节点然后返回的就是当前节点的前一个节点(但这里调用enq()时并没有接这个返回值,只是调用而已)。

然后是acquire()方法中的最后一个方法selfInterrupt():

static voidselfInterrupt() {

Thread.currentThread().interrupt();

}

其实就是设置中断的状态为true,因为acquire里有个判断acquireQueued(addWaiter(Node.EXCLUSIVE), arg),这里返回的是线程是否被中断了,如果被中断了才执行selfInterrupt,目的是将这个中断状态传递出去,关于线程的中断机制,我理解了老半天,感觉网上搜到文章都不太地道,所以我决定写一篇文章专门讲这个,敬请期待~

那么独占模式下AQS获取资源的整个流程就走完了,是不是还晕晕的,是的!正常!那我们来总结一下流程(从http://www.javarticles.com/2012/10/abstractqueuedsynchronizer-aqs.html#prettyPhoto盗的一张图~懒得自己画了,谅解一下~):

一句话总结就是:线程会去尝试独占资源,如果成功就是将threadowner设为它自己然后接着做想做的事情,如果不成功就将这个线程放入一个先入先出的队列等待,这是站在线程的角度看,它被放进了一个队列,站在队列的角度看,它还会让队列里满足条件的线程去持续尝试独占资源,如果占成功了那就行了呗,没有的话就看要不要把线程给暂停了,如果暂停了就等待被唤醒啦,如果没暂停的就再尝试占有资源,一直就这样~(这一句话貌似有点长=。=)

好~获取资源部分终于差不多了,获取了之后总要释放呗,下面就讲独占资源的释放~

资源的释放呢是调用的release方法:

public final booleanrelease(intarg) {

if(tryRelease(arg)) {

Node h =head;

        if(h !=null&& h.waitStatus!=0)

unparkSuccessor(h);

        return true;

   }

return false;

}

其中tryRelease(arg)这个就跟tryAcquire()一样是由继承它的子类实现的(如果想看例子请看ReentrantLock中的tryRelease方法),想想,这个方法也是当前线程来执行的对吧,那就意味着,当前线程此刻是运行的状态,这个就会通过unparkSuccessor(h)方法唤醒头节点的下一个节点,这个unparkSuccessor(h)方法前面已经分析过了,反正就是唤醒下个节点。

然后释放的工作就结束啦~

说完独占模式后该说共享模式啦~期待得搓手手~

第二部分 共享模式

还是按照套路,将获取资源的方法走一遍~

共享模式获取资源调用的是acquireShared方法:

public final voidacquireShared(intarg) {

if(tryAcquireShared(arg) <0)

doAcquireShared(arg);

}

tryAcquireShared大家应该能猜到是由子类实现了,也就是我们自己写实现,反正只要它小于0,我们实现的时候就要注意了,小于0在这里就要代表没有获取成功,这样就会执行下面的操作 doAcquireShared(arg):

private voiddoAcquireShared(intarg) {

finalNode node = addWaiter(Node.SHARED);

    booleanfailed =true;

    try{

booleaninterrupted =false;

        for(;;) {

finalNode p = node.predecessor();

            if(p ==head) {

intr = tryAcquireShared(arg);

                if(r >=0) {

setHeadAndPropagate(node,r);

                   p.next=null;// help GC

                   if(interrupted)

selfInterrupt();

                   failed =false;

                    return;

               }

            }

if(shouldParkAfterFailedAcquire(p,node) &&

                parkAndCheckInterrupt())

interrupted =true;

       }

}finally{

if(failed)

cancelAcquire(node);

   }

}

首先同样会执行addWaiter(Node.SHARED),这个方法之前讲了嘛,独占模式也有,就是讲当前线程所在节点加入到等待队列里面,唯一不同的是这里设置的nextWaiter是Node.SHARED而已。然后进入循环,如果当前节点的前置节点是头节点的话,会去接着尝试tryAcquireShared(arg)试图获取资源,如果成功,则执行setHeadAndPropagate方法:

private voidsetHeadAndPropagate(Node node, intpropagate) {

Node h =head;// 保存旧的head用于下面检查

   setHead(node);

   if(propagate >0|| h ==null|| h.waitStatus<0||

(h =head) ==null|| h.waitStatus<0) {

Node s = node.next;

        if(s ==null|| s.isShared())

doReleaseShared();

   }

}

它这个方法里有一串判断条件:propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0

翻译:以下条件满足其一:

          1. propagate也就是tryAcquireShared方法的返回值要大于0,也就是要获取资源成功

      2. 之前的头节点是空

      3. 之前的头节点的等待状态不是CANCELLED

      4. 当前头节点为空

      5. 当前头节点等待状态不是CANCELLED

好,满足这几个要求中的一个就能走到if里面去啦,看看它做了什么呢?

首先找到了当前节点的下一个节点,如果它是空的或者它的模式是共享模式,那么执行doReleaseShared():

private voiddoReleaseShared() {

   for(;;) {

Node h =head;

        if(h !=null&& h !=tail) {

intws = h.waitStatus;

            if(ws == Node.SIGNAL) {

if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))

continue;            

               unparkSuccessor(h);

           }

else if(ws ==0&&

!compareAndSetWaitStatus(h,0,Node.PROPAGATE))

continue;               // CAS失败后继续循环

       }

if(h ==head)// 如果头节点不一样了则循环继续,否则跳出循环

           break;

   }

}

哇哦,又是一个循环,我们可以看到,跳出循环的条件是h == head, 然鹅,h = head,那么我们就有两个猜想,第一,这个head可能是在多线程的环境下发生了变化,不然在单线程的条件下,h == head是永远成立的, 第二,在上面这段:

 if(h !=null&& h !=tail) {

       int ws = h.waitStatus;

       if(ws == Node.SIGNAL) {

if(!compareAndSetWaitStatus(h,Node.SIGNAL,0))

               continue;            

               unparkSuccessor(h);

           }

else if(ws ==0&&!compareAndSetWaitStatus(h,0,Node.PROPAGATE))

            continue;               // CAS失败后继续循环

       }

操作中可能有对head的修改,那么我们先来看看这段代码有没有修改head。

当h不是空并且也不是最后一个节点的情况下,我们拿到他的等待状态,如果是SIGNAL(代表了它后面的节点需要被唤醒),然后我们就去将它的等待状态设为0(初始状态,表示后面没有节点等待被唤醒),并unparkSuccessor(唤醒后面的节点),这个方面前面也讲了,记不住翻上去看,然后如果节点的等待状态是0的话就回去设置它的状态是Node.PROPAGATE,但是这个方法也没有改变过head节点。

那么就意味着我们第一个猜想是正确的,head可能在多个线程的执行过程中发生了变化,那么head没变就代表了已经没有多个线程在这同时执行这段代码了(多个线程试图获取资源),也就可以退出循环了。

跳出之后,我们可以回到doAcquireShared方法了,我们已经唤醒了后面的共享模式的节点,然后我们就把p(头节点从等待队列中删除)

if(shouldParkAfterFailedAcquire(p,node) &&

                parkAndCheckInterrupt())

    interrupted =true;

这段代码很熟悉了,独占模式下也有,就是判断当获取资源失败时是否将线程中断,并检查线程是否被中断,赋值给interrupted。

如果是被中断了的话,就执行selfInterrupt方法重置线程的中断标志为true,然后就可以跳出循环了。反正不管跳不跳出循环,最终是要执行下面这段代码的:

finally{

if(failed)

cancelAcquire(node);

}

cancelAcquire()这个方法我们前面也讲了,就是将当前线程给取消掉,一定条件下去唤醒后面的节点。然后就完事~是不是很简单,过程基本上和独占的差不多,只是它会执行setHeadAndPropagate这个方法,也就是如果它的下个节点也是SHARED模式的话,它会将它的下个节点也唤醒。

好的大概讲一下流程,白话讲哈,比较容易理解,就是线程首先会去获取资源嘛,如果获取到了就占有它,如果没获取到就加入等待队列的末尾嘛,然后队列中的线程都会去判断一下,它前面那个节点是不是头节点,因为头节点就是占有着资源的那个节点嘛,头节点完了就可以轮到它了,所以它就不挂起自己,接着请求,重复请求等着前面节点释放资源嘛,释放了资源它就紧接着占有资源啦,占有后,共享模式会把自己线程的节点设为头节点了嘛,之前的头已经完成移出队列了,然后现在就它自己是头啦,它会去检查自己后面那个节点是不是也是SHARED共享模式的,如果是,它就把它唤醒!当然也有线程挂起的时候啦,比如它前面的节点并不是头节点的时候,那么这个时候要把它前面那个节点的状态设为Node.SIGNAL,这样子当他前面节点被唤醒,执行完成要将资源交出去的时候才会去唤醒它后面的节点,不然的话后面的节点就不能够被唤醒哦,所以Node.SIGNAL很重要啦~

整个过程大概就是这样子的,共享模式的释放也很简单跟独占模式类似所以也不讲啦~

呼~好累,但是还是要多说两句,这个AbstractQueuedSynchronizer所有方法我们都要考虑到是很多个线程同时在执行的,虽然有的阻塞,有的在执行,时刻这样想着有助于我们理解这个代码的一些设计~

最后就是,我也是自己试图去把自己的理解写出来的,有什么问题和建议都请留言,批评就算了,写这个很累的,况且仙女不接受批评~

相关文章

网友评论

      本文标题:AbstractQueuedSynchronizer源码解析

      本文链接:https://www.haomeiwen.com/subject/iikbaftx.html