AQS学习

作者: 摸摸脸上的胡渣 | 来源:发表于2020-02-27 15:18 被阅读0次

    1.独占式同步状态获取

    AQS提供了很多模板方法,模板方法中已经定义好了各种行为,只需要实现其中几个关键的行为(接口),就可以复用整体的逻辑,有较好的框架和复用性。

    1.1 获取同步执行权-acquire

    AQS底层是一个双向队列,也称CLH队列(其实就是仨人名)。当获取执行权时,有两种可能,获取到了(皆大欢喜),没获取到(队尾排队去)。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    首先尝试获取(==tryAcquire==),如果获取不到则尝试入队(==acquireQueued==,其中会自旋继续尝试获取执行权)。

    1.1.1 尝试获取同步执行权-tryAcquire

    其中的==tryAcquire==需要继承方线程安全的实现获取方法。

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    

    1.1.2 尝试入队并自旋尝试获取执行权-acquireQueued

    尝试入队是通过==addWaiter==方法进行的。完了以后就是在==acquireQueued==方法中,尝试获取执行权。
    问:AQS中什么时候能获取到执行权,当封装了==Thread==的节点信息排到队首的时候。
    所以==acquireQueued==方法中就会自旋的检查当前节点到没到队首啊,没到的话,继续block。

     1: final boolean acquireQueued(final Node node, int arg) {
     2:     // 记录是否获取同步状态成功
     3:     boolean failed = true;
     4:     try {
     5:         // 记录过程中,是否发生线程中断
     6:         boolean interrupted = false;
     7:         /*
     8:          * 自旋过程,其实就是一个死循环而已
     9:          */
    10:         for (;;) {
    11:             // 当前线程的前驱节点
    12:             final Node p = node.predecessor();
    13:             // 当前线程的前驱节点是头结点,且同步状态成功
    14:             if (p == head && tryAcquire(arg)) {
    15:                 setHead(node);
    16:                 p.next = null; // help GC
    17:                 failed = false;
    18:                 return interrupted;
    19:             }
    20:             // 获取失败,线程等待--具体后面介绍
    21:             if (shouldParkAfterFailedAcquire(p, node) &&
    22:                     parkAndCheckInterrupt())
    23:                 interrupted = true;
    24:         }
    25:     } finally {
    26:         // 获取同步状态发生异常,取消获取。
    27:         if (failed)
    28:             cancelAcquire(node);
    29:     }
    30: }
    

    1.1.3 如何判断是否要进入block状态?- shouldParkAfterFailedAcquire

    入参是当前节点的前节点和当前节点
    4~9 判断前节点是不是已经处于等待状态(==Node.SIGNAL==),如果前节点已经处于等待状态了,那就说明当前节点更应该处于等待状态,毕竟CLH队列是一个FIFO的队列,判断完成,应该block,返回。
    10~18 如果前节点已经被取消(==Node.CANCEL==),已经取消的节点不应该成为当前节点是否应该入队的考虑条件,所以一直向前探,探到第一个状态为非取消的节点,然后返回不应该block,下一次是否应该block由下一次进入==shouldParkAfterFailedAcquire==再决定。【todo,为什么不在本次调用就判断?毕竟可以做到知道前节点的状态。】

     1: private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
     2:     // 获得前一个节点的等待状态
     3:     int ws = pred.waitStatus;
     4:     if (ws == Node.SIGNAL) //  Node.SIGNAL
     5:         /*
     6:          * This node has already set status asking a release
     7:          * to signal it, so it can safely park.
     8:          */
     9:         return true;
    10:     if (ws > 0) { // Node.CANCEL
    11:         /*
    12:          * Predecessor was cancelled. Skip over predecessors and
    13:          * indicate retry.
    14:          */
    15:         do {
    16:             node.prev = pred = pred.prev;
    17:         } while (pred.waitStatus > 0);
    18:         pred.next = node;
    19:     } else { // 0 或者 Node.PROPAGATE
    20:         /*
    21:          * waitStatus must be 0 or PROPAGATE.  Indicate that we
    22:          * need a signal, but don't park yet.  Caller will need to
    23:          * retry to make sure it cannot acquire before parking.
    24:          */
    25:         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    26:     }
    27:     return false;
    28: }
    

    1.1.4 如何进入block状态? - parkAndCheckInterrupt

    什么时候会调用这个方法?首先调用==acquire==方法,尝试获取执行权,如果失败,会尝试入队==acquireQueued==,在入队中,也在尝试获取执行权,如果获取失败,会先调用==shouldParkAfterFailedAcquire==判断当前节点对应的线程是否应该block,如果应该,则调用==parkAndCheckInterrupt==对线程进行block处理。
    看下源码实际执行逻辑
    首先调用==LockSupport.park==将当前线程block。【todo LockSupport.park的原理】
    然后当前线程是否被被打断过,==Thread.interrupted==。【todo interrupted标志位有什么用?】
    正常情况下,会有两种被打断的情况

    1. 前节点释放执行权利,唤醒当前节点。
    2. 当前线程被打断导致唤醒。【todo 不甚理解这句话,和前一个todo一起理解】

    1.1.5 遇到异常情况,如何取消入队操作 - cancelAcquire

    入队发生异常时,调用cancelAcquire(Node node),node为想要入队的节点。目的是取消当前节点的入队操作,并且当前节点从同步队列中删除。
    当前节点分三种状态,队首,队尾和队中。位于队尾和队中的节点,将自己从队中删除即可。但是涉及到队首的节点,队首的节点有什么特殊意义呢?排到队首的节点自动获取当前同步状态的执行权利,所以不能简单的还将自己删除,还需要将执行权利向下传递。这也就是24~26(队首),29~36(队中)和38(队首)代码的含义。具体的传递执行权利的逻辑还需要看 1.1.6 unparkSuccessor
    3行 常规的判空操作
    6行 将要删除的节点对应的线程置空 【todo,为什么要置空,是不是其他调用处用到了==thread == null==去做已删除节点的判断】
    9~11行 跳过已经处于取消状态的节点,前探到第一个非取消状态的节点。
    16行 获取当前节点的前节点。看下注释,==predNext==引用指向的节点是明显的第一个不用再跳过的节点,如果不是的化,下面的CAS操作将会失败。失败的原因是另一个线程比我们执行的快,它可能提前进行了取消或者通知操作。所以在这步,不需要额外的操作。
    21行 将当前节点状态置成取消状态。下面的删除操作有可能执行不到,所以需要将当前节点状态置成取消状态,这样的话,其他节点操作时,可以凭借此状态,判断当前节点不需要再被执行,继而跳过当前节点。
    24~26 如果当前节点是尾节点,则使用CAS,将前节点的==next域==置null。这里就用到了第16行的逻辑,使用CAS保证的是当其他线程已经更改了当前节点的前节点的话,这里将会失败,然后退出==cancelAcquire方法==。继而使用第21行的逻辑,其他节点执行操作时,会凭借==Node.CANCELLED==状态,跳过当前节点。
    27~36 删除队中节点。【todo,看下30~33行的逻辑,为什么当前节点要被取消了,就要改变前节点的ws?只要<=0就要改成==Node.Signal==,我推测应该是ws之间有正确的流转状态,这个很重要,要探索出来。】
    36行 然后给前节点和后节点做桥,将二者关联起来。相当于将自己从队列中删除掉了。
    33行处于29~36(队中),但是是为了防止误删队首节点,导致执行权利无法向下传递的保证。因为可能开始执行29行时,当前节点还是队中,但随着执行,当前节点可能就被消费到了队首的位置,只有队首节点的==Node.Thread==才有可能为null。
    38行 队首节点对应的分支,唤醒继任者,具体的逻辑看1.1.6 unparkSuccessor

    看下源码

     1: private void cancelAcquire(Node node) {
     2:     // Ignore if node doesn't exist
     3:     if (node == null)
     4:         return;
     5: 
     6:     node.thread = null;
     7: 
     8:     // Skip cancelled predecessors
     9:     Node pred = node.prev;
    10:     while (pred.waitStatus > 0)
    11:         node.prev = pred = pred.prev;
    12: 
    13:     // predNext is the apparent node to unsplice. CASes below will
    14:     // fail if not, in which case, we lost race vs another cancel
    15:     // or signal, so no further action is necessary.
    16:     Node predNext = pred.next;
    17: 
    18:     // Can use unconditional write instead of CAS here.
    19:     // After this atomic step, other Nodes can skip past us.
    20:     // Before, we are free of interference from other threads.
    21:     node.waitStatus = Node.CANCELLED;
    22: 
    23:     // If we are the tail, remove ourselves.
    24:     if (node == tail && compareAndSetTail(node, pred)) {
    25:         compareAndSetNext(pred, predNext, null);
    26:     } else {
    27:         // If successor needs signal, try to set pred's next-link
    28:         // so it will get one. Otherwise wake it up to propagate.
    29:         int ws;
    30:         if (pred != head &&
    31:             ((ws = pred.waitStatus) == Node.SIGNAL ||
    32:              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
    33:             pred.thread != null) {
    34:             Node next = node.next;
    35:             if (next != null && next.waitStatus <= 0)
    36:                 compareAndSetNext(pred, predNext, next);
    37:         } else {
    38:             unparkSuccessor(node);
    39:         }
    40: 
    41:         node.next = node; // help GC
    42:     }
    43: }
    

    1.1.6 唤醒执行权利的继承者 - unparkSuccessor

    【todo,将waitstutas置为0,0的含义是什么,在cancelAcquire中,删除队中节点时(29~36),也判断了0】
    首先判断当前当前节点的状态,如果<0,意味着当前节点的状态还为存活状态,因为只有被取消的节点状态,才置为1。然后将当前节点状态置为0,0我理解是已经获取过执行权利的节点状态值。
    随后唤醒下一个可被唤醒的节点,已经取消的节点会被跳过【todo,不会存在 可用-不可用-可用-不可用的节点顺序吗?】
    最终调用==LockSupport.unpark==对后继结点进行唤醒操作。【todo,LockSupport原理】

    
    private void unparkSuccessor(Node node) {
        //当前节点状态
        int ws = node.waitStatus;
        //当前状态 < 0 则设置为 0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        //当前节点的后继节点
        Node s = node.next;
        //后继节点为null或者其状态 > 0 (超时或者被中断了)
        if (s == null || s.waitStatus > 0) {
            s = null;
            //从tail节点来找可用节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //唤醒后继节点
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    
    

    相关文章

      网友评论

          本文标题:AQS学习

          本文链接:https://www.haomeiwen.com/subject/ozzthhtx.html