美文网首页
AQS(AbstractQueuedSynchronizer)详

AQS(AbstractQueuedSynchronizer)详

作者: Duanty | 来源:发表于2021-04-25 17:12 被阅读0次

    什么是AQS?

    • AQS(AbstractQueuedSynchronizer): 是并发容器J.U.C(java.util.concurrent)下locks包内的一个类. 它实现了一个FIFO(FirstIn、FisrtOut先进先出)的队列. 底层实现的数据结构是一个双向链表.

    • AQS的核心思想是, 如果被请求的共享资源是空闲的, 则将当前请求资源的线程设置为有效线程, 并且将共享的资源设置为锁定的状态. 如果被请求的共享资源被占用, 那么就需要一套线程阻塞/等待以及唤醒进行锁分配的机制, 这个机制AQS是用CLH(参考:备注1)队列锁实现的, 就是将暂时获取不到锁的线程加入到队列中进行等待.

    • AQS定义两种资源共享方式: Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)

    • AQS同步器是用一个int变量state来表示状态. 同步功能使用的方法都是子类继承AbstractQueuedSynchronizer类实现的. 子类通过继承同步器实现自身需要的方法来管理state状态, 管理的方式就是通过accquire()/accquireShared()/release()/releaseShared()等方法来操作状态. 在多线程环境下状态的操作必须保证其原子性, 所以子类在状态的管理中需要使用AQS同步器提供的三个方法操作state: getState()/setState(int)/compareAndSetState(int, int).

    • 子类推荐被定义为自定义同步装置的内部类(大佬都是这么实现的, 跟着没毛病).

    备注1:

    CLH锁即Craig, Landin, and Hagersten (CLH) locks. CLH锁是一个自旋锁。能确保无饥饿性. 提供先来先服务的公平性.
    CLH锁也是一种基于链表的可扩展、高性能、公平的自旋锁, 申请线程仅仅在本地变量上自旋, 它不断轮询前驱的状态, 假设发现前驱释放了锁就结束自旋

    AQS中的数据结构

    节点和同步队列

    Node节点属性

    • 节点的状态waitStatus.
      CANCELLED(1): 表示当前节点被取消, 进入改状态的节点将不会在发生变化.
      SIGNAL(-1): 表示后继结点在等待当前结点唤醒. 后继结点入队时, 会将前继结点的状态更新为SIGNAL.
      CONDITION:(-2) : 表示当前节点在condition队列中进行等待. 当其他线程调用了Condition的signal()方法后, CONDITION状态的结点将从等待队列转移到同步队列中, 等待获取同步锁.
      PROPAGATE(-3): 共享模式下, 前继结点不仅会唤醒其后继结点, 同时也可能会唤醒后继的后继结点.
      值为0, 表示当前节点在sync队列中,等待着获取同步锁.
    • Node prev: 前驱节点.
    • Node next: 后继节点.
    • Node nextWaiter: 存储condition队列中的后继节点.
    • Thread thread: 当前线程.

    同步队列数据结构

    核心方法分析

    public final void acquire(int arg)

    该方法是独占模式下线程获取共享资源的入口, 如果获取到资源后, 线程直接返回.否则将进入等待队列, 直到获取到资源为止(整个过程忽略中断的影响). 这就是Lock.lock()的语义, 你也可以自定义Lock顶层接口, 参考 Doug Lea对Lock的定义.

      public final void acquire(int var1) {
            if (!tryAcquire(var1) && acquireQueued(addWaiter(Node.EXCLUSIVE), var1)) {
                selfInterrupt();
            }
        }
    

    函数流程如下:

    1. tryAcquire(): 尝试直接获取资源, 如果成功直接返回(调用tryAcquire更改状态,需要保证原子性. 这里体现了非公平锁, 每个线程获取锁时会尝试直接抢占加塞一次, 而CLH队列中可能还有别的线程在等待).
    2. addWaiter(): 如果获取不到, 将当前线程构造成节点Node并加入sync队列的尾部, 并且标记为独占模式.
    3. acquireQueued(): 使线程阻塞在等待队列中获取资源, 一直获取到资源后才返回. 如果在整个等待过程中被中断过, 则返回true, 否则返回false.
    4. 如果线程在等待过程中被中断过, 它是不响应的. 只是获取资源后才再进行自我中断selfInterrupt(), 将中断补上(响应前面说的, 整个等待过程忽略中断的影响).
    1. tryAcquire()方法

    此方法尝试去获取独占资源. 如果获取成功, 则直接返回true, 否则直接返回false. 这也正是tryLock()的语义, 还是那句话. 当然不仅仅只限于tryLock().
    如下是tryAcquire()的源码

        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    

    这里throw异常是留给我们进行实现的. AQS只是一个框架, 具体资源的获取和释放逻辑由我们自定义同步器去实现(就像ReentrantLock类). 需要自定义实现的方法都没有定义成abstract, 由我们根据同步器独占/共享自有选择.

    2. addWaiter(Node)方法
        private Node addWaiter(Node mode) {
            // 以给定模式构造结点. mode有两种: EXCLUSIVE(独占)和SHARED(共享)
            Node node = new Node(Thread.currentThread(), mode);
            //尝试直接将节点放到sync队列尾部,
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //如果放入尾部失败, 调用enq()入队
            enq(node);
            return node;
        }
    
    3. enq(Node)方法
        private Node enq(final Node node) {
            //CAS"自旋", 直到成功加入队尾
            for (;;) {
                Node t = tail;
                if (t == null) { //  队列为空, 创建一个空的结点作为head结点, 并将tail也指向它, 这是一个初始化的动作
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {//正常流程, 放入队尾
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    CAS自旋volatile变量, 保证了可见性, 操作上又是原子方法. 这是一种很经典的用法

    4. acquireQueued(Node, int)方法

    当节点进入同步队列后, 接下来就是要等待获取锁(访问控制), 同一时刻只有一个线程在运行, 其他都要进入等待状态. 每个线程节点都是独立的, 他们进行自旋判断, 当发现前驱节点是头结点并且获取了状态(tryAcquire()自己实现原子性操作), 那这个线程就可以运行了.

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;//标记是否可以成功拿到状态
            try {
                boolean interrupted = false;//处理过程中是否被中断过
                for (;;) {//自旋
                    final Node p = node.predecessor();//获取当前节点的前驱节点
                  //如果前驱节点是head, 当前节点就是排第二. 这个时候可以尝试去获取资源了(头结点可能释放完唤醒自己了)
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);//设置头节点为当前节点
                        p.next = null; // help GC setHead()中node.prev已置为null, 此处再将head.next置为null. 方便gc回收head节点.
                        failed = false;//标记成功获取资源
                        return interrupted;
                    }
                    //不满足唤醒条件, 调用park()进入waiting状态, 等待unpark(). 如果等待的过程被中断, 线程会从park()中醒过来, 发现拿不到资源后继续进入park()中等待.
                    if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                        interrupted = true;//如果线程被终端, 标记interrupted为true, 等待线程获取到资源后在中断
                }
            } finally {
                if (failed)//如果等待过程中没有成功获取资源(不可控异常), 取消线程在队列的等待
                    cancelAcquire(node);
            }
        }
    shouldParkAfterFailedAcquire()方法如果发现前驱节点状态不是SIGNAL, 会标记前驱节点状态为SIGNAL(-1). 如果发现前驱节点放弃等待了就一直往前找节点, 直到找到正常等待的节点排队到它后面.
    parkAndCheckInterrupt()使线程进入waiting状态, 如果发现被唤醒, 检查是不是被中断了并且清除状态.
    

    acquire()方法总结

    1. 尝试直接插队获取资源, 如果不成功进入同步队列排队.
    2. 调用park()进入waiting状态, 等待前驱节点调用unpark()或者interrupt()唤醒自己. interrupt()唤醒拿不到资源继续进入waiting状态.
    3. 被唤醒后尝试获取资源, 如果获取不到资源进入2流程, 获取到资源就执行后续代码(如果等待过程被中断过此时会调用selfInterrupt()将中断补上).

    public final boolean release(int arg)

    该方法是独占模式下线程释放共享资源的入口.

        public final boolean release(int arg) {
            if (tryRelease(arg)) {//释放资源, 自定义函数实现
                Node h = head;
                if (h != null && h.waitStatus != 0)//拿到头结点
                    unparkSuccessor(h);//唤醒等待队列中的下一个线程
                return true;
            }
            return false;
        }
    
    1. tryRelease(arg)方法

    需要我们实现的独占资源释放函数.

    protected boolean tryRelease(int arg) {
         throw new UnsupportedOperationException();
    }
    
    2. unparkSuccessor(node) 方法

    唤醒等待队列中的下一个线程

        private void unparkSuccessor(Node node) {
            //当前线程节点的状态
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);//设置当前线程的节点状态为0, 因为已经释放资源
            Node s = node.next; //找到下一个需要唤醒的节点
            if (s == null || s.waitStatus > 0) {//下一个节点为空或者已经放弃等待就取消唤醒操作
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)//从后往前找有效的节点
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);//唤醒有效节点
        }
    下一个有效的线程被唤醒后处在acquireQueued()的自旋流程中, 然后进入资源判断获取(if (p == head && tryAcquire(arg))).
    

    public final void acquireShared(int arg)

    此方法是共享模式下线程获取共享资源的顶层入口. 它会获取指定量的资源(state), 获取成功后直接返回, 获取失败进入等待队列, 直到获取到资源(整个过程忽略中断的影响).参考ReentrantReadWriteLock设计.

     public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)//改方法需要自定义同步器实现. 返回语义负数表示失败, 0或者大于零表示获取成功.
                doAcquireShared(arg);//小于零进入等待队列, 获取资源后返回
        }
    
    1. doAcquireShared(arg)方法
        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);//加入队列的尾部, 模式为共享. addWaiter()方法参考上面介绍
            boolean failed = true;//成功失败标识
            try {
                boolean interrupted = false;//是否中断标识
                for (;;) {//CAS自旋
                    final Node p = node.predecessor();//获取前驱节点
                    if (p == head) {//前驱节点为头结点, 尝试获取资源(此处有可能是前驱节点唤醒了自己)
                        int r = tryAcquireShared(arg);//获取资源
                        if (r >= 0) {//成功
                            setHeadAndPropagate(node, r);//将head指向自己, 此时r>0, 还有剩余资源唤醒后续排队线程
                            p.next = null; // help GC
                            if (interrupted)// 中断标识
                                selfInterrupt();//补上中断
                            failed = false;
                            return;
                        }
                    }
                     //不满足唤醒条件, 调用park()进入waiting状态, 等待unpark(). 如果等待的过程被中断, 线程会从park()中醒过来, 发现拿不到资源后继续进入park()中等待.
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    2. setHeadAndPropagate(Node, int)方法
        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; 
            setHead(node);/head指向自己
            //如果还有剩余量, 继续唤醒下一个排队的线程
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    

    acquireShared()方法总结

    1. tryAcquireShared()方法尝试获取资源, 成功直接返回, 如果不成功进入同步队列排队.
    2. 调用park()进入waiting状态, 等待前驱节点调用unpark()或者interrupt()唤醒自己.
    3. 被唤醒后尝试获取资源, 如果获取不到资源进入2流程, 获取到资源就执行后续代码.
      其实同acquir()方法一样, 只不过该方法在自己拿到资源后回去唤醒后继线程

    public final boolean releaseShared(int arg)

    该方法是共享模式下线程释放共享资源的入口. 跟独占模式下的资源释放方法release()很相似, 不同的是独占模式一般是完全释放资源(state=0)后才允许去唤醒其他线程, 而共享模式往往不会这么控制, 具体实现要看自定义同步器的逻辑.

        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {//尝试释放资源, 该方法需要自定义共享同步器实现.
                doReleaseShared();//唤醒后继节点
                return true;
            }
            return false;
        }
    
    1. tryReleaseShared()方法

    需要我们自己实现的共享资源释放方法.

        protected boolean tryReleaseShared(int arg) {
            throw new UnsupportedOperationException();
        }
    
    2. doReleaseShared()方法

    该方法是用来唤醒后继节点的.

        private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;           
                        unparkSuccessor(h);//唤醒后继节点
                    } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;               
                }
                if (h == head)  //head节点如果发生变化即退出自旋
                    break;
            }
        }
    

    releaseShared()方法总结

    1. tryReleaseShared()方法进行共享资源的释放.
    2. doReleaseShared()方法用来唤醒后继节点.
    以上是几个AQS常用的资源获取和释放的基本方法, 其实还有一些方法和上面分析的方法略有不同, 如下:
    • 独占式获取资源
      1. acquireInterruptibly(int arg): 类似于acquire()方法, 不同的地方是该方法响应外界对线程的中断信号, 并且抛出InterruptedException()异常.
      2. tryAcquireNanos(int arg, long nanosTimeout) : 类似于acquire()方法, 同样响应中断抛出InterruptedException()异常, 并且该方法有获取超时时间.
    • 共享式获取资源
      1. acquireSharedInterruptibly(int arg): 类似acquireInterruptibly()方法的共享实现, 同样响应中断抛出InterruptedException()异常.
      2. tryAcquireSharedNanos(int arg, long nanosTimeout)(): 类似tryAcquireNanos()方法的共享实现, 同样响应中断抛出InterruptedException()异常, 并且该方法有获取超时时间.

    测试案例

    1. ExclusiveLock(自定义独占锁)

    ExclusiveLock是互斥的不可重入锁实现, 对锁资源State的操作只有0和1两个状态, 0代表未锁定,1代表锁定. 按照上面的分析, 我们需要实现AQS的tryAcquire()和tryRelease()方法.

    public class ExclusiveLock implements Lock {
        //自定义内部类同步器
        private static class ExclusiveSync extends AbstractQueuedSynchronizer {
            //判断是否是锁定状态
            @Override
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
            //尝试获取资源, 如果成功直接返回. 获取成功返回true, 否则返回false.
            @Override
            protected boolean tryAcquire(int arg) {
                if(compareAndSetState(0, 1)){//状态变更必须为CAS原子操作, 保证原子性
                    setExclusiveOwnerThread(Thread.currentThread());//同样也是原子操作
                    return true;
                }
                return false;
            }
            //尝试释放资源
            @Override
            protected boolean tryRelease(int arg) {
                if(getState() == 0){
                    throw new UnsupportedOperationException();
                }
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
            
        }
        //创建自定义同步器的实现
        private final ExclusiveSync sync = new ExclusiveSync();
        //获取资源, 同acquire()语义一样, 获取不到进入同步队列等待成功返回
        @Override
        public void lock() {
            sync.acquire(1);
        }
       //判断锁是否被占有
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
        //获取资源, 立刻返回结果
        @Override
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
        //
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(time));
        }
        //释放资源
        @Override
        public void unlock() {
            sync.release(1);
        }
        
    }
    

    1. ShareLock(自定义共享锁)

    ShareLock为一个共享同步器的实现, 设计同一时刻可以有两个线程获取到资源, 超过两个进行同步队列阻塞. 按照上面的分析, 我们实现AQS的tryAcquireShared()和tryReleaseShared()方法.

    public class ShareLock implements Lock {
        
        
        public static class ShareSync extends AbstractQueuedSynchronizer{
            
            //定义同步器的初始状态为2
            ShareSync(int count) {
                if (count <= 0) {
                    throw new IllegalArgumentException("count must large than zero.");
                }
                setState(count);
            }
    
            @Override
            protected int tryAcquireShared(int reduceCount) {
                for (;;) {
                    int current = getState();
                    int newCount = current - reduceCount;
                    if (newCount < 0 || compareAndSetState(current, newCount)) {
                        return newCount;
                    }
                }
    
            }
    
            @Override
            protected boolean tryReleaseShared(int reduceCount) {
                for (;;) {
                    int current = getState();
                    int newCount = current + reduceCount;
                    if (compareAndSetState(current, newCount)) {
                        return true;
                    }
                }
            }
        }
        
        
        private final ShareSync sync = new ShareSync(2);
        
        
        @Override
        public void lock() {
            sync.acquireShared(1);
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
    
        @Override
        public boolean tryLock() {
            return sync.tryAcquireShared(1) >= 0;
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
        }
    
        @Override
        public void unlock() {
            sync.releaseShared(1);
        }
    
        @Override
        public Condition newCondition() {
            return null;
        }
    }
    

    相关文章

      网友评论

          本文标题:AQS(AbstractQueuedSynchronizer)详

          本文链接:https://www.haomeiwen.com/subject/fkjnlltx.html