AQS小结

作者: 末远川 | 来源:发表于2018-09-12 15:09 被阅读40次

    0X01概述

    提到java里的锁机制,通常有两种一种是jvm的synchronized关键字实现,一种是ReentranLock。前者是jvm实现,而后者则是java代码实现,AbstractQueuedSynchronizer则是java并发包里很多类的实现基础,比如ReentranLock,CountDownLatch,CyclicBarrier等。

    要实现锁的控制,试想一下,有几个地方

    • 怎么才叫持有锁?怎么才叫未拿到锁?锁是什么东西?
    • 如何实现抢占?未抢到锁的线程该用什么数据结构存储,以便其能在锁被释放后,按照什么逻辑获得锁?
    • 线程控制与强锁之间如何协调?才达到性能最好
    • 线程多个状态之间如何控制线程抢占关系
    • 释放锁的时候线程如何竞争?

    如果是面试的时候如何解释这个原理?
    先关注几个基础数据结构

    1. 用一个FIFO的CLH队列存储线程node
    2. Node对象是队列的节点对象,里面封装了thread、node statu等
    3. 用一个volatile的int state控制允许同时拿锁的数量

    在关几个关键过程

    1. 初始化队列
    2. 添加节点
    3. 节点状态变更

    一个线程通过子类实现的acquire方法,来实现对锁的获取,比如常见的ReentranLock实现的state值是1,就是同时只允许一个线程获取锁。

    对于未获取锁的线程开始竞争,这里举个栗子,加入此时有T2 T3两个线程没拿到锁,则他们开始入队,初始化队列,设置头节点,逐个添加尾节点。

    但这里问题又来了,如何保证多个线程对队列读写的安全性?要知道现在是没有锁的概念,因为正在实现锁的基础框架。所以java这里使用了乐观锁。提到乐观锁,就不得不提到cas操作,提到cas就要想到volatile保证线程可见性。

    另外一点,乐观锁一般是和“重试”操作息息相关的,cas只能保证一次操作的成功,如果一个线程操作成功了,那其他线程你不能不管不顾,要给他们一个去处,一个逻辑,比如重试逻辑,重试就代表你的代码需要重复执行,比如while(true) 比如for(;;),但凡死循环势必带一个终止条件。

    AQS里就是用cas、volatile和重试来保证队列操作的安全性。

    T2、T3现在(顺序看抢占顺序)添加到队列里,T3是队尾,T2是第二个节点,头节点是初始化节点是一个空的Node节点(不懂,就是这么设计的,只是作为一个控制节点存在),T2在队尾,如果后面有新来的线程就添加到队尾。

    如果仅仅是放到队列里肯定是不够的,在入队后,AQS会用一个for(;;)去操作节点,也就是俗称的线程自旋,自旋是说一个线程暂时不挂起,而是空转(就是空循环),每次循环尝试去获取锁,看看锁有没有被释放(调用子类acquire方法),如果没有就去更改Node状态,从初始化改为可以阻塞状态,下一次自旋后,根据状态让线程挂起,终止自旋。

    以上只是概述,下面来看看关键类源码和一些图。

    0X02关键代码和图例

    2.1Node代码

    static final class Node {
            static final Node SHARED = new Node();
            static final Node EXCLUSIVE = null;
            //表示节点的线程是已被取消的
            static final int CANCELLED = 1;
            //表示当前节点的后继节点的线程需要被唤醒
            static final int SIGNAL = -1;
            //表示线程正在等待某个条件
            static final int CONDITION = -2;
            //表示下一个共享模式的节点应该无条件的传播下去
            static final int PROPAGATE = -3;
            volatile int waitStatus;
            volatile Node prev;
            volatile Node next;
            volatile Thread thread;
            Node nextWaiter;
    }
    

    2.2AQS结构

    [图片上传失败...(image-38499c-1536736127795)]

    2.3Node节点状态图

    2.4AQS关键代码

    /**
     * 以独占所ReentrantLock为例,只有lock方法里线程CAS拿锁失败的那些线程会进入该方法
     */
    public final void acquire(int arg) {
        logger.info("线程{}开始抢占锁", Thread.currentThread().getId());
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { //如果tryAcquire尝试cas获取锁失败了,则入队
            //抢占失败 && 别的node 自旋抢到了锁
            selfInterrupt();
        }
    }
    

    tryAcquire的子类实现,Sync类ReentranLock里的类:

    /**
     * 尝试获取锁
     */
    final boolean nonfairTryAcquire(int acquires) {
        logger.info("nonfairTryAcquire {}尝试获取锁begin", Thread.currentThread().getId());
        final Thread current = Thread.currentThread();
        int c = getState();
        //            logger.info("nonfairTryAcquire {}当前 stat:{}", Thread.currentThread().getId(), c);
        if (c == 0) {//如果当前state0,说明上一个锁释放了,进入该方法的线程尝试cas获取锁
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                logger.info("nonfairTryAcquire  {}尝试获取锁success state = 0 ", Thread.currentThread().getId());
                return true;
            }
        } else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            logger.info("nonfairTryAcquire  nextc is {}", nextc);
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            logger.info("nonfairTryAcquire  {}尝试获取锁success state != 0 ", Thread.currentThread().getId());
            return true;
        }
        logger.info("nonfairTryAcquire {}尝试获取锁fail", Thread.currentThread().getId());
        return false;
    }
    

    添加到队尾或初始化队列

    /**
    * Creates and enqueues node for current thread and given mode.
    * 添加等待线程,设置独占还是共享
    * 独占模式,mode是一个null
    * mode是node里的nextWaiter
    *
    * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
    * @return the new node
    */
    private Node addWaiter(Node mode) {
      logger.info("addWaiter -{}", Thread.currentThread().getId());
      Node node = new Node(Thread.currentThread(), mode);
      // Try the fast path of enq; backup to full enq on failure
      // 队列不为空,队尾不为空的时候,把节点添加到队尾,
      Node pred = tail;
      if (pred != null) {
          node.prev = pred;
          if (compareAndSetTail(pred, node)) {
              pred.next = node;
              logger.info("addWaiter 队尾已经有元素,将新元素添加到队尾");
              try {
                  logger.info("最新的队尾元素{},旧的队尾元素{}", node.thread.getId(), node.prev.thread.getId());
    
              } catch (Exception e) {
              }
              return node;
          }
      }
      //否则,入队,初始化
      enq(node);
      return node;
    }
    

    初始化队列,生成头结点,添加到队尾

    private Node enq(final Node node) {
        logger.info("enq");
        int i = 0;
        for (; ; ) {
            Node t = tail;
            if (t == null) { //  //tail节点为空,初始化队列,将头结点设置为new Node(不关联任何线程)
                logger.info("tail节点为空,初始化队列,将头结点设置为new Node(不关联任何线程)");
                if (compareAndSetHead(new Node()))//CAS设置头结点,CAS的地方都可能有并发,多个thread过来,只有一个设置成head,其他的在下次循环,进入else逻辑
                    tail = head; //只有一个节点的时候,头尾都是一个
            } else {
                logger.info("线程{},自旋弟{}次添加到队尾", Thread.currentThread().getId(), ++i);
                node.prev = t;
                if (compareAndSetTail(t, node)) {//CAS添加到队尾,每次成功一个,只有成功设置了,才会返回t,cas失败继续循环,所以重试与CAS几乎是绑定的
                    t.next = node;
                    return t;//死循环终止条件
                }
            }
        }
    }
    

    自旋等待、状态变更、挂起线程

    /**
     * 把node插入队列末尾后,它并不立即挂起该节点中线程,
     * 因为在插入它的过程中,前面的线程可能已经执行完成,
     * 所以它会先进行自旋操作acquireQueued(node, arg),
     * 尝试让该线程重新获取锁!当条件满足获取到了锁则可以从自旋过程中退出,
     *
     * @param node the node
     * @param arg  the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        logger.info("acquireQueued start... {}", Thread.currentThread().getId());
        boolean failed = true;
        try {
            boolean interrupted = false;
            int i = 0;
            for (; ; ) {
                logger.info("acquireQueue-{}-自旋{}次", Thread.currentThread().getId(), ++i);
                final Node p = node.predecessor();//拿到前一个节点
                if (p == head && tryAcquire(arg)) {//如果前一个是头节点,就尝试一次tryAcquire获取锁
                    setHead(node);//获取成功就将该节点设为头节点,代表成功获得锁
                    logger.info("acquireQueued setHead {}", Thread.currentThread().getId());
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                //如果没获取到锁,则判断是否应该挂起
                //这个判断在shouldParkAfterFailedAcquire方法,
                //通过它的前驱节点的waitStatus来确定
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    状态变更:

    /**
     * 当一个node尝试获取锁失败的时候,检查更新node status字段。
     * 返回true表示需要阻塞(调用LockSupport)
     * //该方法在上一层死循环里被调用
     * > 0,将前驱节点踢出队列,返回false
     * < 0,也是返回false,不过先将前驱节点waitStatus设置为SIGNAL,使得下次判断时,将当前节点挂起.
     *
     * @param pred 前一个节点
     * @param node 当前节点
     * @return {@code true} if thread should block
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        logger.info("shouldParkAfterFailedAcquire-{}begin..", Thread.currentThread().getId());
        int ws = pred.waitStatus;
        logger.info("shouldParkAfterFailedAcquire-{} wait status:{}", Thread.currentThread().getId(), ws);
        if (ws == Node.SIGNAL)
            //SIGNAL,则返回true表示应该挂起当前线程,挂起该线程,并等待被唤醒,
            //被唤醒后进行中断检测,如果发现当前线程被中断,那么抛出InterruptedException并退出循环.
            return true;
        if (ws > 0) { //线程被中断或超时被取消,处于该CANCELLED状态的节点,应该从队列里删除
            //>0,将前驱节点踢出队列,返回false
            do { // 替换操作
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // <=0,非SINNAL,非CANCELLED也是返回false,
            // 不过先将前驱节点(acquireQueue方法里传过来的已经是前驱节点)waitStatus CAS置为SIGNAL,使得下次判断时,将当前节点挂起.
            logger.info("shouldParkAfterFailedAcquire-{} 将前驱节点waitStatus CAS置为SIGNAL,使得下次判断时,将当前节点挂起", Thread.currentThread().getId());
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    最后贴一个唤醒操作

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        logger.info("unparkSuccessor-{} wait status :{}", Thread.currentThread().getId(), ws);
        if (ws < 0)//小于0的时候,说明线程需要被唤醒
            compareAndSetWaitStatus(node, ws, 0);
    
        Node s = node.next;
        debug("unparkSuccessor 身后的节点:", s);
        logger.info("unparkSuccessor 此时队列队尾node是:{}", tail.thread.getId());
    
        if (s == null || s.waitStatus > 0) {
            //node的后继节点==null,从末尾开始向前寻找合适的节点,如果找到,则唤醒
            //是为何是从tail尾节点开始,而不是从node.next开始呢?
            // 原因在于node.next仍然可能会存在null或者取消了,
            // 所以采用tail回溯办法找第一个可用的线程。
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)//直到找到初始状态或等待唤醒状态的node
                    s = t;
        }
        //唤醒这个节点上的线程,该节点有可能是后继节点,有可能是从队列末尾找到的节点
        if (s != null) {
            debug("unparkSuccessor", s);
            logger.info("unparkSuccessor-{}唤醒下一个节点{}", Thread.currentThread().getId(), s.thread.getId());
            LockSupport.unpark(s.thread);
        }
    }
    

    2.5下图是未拿到锁的线程的调用流程
    流程

    相关文章

      网友评论

          本文标题:AQS小结

          本文链接:https://www.haomeiwen.com/subject/hnwvgftx.html