美文网首页
【Java并发】同步器-AQS

【Java并发】同步器-AQS

作者: Rockie_h | 来源:发表于2018-04-10 00:12 被阅读43次
    AQS.png

    AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getState,setState以及compareAndSetState等protected方法来操作。这个整数可以用来表示任意状态。比如,ReentrantLock用它来表示所有者线程已经重复获取该锁的次数,Semaphore用它来表示剩余的许可数量,FutureTask用它来表示任务的状态(尚未开始、正在运行已完成以及已取消)。
    如果某个同步器支持独占的获取操作,那么需要实现一些保护方法,包括tryAcquire、tryRelease和isHeldExclusively等,而对于支持共享获取的同步器,则应该实现tryAcquireShared和tryReleaseShared等方法。AQS中的acquire、acquireShared、release和releaseShared等方法都将调用这些方法在子类中带有前缀try的版本来判断某个操作是否能执行。
    AQS中获取操作和释放操作的标准形式:首先,判断当前状态是否允许获得操作;其次,更新同步器的状态。

    boolean acquire() throws InterruptedException
    {
        while(当前状态不允许获取操作)
        {
            if(需要阻塞获取请求)
            {
                 如果当前线程不在队列中,则将其插入队列
                 阻塞当前线程
            }
            else
            {
                返回失败
            }
        }
        可能更新同步器的状态
        如果线程位于队列中,则将其移除队列
        返回成功
    }
    void release()
    {
        更新同步器的状态
        if(新的状态允许某个被阻塞的线程获取成功)
        {
            解除队列中一个或多个线程的阻塞状态
        }
    }
    

    示例一:使用AbstractQueuedSynchronizer实现的二元闭锁:

    public class OneShotLatch 
    {
        private final Sync sync = new Sync();
        
        public void signal()
        {
            sync.releaseShared(0);
        }
        
        public void await() throws InterruptedException
        {
            sync.acquireSharedInterruptibly(0);
        }
        
        private class Sync extends AbstractQueuedSynchronizer
        {
            protected int tryAcquireShared(int ignored)
            {
                //如果闭锁是开的(state == 1),这个操作成功,否则失败
                return (getState() == 1) ? 1 : -1;
            }
            
            protected boolean tryReleaseShared(int ignored)
            {
                //打开闭锁
                setState(1);
                //现在其他线程可以获取该闭锁
                return true;
            }
        }
    }
    

    在OneShotLatch中,AQS状态用来表示闭锁状态,关闭0,打开1。await方法调用AQS的acquireSharedInterruptibly,然后接着调用OneShotLatch中的tryAcquireShared方法。若之前已经打开了闭锁,则tryAcquireShared将返回成功并允许线程通过,否则返回一个表示获取操作失败的值。acquireSharedInterruptibly方法在处理失败的方式,是把这个线程放入等待线程队列中。
    java.util.concurrent中的所有同步器类都没有直接扩展AQS,而是都将他们的相应功能委托给私有的AQS子类来实现。如开篇的类图所示。

    ReentrantLock

    ReentrantLock只支持独占方式的获取操作,因此它实现了tryAcquire、tryRelease和isHeldExclusively。
    源码一:基于非公平的ReentrantLock实现tryAcquire

    当一个线程尝试获取锁时,tryAcquire首先检查锁的状态。若未被持有,它将尝试更新锁的状态以表示锁已经被持有;
    若锁状态表明它已经被持有,并且如果当前线程是锁的所有者,则获取计数递增,若不是当前持有者,则获取失败。
    final boolean nonfairTryAcquire(int acquires) 
    {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0)
                {
                    if (compareAndSetState(0, acquires)) {
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
                else if (current == getExclusiveOwnerThread())
                {
                    int nextc = c + acquires;
                    if (nextc < 0) // overflow
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
    }
    

    Semaphore与CountDownLatch

    Semaphore将AQS的同步状态用于保存当前可用许可的数量。
    CountDownLatch使用AQS的方式与Semaphore相似:在同步状态中保存的是当前的计数值,countDown方法调用release,从而减少计数值,并且当计数值为零时,解除所有等待线程的阻塞。await调用acquire,当计数器为零时,acquire将立即返回,否则将阻塞。
    源码二:Semaphore中的tryAcquireShared与tryReleaseShared:

    final int tryAcquireShared(int acquires) {
                for (;;) {
                    int available = getState();
                    int remaining = available - acquires;
                    if (remaining < 0 ||
                        compareAndSetState(available, remaining))
                        return remaining;
                }
            }
    protected final boolean tryReleaseShared(int releases) {
                for (;;) {
                    int current = getState();
                    int next = current + releases;
                    if (next < current) // overflow
                        throw new Error("Maximum permit count exceeded");
                    if (compareAndSetState(current, next))
                        return true;
                }
            }
    

    FutureTask

    Future.get的语义非常类似于闭锁的语义---如果发生了某个事件(由FutureTask表示的任务执行完成或被取消),那么线程就可以恢复执行,否则这些线程将停留在队列中并直到该事件发生。
    FutureTask中,AQS同步状态被用来保存任务的状态,例如正在运行、已完成或已取消。此外,它还维护了一个引用,指向正在执行计算任务的线程(如果它当前处于运行状态),因此,如果任务取消,该线程就会中断。

    ReentrantReadWriteLock

    ReadWriteLock接口表示存在两个锁:一个读锁,一个写锁。读锁上的操作使用共享的获取方法与释放方法,写锁上的操作将使用独占的获取方法和释放方法。
    AQS内部维护一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问。在ReentrantReadWriteLock中,当锁可用时,如果位于队列头部的线程执行写入操作,那么线程会得到这个锁,如果位于队列头部的线程执行读取访问,那么队列中在这个线程之前的所有线程都将获得这个锁。

    相关文章

      网友评论

          本文标题:【Java并发】同步器-AQS

          本文链接:https://www.haomeiwen.com/subject/qzkvhftx.html