美文网首页服务端开发实战Java学习笔记程序员
并发七:AQS队列同步器实现分析

并发七:AQS队列同步器实现分析

作者: wangjie2016 | 来源:发表于2017-06-19 13:54 被阅读175次

    AQS

    队列同步器(AbstractQueuedSynchronizer)简称AQS,是J.U.C同步构件的基础,包括ReentrantLock、ReentrantReadWriteLock、CountDownLatch、Semaphor都是基于AQS实现的。
    理解AQS是理解这些同步工具的基础,基于AQS提供的同步语义可以定制各种功能的同步工具。

    AQS原理

    用一个int类型的状态变量(volatile)state记录同步状态,默认值是0
    用一个双向链表实现的队列对线程进程进行排队和调度

    A线程使用compareAndSet(state,0,1)原子设置state的
    值,设置成功说明state当前无其他线程争用,A线程取锁的使用权。

    设置不成功,说明B线程对state的值进行了设置,并且没有复位(state!=0),B线程持有锁的使用权(B线程还没有释放锁)。A线程会构造成一个Node节点加入队列尾部并挂起。

    当B线程执行完同步操作后,对state进行复位(state==0),即释放锁,然后从队列头开始寻找,发现正在沉睡的A线程,将其唤醒。

    Node节点

    static final class Node {
        /**共享模式 */
        static final Node SHARED = new Node();
        /**独占模式 */
        static final Node EXCLUSIVE = null;
        /**取消状态,由于在同步队列中等待的线程等待超时或被中断,
         * 需要从同步队列中取消等待。 
         */
        static final int CANCELLED = 1;
        /**通知状态,当前节点的后继节点包含的线程需要运行(unpark)
         * 当前节点的线程如果释放了同步状态或者被取消,将通知后续节点。
         */
        static final int SIGNAL = -1;
        /**条件阻塞状态,节点线程等待在Condition上,
         * 当其他线程对Condition调用了signal()方法后,
         * 该节点将会从等待队列中转移到同步队列中,
         * 加入到对同步状态的获取中。 
         */
        static final int CONDITION = -2;
        /**传播状态,表示当前场景下后续的acquireShared能够得以执行。*/
        static final int PROPAGATE = -3;
        /**节点的的状态 
         * 初始状态为0  表示当前节点在sync队列中,等待着获取状态。
         */
        volatile int waitStatus;
        /** 前驱节点 */
        volatile Node prev;
        /** 后继节点 */
        volatile Node next;
        /**节点对应的线程,等待获取同步状态的线程。 */
        volatile Thread thread;
        /**下一等待节点*/
        Node nextWaiter;
        /**是否共享模式 */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        /**获取前驱节点 */
        final Node predecessor() throws NullPointerException {}
        Node() {}
        Node(Thread thread, Node mode) {}
        Node(Thread thread, int waitStatus) {}
    }
    

    EXCLUSIVE、SHARED是节点的两种模式:独占模式和共享模式,分别对应独占锁和共享锁这两种典型的锁。
    thread就是节点对应的线程。
    waitStatus指节点状态:
    1.取消状态CANCELLED
    2.通知状态SIGNAL
    3.条件阻塞状态CONDITION
    4.传播状态PROPAGATE

    独占模式

    独占锁也称排它锁,一次只允许一个线程获取到锁,锁未释放前其他线程无法获取到锁。
    Synchronized关键字获取的内置锁就是一个独占锁,每次只能一个线程获得对象的监视器进入临界区。

    一个独占锁的例子,然后来分析AQS是如何实现线程同步的。

    /**
     * Lock是J.U.C中的接口接口,定义了一组体现完整锁语义的方法。 
     */
    public class ExclusiveLock implements Lock {
    
        private static class Sync extends AbstractQueuedSynchronizer {
                // 用CAS操作设置同步状态State,当前线程把state改为1
                // 其他线程便修改不了,可以看成当前线程持有了锁。
            protected boolean tryAcquire(int acquires) {
                if (this.compareAndSetState(0, acquires)) {
                    // 将持有线程设置为当前线程
                    this.setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
            // 释放同步状态
            protected boolean tryRelease(int releases) {// 释放同步状态
                if (Thread.currentThread() != this.getExclusiveOwnerThread()) {
                    throw new IllegalMonitorStateException();
                }
                if (this.getState() == 0) {
                    throw new IllegalMonitorStateException();
                }
                this.setExclusiveOwnerThread(null);
                this.setState(0);
                return true;
            }
            // 当前线程是否持有线程
            protected final boolean isHeldExclusively() {
                return getExclusiveOwnerThread() == Thread.currentThread();
            }
            // 实例化Condition对象
            final ConditionObject newCondition() {
                return new ConditionObject();
            }
        }
        //  队列同步器实例
        private final Sync sync = new Sync();
        // 加锁,线程请求到锁则返回,请求不到锁则阻塞
        public void lock() {
            sync.acquire(1);
        }
        // 非阻塞加锁,线程请求到锁返回true,请求不到锁返回false,不会阻塞。
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
        // 解锁,
        public void unlock() {
            sync.release(1);
        } 
        // 加锁 响应中断,在获取锁的过程中线程被中断抛出InterruptedException异常,停止锁获取
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
        // 非阻塞加锁 时间限制
        public boolean tryLock(long time, TimeUnit unit)  throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(time));
        }
        // 实例化 Condition
        public Condition newCondition() {
            return sync.newCondition();
        }
    }
    

    独占模式加锁流程

    // s1
    public final void acquire(int arg) {
       if (!tryAcquire(arg) && 
           acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
           selfInterrupt();
    }
    

    s1: 调用tryAcquire(1)加锁失败,构造一个独占模式(EXCLUSIVE)的Node准备入列,转入s2。

       // s2
       private Node addWaiter(Node mode) {
           // 用当前线程和mode,实例化Node
           Node node = new Node(Thread.currentThread(), mode);
           Node pred = tail;
           if (pred != null) {// 队列不为空
               node.prev = pred;
               if (compareAndSetTail(pred, node)) {// 快速入列,设置node为新的尾节点
                   pred.next = node;
                   return node;
               }
           }
           enq(node);//队列为空||CAS尾节点设置失败
           return node;
       }
    

    s2: 如果队列已经初始化,使用compareAndSetTail以CAS的方式将Node设置为尾节点,转入s4;如果队列为空或者设置尾节点失败,转入s3

       // s3
       private Node enq(final Node node) {
           for (;;) { // 循环确保入列成功
               Node t = tail;// 尾节点
               if (t == null) { // 未初始化,将头、尾节点都指向空白节点
                   if (compareAndSetHead(new Node()))
                       tail = head;
               } else { // node 入列
                   node.prev = t;// node的前驱指向尾节点
                   if (compareAndSetTail(t, node)) {// 设置尾节点为当前节点
                       t.next = node;
                       return t;
                   }
               }
           }
       }
    

    s3: 如果队列未初始化,实例化一个空队列tail和head都指向一个空白节点,使用compareAndSetTail以CAS的方式将Node设置为尾节点,在循环中确保设置成功,转入s4

    // s4
    final boolean acquireQueued(final Node node, int arg) {
           boolean failed = true;
           try {
               boolean interrupted = false;
               for (;;) {
                   final Node p = node.predecessor();//前驱节点
                   if (p == head && tryAcquire(arg)) {//前驱节点为头节点并成功获取锁
                       setHead(node);//设置节点为头
                       p.next = null; // help GC 摘除原来的头节点
                       failed = false;
                       return interrupted;
                   } // 前驱非头节点或者重试获取锁失败
                   // 线程在此处被挂起,当线程被唤醒后也会在这里重新进入for(;;)获取锁
                   if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                       interrupted = true;
               }
           } finally {
               if (failed)
                   cancelAcquire(node);
           }
       }
    

    s4: 如果Node节点在队列中拍第二,重试获取锁,获取成功后,将Node设置为头节点直接返回;否则进入s5

    s5返回了true说明当前节点可挂起了,调用parkAndCheckInterrupt()方法将线程挂起,线程挂起操作和CAS操作一样都是调用Unsafe中的native方法。
    如果此线程被中断了,他会被唤醒并且返回中断标识true,进入到下次循环,如果拿不到锁还是接着park,如果拿到锁返回到s1,会记录下中断状态selfInterrupt(),用户可以自行处理中断状态,对流程没有任何影响。

    for(;;)主要是为了保障两点:
    一是每个Node挂起前都能将前驱节点的状态设置为SIGNAL
    二是在每个Node被唤醒后再次进入锁的获取中

    循环中如果出现异常,将取消锁获取 cancelAcquire(node)。

      // s5
      private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus; // 前驱节点状态
            if (ws == Node.SIGNAL)//状态为SIGNAL,说明node可挂起,返回true
                return true;
            if (ws > 0) {//摘掉状态为CANCELLED的前驱节点
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;// 找到非CANCELLED状态的节点,将Node挂在其后面
            } else {//为-3、-2
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);// 设置前驱节点为SIGNAL状态
            }
            return false;
        }
    

    s5: 此处主要是将前驱节点的waitStatus设置为SIGNAL。

    如果前驱节点的状态为SIGNAL表示前驱节点为"通知状态",前驱节点释放锁后会后继等待的节点会被唤醒,所以Node可以放心的挂起,直接返回true至s4 进行线程挂起。

    如果前驱节点的状态为CANCELLED表示前驱节点放弃了锁获取,通过循环向前查找到,直到找到最近一个非CANCELLED状态的节点,将node挂在它的后边,CANCELLED节点会被从队列中摘除。

    CAS设置前驱节点的状态为SIGNAL,返回s4 false至s4,进入下次循环,下次循环发现前驱节点的状态已经为SIGNAL了,可以挂起了。

    独占模式解锁流程

    // s1
    public final boolean release(int arg) {
       if (tryRelease(arg)) {//解锁成功
           Node h = head;
           if (h != null && h.waitStatus != 0)//头节点不为空&&不在进行中
               unparkSuccessor(h);//唤醒后续节点
           return true;
       }
       return false;
    }
    

    s1:调用tryRelease(arg)进行解锁逻辑,解锁成功,准备唤醒头节点的后继节点。
    h.waitStatus ==0说明后继节点正在唤醒中。
    转入s2。

    // s2
    private void unparkSuccessor(Node node) {
           int ws = node.waitStatus;
           if (ws < 0)// head节点状态设置为0
               compareAndSetWaitStatus(node, ws, 0);
           Node s = node.next;
           if (s == null || s.waitStatus > 0) {// s为空或CANCELLED
               s = null;
               // 从尾部开始向前查找,找到一个非CANCELLED状态的节点
               for (Node t = tail; t != null && t != node; t = t.prev)
                   if (t.waitStatus <= 0)
                       s = t;// 赋给s
           }
           if (s != null)// 唤醒s
               LockSupport.unpark(s.thread);
       }
    

    s2:将head节点的状态设置为0,表示有节点正在被唤醒。
    如果后继节点为空或者CANCELLED,从尾节点开始向前查找,找到一个非CANCELLED状态的节点,将其赋值给s。
    唤醒s节点,调用Unsafe中的unpark方法。
    需要注意的是被唤醒的节点还要回到acquireQueued()方法里的挂起点,再次进行锁获取,如果还是没有获取到锁则接着被挂起。

    共享模式

    共享锁是可以有多个线程获取并持有的锁,获取到锁的线程都可以进入同步代码块执行。
    这有点像数据库连接池,操作数据库时先从池中借出一个连接,操作完毕,将连接归还入池,供后续操作继续借用。

    AQS共享锁实现:

    public class ShareLock implements Lock {
        // 同步器
        private static class Sync extends AbstractQueuedSynchronizer {
            // 初始化,available:共享许可数
            Sync(int available) {
                this.setState(available);
            }
            // 获取同步状态
            protected int tryAcquireShared(int acquires) {
                for (;;) {
                    //当前许可数
                    int available = this.getState();
                    //减去1后的剩余量
                    int remaining = available - acquires;
                    //剩余量<0,直接返回
                    if (remaining < 0 
                        //说明还有剩余量CAS设置available
                        || compareAndSetState(available, remaining)) {
                        return remaining;
                    }
                }
            }
            // 释放同步状态
            protected boolean tryReleaseShared(int releases) {
                for (;;) {
                    //当前许可数量
                    int current = this.getState();
                    //加1后的最新许可量
                    int available = current + releases;
                    if (available < current) // overflow
                        throw new Error("释放不合法");
                    if (compareAndSetState(current, available))
                        return true;
                }
            }
        }
        // 同步器实例
        private final Sync sync = new Sync(2);
        // 加锁
        public void lock() {
            sync.acquireShared(1);
        }
        // 非阻塞加锁
        public boolean tryLock() {
            return sync.tryAcquireShared(1) > 0;
        }
        // 解锁
        public void unlock() {
            sync.releaseShared(1);
        }
        // 加锁 响应中断
        public void lockInterruptibly() 
                                throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
        // 非阻塞加锁 时间限制
        public boolean tryLock(long time, TimeUnit unit) 
                                    throws InterruptedException {
            return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
        }
        // 实例化 Condition
        public Condition newCondition() {
            return null;
        }
    }
    

    共享模式实现逻辑

    1:初始化同步器Sync时,先设定许可共享数,即有多少把锁,许可共享数保存在共享状态state中。

    2:每次加锁许可共享数都会减1作为剩余量,当剩余量小于0时,说明没有可用的许可了,直接返回剩余量,AQS中的“acquireShared”发现剩余量小于0,开始构造Node进入排队逻辑。
    当还有剩余量(remaining>=0)说明线程还能获取共享锁,剩余量减1,直接返回,取锁成功。
    3:释放锁时将剩余量加1,CAS设置state为剩余量,设置成功则释放锁成功。
    4:独占锁释放时没有使用CAS操作,因为独占锁释放不存在线程争用,共享锁会出现多个线程释放锁的情况,state存在争用。:
    5:共享锁的newCondition()方法返回null,因为Condition只能使用在独占锁中。后面会专门分析条件队列Condition。
    6:理解了独占锁的加锁解锁流程,再看共享锁的加解锁流程,应该没有障碍,这里不再累述。
    7:需要注意共享锁在唤醒的节点后,如发现还有剩余量,还有节点在排队,将继续唤醒后继节。

    小结

    1. AQS是J.U.C最核心的同步构件,也是J.U.C中最难弄懂的类之一,是理解重入锁、读写锁及J.U.C中其他同步工具的基础。
    2. AQS提供了基础的同步语义,可以根据应用场景创建不同的同步组件。
    3. 基于AQS实现的锁,并非有些文档说的自旋锁,底层是使用Unsafe的park和unpark方法来挂起和唤醒线程。

    码字不易,转载请保留原文连接http://www.jianshu.com/p/d291a6a1879c

    相关文章

      网友评论

        本文标题:并发七:AQS队列同步器实现分析

        本文链接:https://www.haomeiwen.com/subject/scvgqxtx.html