美文网首页
Java 多线程(四)- 理解 AbstractQueuedSy

Java 多线程(四)- 理解 AbstractQueuedSy

作者: PFF | 来源:发表于2017-01-27 23:29 被阅读132次

    上文说到 ReentrantLock 用到代理模式,其中最核心的加锁 / 解锁操作都是调用 Sync 对象完成。而从源代码可以看出,Sync 对象代码量也不多,核心代码被封装在父类 AbstractQueuedSynchronizer(简称 AQS)中,今天就来简单的探究探究这个 AQS。

    AQS 框架

    实际编程时,大多数时候我们不会直接使用 AQS,ReentrantLock / CountDownLatch 等标准同步器能够满足绝大多数情况的需求。但能如果能了解 AQS 的实现方式,对于理解这些同步器类的工作原理有很大帮助。

    设计模式

    从方法调用顺序看,AQS 采用了标准的模版方法模式,对外放出以下公开方法:

    //独占模式接口 
    public final void acquire(int arg);
    public final void acquireInterruptibly(int arg)
            throws InterruptedException;
         public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException;
         public final boolean release(int arg);
            
    //共享模式接口 
    public final void acquireShared(int arg);
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException;
         public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException;
         public final boolean releaseShared(int arg);
    

    AQS 同时支持独占和共享模式,子类同步器不能重载这些对外接口,但是必须重载独占模式接口或者共享模式接口:

    //独占模式
    protected boolean tryAcquire(int arg);
    protected boolean tryRelease(int arg);
    
    //共享模式
    protected int tryAcquireShared(int arg);
    protected boolean tryReleaseShared(int arg);
    
    //是否是独占模式
    protected boolean isHeldExclusively();
    

    子类的 tryAcquire/tryAcquireShared 会在 AQS 对外 acquire 系列公开接口中调用,返回 true / 非0 表明当前线程获取执行“许可”;如果是 false / 0 表明当前线程未获得“执行”许可,需要被阻塞挂起,放入阻塞队列。

    类似的,子类的 tryRelease / tryReleaseShared 会在 AQS 对外 release 系列公开接口中调用,返回 true 表明释放成功,AQS 框架会在阻塞队列中运行其他线程。使用模式如下:

    Acquire:
        while (!tryAcquire(arg)) {
            enqueue thread if it is not already queued;
            possibly block current thread;
        }
    
     Release:
        if (tryRelease(arg))
            unblock the first queued thread;
    
    有趣的地方

    AQS 在定义 tryAcquire 这些需要子类实现的接口时,并不是用 abstract 关键字,而是抛出异常。究其原因,可能是如果使用 abstract 关键字,那子类必须实现所有独占模式和共享模式的接口。而实际中根据子类同步器的同步语义,一般只会使用其中一种模式,所以实现所有两种接口后会造成代码冗余,以及语义的晦涩难懂。

    结构功能

    得益于模版方法模式,子类同步器只需要关注让让线程是否获得执行“许可”,而线程的阻塞和入队都是用 AQS 控制。所以 AQS 最主要的工作是以下3件事:

    • 提供共享变量 state ,以及 CAS 操作供子类使用。state 语义由子类定义,子类使用 state 判断线程是否准入;
    • 阻塞线程。子类同步器禁止线程准入后,AQS 调用 LockSupport.park 将当前线程阻塞挂起(线程进入 TIMED_WAITING 状态)
    • 管理阻塞队列。改进型 CLH lock queue,实现阻塞线程的 FIFO,当准入许可释放的时候,调度下一个线程执行。

    阻塞队列

    AQS 的阻塞线程队列使用了改进型的 CLH lock queue(具体可以查看内容来源),保证阻塞线程 FIFO。简而言之:

    CLH lock queue其实就是一个FIFO的队列,队列中的每个结点(线程)只要等待其前继节点释放锁,或者说是根据前继节点的状态决定其是否可运行。
    
    Node

    说 AQS 的阻塞队列是改进型的 CLH lock queue,是因为队列中的 Node 不仅有前继节点引用,也包含后续节点引用,同时还有 status 描述多种状态,以便支持取消等待阻塞。以下是 Node 数据结构:

    字段 解释描述
    prev 指向前继节点
    next 指向后续节点
    thread 被阻塞线程
    nextWaiter 使用在 condition 等待队列中,或者共享节点
    status 节点状态
    1,CANCELLED,表示当前节点的线程因为超时或中断被取消了
    0,除了以上四种状态的第五种状态,一般是节点初始状态
    -1,SIGNAL,表示当前节点的后续节点中的线程通过park被阻塞了,当前节点在释放或取消时要通过unpark解除它的阻塞
    -2,CONDITION,表示当前节点在condition队列中
    -3,PROPAGATE,共享模式的头结点可能处于此状态,表示无条件往下传播,引入此状态是为了优化锁竞争,使队列中线程有序地一个一个唤醒
    队列

    ASQ 对象中有有两个成员变量,head 指向阻塞队列队首,tail 指向阻塞队列队尾。队列结构如下图所示:

    队列结构
    waitStatus

    waitStatus 表示的是后续节点状态,这是因为 AQS 中使用 CLH 队列实现线程的结构管理,而 CLH 结构正是用前一节点某一属性表示当前节点的状态,这样更容易实现取消和超时功能。

    head 和 next

    队列 head 指向队首,head指向的首节点 node 代表当前获取执行许可的线程,只有 next 引用。当前线程释放许可的时候,head 节点可以根据 next 快速找到下一个准入的线程。这是其对 CLH lock queue 的优化。如果后继节点为空或者 CANCELED, 则从队列 tail 处开始,向前找寻离它最近的非 CANCELED 节点,该节点的线程则是下一个获取准入的线程。

    Node 排队取消

    当某线程发生等待超时或者被 interrupt 时,对应 node 的状态被设置为 CANCELLED 。

    该节点的 next 引用指向本身,方便以后 gc 的时候,对象被快速回收。

    该节点的前继节点的 next 指向该节点的后续节点,当然前继节点的状态不是 CANCELLED

    原子操作

    head 和 tail 成员变量都采用 volatile 修饰符,保证线程可以及时访问到最新数据。

    入队,出队,修改 node 的 prev 和 next 指针,都采用了 CAS 操作,配合 while 或者 for 自旋,保证操作的原子性。

    编程场景

    一般来说我们不需要直接使用 AQS,如果需要创建自定义的同步器,可以参考 ReentrantLock,需要:

    1. 创建对应 Sync 继承 AQS,根据同步语义,按需重载 tryAcquire / tryRelease 系列接口,可以调用 AQS 的 getState,setState,compareAndSetState 接口;
    2. 创建同步器类,并包含 Sync,使用代理模式,调用 AQS 的 public 接口实现同步接口;

    一般不会让同步器类直接继承 AQS,这样可以防止同步器类暴露太多非必须的 AQS 接口。

    比如我们可以如下实现一个 Mutex:

    class Mutex implements Lock, java.io.Serializable {
    
        // Our internal helper class
        private static class Sync extends AbstractQueuedSynchronizer {
            // Reports whether in locked state
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
            
            // Acquires the lock if state is zero
            public boolean tryAcquire(int acquires) {
                assert acquires == 1; // Otherwise unused
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            // Releases the lock by setting state to zero
            protected boolean tryRelease(int releases) {
                assert releases == 1; // Otherwise unused
                if (getState() == 0) throw new IllegalMonitorStateException();
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
        
            // Provides a Condition
            Condition newCondition() { return new ConditionObject(); }
        
            // Deserializes properly
            private void readObject(ObjectInputStream s)
                throws IOException, ClassNotFoundException {
                s.defaultReadObject();
                setState(0); // reset to unlocked state
            }
        }
    
        // The sync object does all the hard work. We just forward to it.
        private final Sync sync = new Sync();
        
        public void lock()                { sync.acquire(1); }
        public boolean tryLock()          { return sync.tryAcquire(1); }
        public void unlock()              { sync.release(1); }
        public Condition newCondition()   { return sync.newCondition(); }
        public boolean isLocked()         { return sync.isHeldExclusively(); }
        public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
        public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(timeout));
        }
    }
    

    内容来源

    Java 并发编程实战

    http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-overview.html

    http://www.cnblogs.com/zhanjindong/p/java-concurrent-package-aqs-clh-and-spin-lock.html

    http://blog.csdn.net/wangyangzhizhou/article/details/40958637?utm_source=tuicool&utm_medium=referral

    http://blog.csdn.net/wangyangzhizhou/article/details/42177703

    http://blog.csdn.net/wangyangzhizhou/article/details/42065151

    http://blog.csdn.net/wangyangzhizhou/article/details/42197927

    相关文章

      网友评论

          本文标题:Java 多线程(四)- 理解 AbstractQueuedSy

          本文链接:https://www.haomeiwen.com/subject/loxqittx.html