美文网首页
五、Lock接口和AQS

五、Lock接口和AQS

作者: 沉沦2014 | 来源:发表于2018-12-14 11:49 被阅读21次

https://www.cnblogs.com/daydaynobug/p/6752837.html
https://blog.csdn.net/zhangdong2012/article/details/79983404
https://blog.csdn.net/zy1994hyq/article/details/83656109

5.1 Lock接口

它提供了与synchronized关键字类似的同步功 能,只是在使用时需要显式地获取和释放锁。虽然它缺少了(通过synchronized块或者方法所提供的)隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性可中断的获取锁以及超时获取锁等多种synchronized关键字所不具备的同步特性。

特性 描述
非阻塞地获取锁 当前线程尝试获取锁,如果这时刻没有被其他线程获取到,则成功获取锁
能被中断地获取锁 与synchronize不同,获取到锁的线程能够响应中断,当获取到锁地线程被中断时,中断异常将会被抛出,同时锁将会被释放
超时获取锁 指定地截至时间之前获取锁,若截至时间到了,无法获取
public interface Lock {
    //获取锁,调用该方法将会获取锁,当锁获取后,从该方法返回
    void lock();
    //可中断地获取锁,和lock()方法的不同之处在于该方法会响应中断,即在锁的获取过程中可以中断当前线程 
    void lockInterruptibly() throws InterruptedException;
    //尝试非阻塞的获取锁,调用该方法后会立刻返回,如果能够获取则返回true,否则返回false 
    boolean tryLock();
    //超时地获取锁 1、当前线程在超时时间内成功获取锁。2、当前线程在超时时间内被中断。3、超时时间结束返回false。 
    boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
    //释放锁 
    void unlock();
    //获取等待通知组件
    Condition newCondition();
}

5.1.1 可重入锁ReentrantLock

一般会用可重入锁(ReentrantLock)向上转型实例化一个Lock锁:

Lock lock=new ReentrantLock( );
try
{
    lock.lock( );
   //以下代码只有一个先从可以运行
    ...
}
finally
{
    lock.unlock( );
}
lock必须调用unlock()方法释放锁,因
此在finally块中释放锁。

Reentrantlock(可重入锁)如何实现一个锁?

public ReentrantLock() {
        sync = new NonfairSync();
    }
public class ReentrantLock implements Lock, java.io.Serializable
{
 
    private final Sync sync;
    abstract static class Sync extends AbstractQueuedSynchronizer 
    {}
}   


static final class NonfairSync extends Sync{}

Reentrantlock(可重入锁)中所有方法实际上都是调用了其静态内部类Sync中的方法,而Sync继承了AbstractQueuedSynchronizer(AQS --简称同步器)

5.2 队列同步器(AQS)

同步器内部其实就是用双向链表来存储线程,关于锁的相关操作都可以通过操作这个链表实现。

同步器依赖内部的同步队列(FIFO)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成一个节点并将其加入到同步队列,同时阻塞当前线程。首节点是获取同步状态成功的节点,同步状态释放时,首节点会唤醒后继节点中的线程,并可能让其获取同步状态。

同步器拥有的头节点(head)、尾节点(tail), 还有一个线程同步队列,以双向链表的形式体现。

线程在尝试获取同步状态时,AQS会把该线程构造成一个节点,节点结构即上文中的Node类,然后让内部的tail节点(末节点)引用该节点,此阶段是通过compareAndSetTail方法利用CAS原理设置tail指向该节点的引用。
获取到同步的线程,AQS中的head节点会指向包含该线程的节点,执行完相应的逻辑后,会释放同步状态。然后首节点会唤醒它的后继节点(next引用)并让该节点中的线程参与获取同步状态的活动。

事实上显示锁(ReentrantLock)、信号量(Semaphore)、障碍器(CountDownLatch)的实现,都是采用模板方法模式实现的,AbstractQueuedSynchronizer就是模板类。

下面就是具体实现类需要实现的方法:

boolean tryAcquire(int arg) 试获取独占锁
boolean tryRelease(int arg) 试释放独占锁
int tryAcquireShared(int arg)   试获取共享锁
boolean tryReleaseShared(int arg)   试释放共享锁
boolean isHeldExclusively() 当前线程是否获得了独占锁

下面是在模板类AQS中已经实现的具体方法:

void acquire(int arg)   获取独占锁。会调用tryAcquire方法,如果未获取成功,则会进入同步队列等待
void acquireInterruptibly(int arg)  响应中断版本的acquire,其实这个方法与acquire方法的区别主要就是,这个方法抛出了中断异常
boolean tryAcquireNanos(int arg,long nanos) 超时+响应中断版本的acquire,这个方法主要除了会抛出异常,还加入了超时的判断
void acquireShared(int arg) 获取共享锁。会调用tryAcquireShared方法
void acquireSharedInterruptibly(int arg)    响应中断版本的acquireShared
boolean tryAcquireSharedNanos(int arg,long nanos)   响应中断+带超时版本的acquireShared
boolean release(int arg)    释放独占锁
boolean releaseShared(int arg)  释放共享锁
Collection getQueuedThreads()   获取同步队列上的线程集合

在解析上诉方法源码之前,还需要看一下Java中关于线程中断方法的解释:

interrupt()的作用是中断本线程。
本线程中断自己是被允许的;其它线程调用本线程的interrupt()方法时,会通过checkAccess()检查权限。这有可能抛出SecurityException异常。
如果本线程是处于阻塞状态:调用线程的wait(), wait(long)或wait(long, int)会让它进入等待(阻塞)状态,或者调用线程的join(), join(long), join(long, int), sleep(long), sleep(long, int)也会让它进入阻塞状态。若线程在阻塞状态时,调用了它的interrupt()方法,那么它的“中断状态”会被清除并且会收到一个InterruptedException异常。例如,线程通过wait()进入阻塞状态,此时通过interrupt()中断该线程;调用interrupt()会立即将线程的中断标记设为“true”,但是由于线程处于阻塞状态,所以该“中断标记”会立即被清除为“false”,同时,会产生一个InterruptedException的异常。

由于在源码中很多地方都用到了CAS算法,这里就简单解释一下:

CAS算法有3个操作数,内存值V,旧的预期值A,要修改的新值B。当且仅当预期值A和内存值V相同时,将内存值V修改为B,否则返回V。这个内存值就是根据valueOffset得到的内存地址里的值。
compareAndSwapObject(Object var1, long var2, Object var3, Object var4)
var1 操作的对象
var2 内存地址
var3 根据var2得到对应内存地址里的值,将该内存值的与var3比较,相等才更新
var4 更新值
更新成功 返回true, 反正返回false

AQS应用

class Mutex implements Lock {
    // 静态内部类,自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
      // 是否处于占用状态
      protected boolean isHeldExclusively() {
          return getState() == 1;
      }
      // 当状态为0的时候获取锁
      public boolean tryAcquire(int acquires) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
        }
      // 释放锁,将状态设置为0
      protected boolean tryRelease(int releases) {
          if (getState() == 0) throw new IllegalMonitorStateException();
          setExclusiveOwnerThread(null);
          setState(0);
          return true;
      }
      // 返回一个Condition,每个condition都包含了一个condition队列
      Condition newCondition() { return new ConditionObject(); }
    }
    // 仅需要将操作代理到Sync上即可
    private final Sync sync = new Sync();

    public void lock() { sync.acquire(1); }

    public boolean tryLock() { return sync.tryAcquire(1); }

    public void unlock() { sync.release(1); }

    public Condition newCondition() { return sync.newCondition(); }

    public boolean isLocked() { return sync.isHeldExclusively(); }

    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }

    public void lockInterruptibly() throws InterruptedException {
      sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

5.2.1 AQS具体方法acquire(int)源码解析

acquire(int)

此方法是独占模式下线程获取共享资源的顶层入口。如果获取到资源,线程直接返回,否则进入等待队列,直到获取到资源为止,且整个过程忽略中断的影响。这也正是lock()的语义,当然不仅仅只限于lock()。获取到资源后,线程就可以去执行其临界区代码了。下面是acquire()的源码:

public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
 }

函数流程如下:

  • tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  • addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  • acquireQueued()使线程在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  • 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

tryAcquire(int)

此方法尝试去获取独占资源。如果获取成功,则直接返回true,否则直接返回false。这也正是tryLock()的语义,还是那句话,当然不仅仅只限于tryLock()。如下是tryAcquire()的源码:

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

AQS这里只定义了一个接口,具体资源的获取交由自定义同步器去实现了(通过state的get/set/CAS)!!!至于能不能重入,能不能加塞,那就看具体的自定义同步器怎么去设计了!!!当然,自定义同步器在进行资源访问时要考虑线程安全的影响。

addWaiter(Node)

此方法用于将当前线程加入到等待队列的队尾,并返回当前线程所在的结点

private Node addWaiter(Node mode) {
    //以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
    Node node = new Node(Thread.currentThread(), mode);
    
    //尝试快速方式直接放到队尾。
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    
    //上一步失败则通过enq入队。
    enq(node);
    return node;
}

enq(Node)

private Node enq(final Node node) {
    //CAS"自旋",直到成功加入队尾
    for (;;) {
        Node t = tail;
        if (t == null) { // 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它。
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {//正常流程,放入队尾
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
....
//CAS算法设置头结点,判断头节点是否为空,如果为空设置头结点为 update
private final boolean compareAndSetHead(Node update) {
     return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
//CAS算法设置末节点,判断末节点是否等于expect,如果为空设置头结点为 update
private final boolean compareAndSetTail(Node expect, Node update) {
     return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
//CAS算法设置状态值,如果当前状态值等于预期值,则原子性地(避免多线程引起并发问题)将同步状态设置为给定的更新值。这个操作具有{volatile}读写的内存语义。
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

acquireQueued(Node, int)

OK,通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入等待队列尾部了。接下来该线程进入等待状态休息,直到其他线程彻底释放资源后唤醒自己,自己再拿到资源,然后就可以去干自己想干的事了。和医院排队拿号有点相似:在等待队列中排队拿号(中间没其它事干可以休息),直到拿到号后再返回。源码:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;//标记是否成功拿到资源
    try {
        boolean interrupted = false;//标记等待过程中是否被中断过
        
        //又是一个“自旋”!
        for (;;) {
            final Node p = node.predecessor();//拿到前驱
            //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)。
            if (p == head && tryAcquire(arg)) {
                setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null。
                p.next = null; // setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
                failed = false;
                return interrupted;//返回等待过程中是否被中断过
            }
            
            //如果自己可以休息了,就进入waiting状态,直到被unpark()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
        }
    } finally {
        //经过上面的判断,如果是正常跳出for循环的,那么failed最后一定为false
        //因此这里failed为true的情况只有可能是上面的代码发生异常了导致没有for循环没有正常结束
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire(Node, Node)

    //获取锁失败后,是否应该进行阻塞线程的操作
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //如果前置节点的状态为SIGANAL,则后面的节点应该阻塞,直到前置节点释放了同步状态时才唤醒后继节点,因此这里返回true
        if (ws == Node.SIGNAL) 
            return true;
        if (ws > 0) {
            //如果ws>=0,也就是pred的状态为CANCELLED,那么从pred开始向前搜索,
            //移除CANCELLED状态的节点,直到遇见状态waitState<=0的节点
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            //如果前置节点pred的状态为0或者PROPAGATE时,设置pred节点的状态为SIGNAL
            //独占模式下前置节点pred的状态为0
            //共享模式下前置节点pred的状态为0或者PROPAGATE
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

“shouldParkAfterFailedAcquire(p , node)”,这个方法内部会判定前一个节点的状态是否为:“Node.SIGNAL”,若是则返回true,若不是都会返回false,不过会再做一些操作:判定节点的状态是否大于0,若大于0则认为被“CANCELLED”掉了(我们没有说明几个状态的值,不过大于0的只可能被CANCELLED的状态),因此会从前一个节点开始逐步循环找到一个没有被“CANCELLED”节点,然后与这个节点的next、prev的引用相互指向;如果前一个节点的状态不是大于0的,则通过CAS尝试将状态修改为“Node.SIGNAL”,自然的如果下一轮循环的时候会返回值应该会返回true。

    static final class Node {
        //共享类型节点,表示在共享模式下的等待节点
        static final Node SHARED = new Node();
        //独占类型节点,表示在独占模式下的等待节点
        static final Node EXCLUSIVE = null;
 
        //等待状态,取消,表示该节点存放的线程被取消,CANCELLED状态的节点应该被移除节点链表
        static final int CANCELLED =  1;
        //等待状态,发信号(通知),当前节点为SIGNAL状态时,表示后继节点应该是阻塞的(park),当前节点释放同步状态后会通知后继节点,唤醒后继节点阻塞的线程
        static final int SIGNAL = -1;
        //等待状态值,表示当前节点在等待condition,也就是在condition队列(条件队列也叫等待队列)中
        static final int CONDITION = -2;
        //等待状态值,传播。使用在共享模式中的一种特殊状态,表示无条件向后传播唤醒动作,下一次共享式获取状态的操作应该无条件的传播
        static final int PROPAGATE = -3;
        //当前的等待状态,取值为5中,包括上面定义的CANCELLED、SIGNAL、CONDITION、PROPAGATE,除此之外还有  0 :表示除上面四种之外的情况
        //总的来说状态可以简单分为就3种:取消、阻塞、执行
        //CANCELLED 取消,SIGNAL、CONDITION、PROPAGATE 独占阻塞/条件阻塞/共享阻塞
        volatile int waitStatus;//int类型默认情况下为0,这里可以表示节点初始状态或者正在执行
 
        //当前节点的前一个节点
        volatile Node prev;
        //当前节点的后一个节点,注意与nextWaiter区分,nextWaiter是用于condition队列中后继节点的指向
        volatile Node next;
        //当前节点存放的线程
        volatile Thread thread;
 
        //使用在condition队列中的后继节点。
        Node nextWaiter;
 
        //如果节点实在共享模式中 返回true
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        
        Node() {}
        //提供给addWaiter方法使用,生成同步队列的节点
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        //生成condition队列(条件队列也叫等待队列)的节点
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
 
    }

parkAndCheckInterrupt()

如果线程前驱节点的waitStatus为Node.SIGNAL,那就可以安心去休息了。此方法就是让线程去休息,真正进入等待状态。

    //根据当前对象的this引用,阻塞当前线程,并且返回当前线程是否中断
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); //调用park()使线程进入waiting状态(线程如果因为调用park而阻塞的话,能够响应中断请求(中断状态被设置成true),但是不会抛出InterruptedException)
        return Thread.interrupted(); //如果被唤醒,查看自己是不是被中断的
    }

park()会让当前线程进入waiting状态。在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。需要注意的是,Thread.interrupted()会清除当前线程的中断标记位。

acquireQueued()分析完之后,我们接下来再回到acquire(),再贴上它的源码:

public final void acquire(int arg) {
     if (!tryAcquire(arg) &&
         acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
         selfInterrupt();
 }

再来总结下它的流程:

  1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
  2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
  3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

由于此函数是重中之重,再用流程图总结一下:


至此,acquire()的流程终于算是告一段落了。这也就是ReentrantLock.lock()的流程,不信你去看其lock()源码吧,整个函数就是一条acquire(1)!!!

5.2.2 AQS具体方法release(int)源码解析

此方法是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。这也正是unlock()的语义,当然不仅仅只限于unlock()。下面是release()的源码:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;//找到头结点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);//唤醒等待队列里的下一个线程
        return true;
    }
    return false;
}

逻辑并不复杂。它调用tryRelease()来释放资源。有一点需要注意的是,它是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自定义同步器在设计tryRelease()的时候要明确这一点!!

tryRelease(int)

此方法尝试去释放指定量的资源。下面是tryRelease()的源码:

protected boolean tryRelease(int arg) {
     throw new UnsupportedOperationException();
}

跟tryAcquire()一样,这个方法是需要独占模式的自定义同步器去实现的。正常来说,tryRelease()都会成功的,因为这是独占模式,该线程来释放资源,那么它肯定已经拿到独占资源了,直接减掉相应量的资源即可(state-=arg),也不需要考虑线程安全的问题。但要注意它的返回值,上面已经提到了,release()是根据tryRelease()的返回值来判断该线程是否已经完成释放掉资源了!所以自义定同步器在实现时,如果已经彻底释放资源(state=0),要返回true,否则返回false。

unparkSuccessor(Node)

此方法用于唤醒等待队列中下一个线程。下面是源码:

private void unparkSuccessor(Node node) {
    //这里,node一般为当前线程所在的结点。
    int ws = node.waitStatus;
    if (ws < 0)//置零当前线程所在的结点状态,允许失败。
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next;//找到下一个需要唤醒的结点s
    if (s == null || s.waitStatus > 0) {//如果为空或已取消
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev) // 如果node的后继节点为空或者状态为CANCELLED,则从末节点开始反向查询可以唤醒的节点
            if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);//唤醒
}

这个函数并不复杂。一句话概括:用unpark()唤醒等待队列中最前边的那个未放弃线程,这里我们也用s来表示吧。此时,再和acquireQueued()联系起来,s被唤醒后,进入if (p == head && tryAcquire(arg))的判断(即使p!=head也没关系,它会再进入shouldParkAfterFailedAcquire()寻找一个安全点。这里既然s已经是等待队列中最前边的那个未放弃线程了,那么通过shouldParkAfterFailedAcquire()的调整,s也必然会跑到head的next结点,下一次自旋p==head就成立了),然后s把自己设置成head标杆结点,表示自己已经获取到资源了,acquire()也返回了!!

5.2.3 AQS具体方法acquireShared(int)源码解析

此方法是共享模式下线程获取共享资源的顶层入口。它会获取指定量的资源,获取成功则直接返回,获取失败则进入等待队列,直到获取到资源为止,整个过程忽略中断。下面是acquireShared()的源码:

public final void acquireShared(int arg) {
     if (tryAcquireShared(arg) < 0)
         doAcquireShared(arg);
}

这里tryAcquireShared()依然需要自定义同步器去实现。但是AQS已经把其返回值的语义定义好了:负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:

  1. tryAcquireShared()尝试获取资源,成功则直接返回;
  2. 失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回。

doAcquireShared(int)

此方法用于将当前线程加入等待队列尾部休息,直到其他线程释放资源唤醒自己,自己成功拿到相应量的资源后才返回。下面是doAcquireShared()的源码:

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//加入队列尾部
    boolean failed = true;//是否成功标志
    try {
        boolean interrupted = false;//等待过程中是否被中断过的标志
        for (;;) {
            final Node p = node.predecessor();//前驱
            if (p == head) {//如果到head的下一个,因为head是拿到资源的线程,此时node被唤醒,很可能是head用完资源来唤醒自己的
                int r = tryAcquireShared(arg);//尝试获取资源
                if (r >= 0) {//成功
                    setHeadAndPropagate(node, r);//将head指向自己,还有剩余资源可以再唤醒之后的线程
                    p.next = null; // help GC
                    if (interrupted)//如果等待过程中被打断过,此时将中断补上。
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            
            //判断状态,寻找安全点,进入waiting状态,等着被unpark()或interrupt()
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

有木有觉得跟acquireQueued()很相似?对,其实流程并没有太大区别。只不过这里将补中断的selfInterrupt()放到doAcquireShared()里了,而独占模式是放到acquireQueued()之外,其实都一样,不知道Doug Lea是怎么想的。

跟独占模式比,还有一点需要注意的是,这里只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。因为老大先唤醒老二,老二一看资源不够自己用继续park(),也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。

setHeadAndPropagate(Node, int)

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; 
    setHead(node);//head指向自己
     //如果还有剩余量,继续唤醒下一个邻居线程
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

此方法在setHead()的基础上多了一步,就是自己苏醒的同时,如果条件符合(比如还有剩余资源),还会去唤醒后继结点,毕竟是共享模式!
doReleaseShared()我们留着下一小节的releaseShared()里来讲。

至此,acquireShared()也要告一段落了。让我们再梳理一下它的流程:
tryAcquireShared()尝试获取资源,成功则直接返回;
失败则通过doAcquireShared()进入等待队列park(),直到被unpark()/interrupt()并成功获取到资源才返回。整个等待过程也是忽略中断的。
其实跟acquire()的流程大同小异,只不过多了个自己拿到资源后,还会去唤醒后继队友的操作(这才是共享嘛)。

5.2.4 AQS具体方法releaseShared()源码解析

上一小节已经把acquireShared()说完了,这一小节就来讲讲它的反操作releaseShared()吧。此方法是共享模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源。下面是releaseShared()的源码:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//尝试释放资源
        doReleaseShared();//唤醒后继结点
        return true;
    }
    return false;
}

此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于可重入的考量;而共享模式下的releaseShared()则没有这种要求,一是共享的实质--多线程可并发执行;二是共享模式基本也不会重入吧(至少我还没见过),所以自定义同步器可以根据需要决定返回值。

doReleaseShared()

此方法主要用于唤醒后继。下面是它的源码:

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;
                unparkSuccessor(h);//唤醒后继
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;
        }
        if (h == head)// head发生变化
            break;
    }
}

相关文章

网友评论

      本文标题:五、Lock接口和AQS

      本文链接:https://www.haomeiwen.com/subject/bkephqtx.html