美文网首页程序员JAVA学习文集
用火车购票的方式打开 AQS同步器(二)

用火车购票的方式打开 AQS同步器(二)

作者: 小燃儿 | 来源:发表于2020-08-28 10:49 被阅读0次

1 引言

上篇文章讲述了java AQS结构以及其中排他API的 实现逻辑。而这一篇我们来看看其共享逻辑, 这里依旧使用上文的火车买票为例,便于理解。这里直接会从代码实现讲起,对于还不了解AQS的结构的建议先看一下上一篇文章:用火车购票的方式打开 AQS同步器(一)

2 AQS共享和排他逻辑的区别

共享实现方式与排他逻辑非常类似(毕竟是框架嘛)。排他逻辑其实可以看作是共享逻辑的一种特例,由于它的独占特性。排他逻辑tryAcquire方法返回值为真假,表示是否资源(一票一车的情况)已经被其他人占用。而共享逻辑tryAcquireShared方法返回值为资源数,只有当资源数<0,才表示当前无法获取执行权,它更像现实生活中的买票行为。而在等待状态时两者也表现出一些不同,代码里会讲到。


image.png

3 AQS同步器共享逻辑代码实现

共享(资源共享,只要买了车票后,就都可以上这辆车):

## 获取车票,没有则进入等待
public final void acquireShared(int arg) {        
    if (tryAcquireShared(arg) < 0) // 尝试购票,只要票数 > 0,就可以成功上车           
         doAcquireShared(arg); // 车票售完了,就需要等待了  
  }
 ## 如果自己前面是第一人,就尝试候补,否则就等待。 
 private void doAcquireShared(int arg) { 
        final Node node = addWaiter(Node.SHARED);  // 以共享模式加入到队列中;
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) { // 是队列第一个,
                    int r = tryAcquireShared(arg); // 那么自己也可以去尝试候补
                    if (r >= 0) { // 有>1张车票
                        setHeadAndPropagate(node, r);  // 设置自己为第一个人,并有多余车票信息向后传播
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
              
 if (shouldParkAfterFailedAcquire(p, node) && //  与排他逻辑一致                  
parkAndCheckInterrupt()) 
                    interrupted = true;  
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
 ## 将自己排第一位,并在车票多,并在状态<0就尝试通知后面的人
 private void setHeadAndPropagate(Node node, int propagate) { // node : 新来候补的人,pro..:车票数
        Node h = head; // Record old head for check below
        setHead(node);  // 当前人开始候补,排队为第一人
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||  
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared(); // 继续唤醒
        }
    }
## 这里要说明一下 if中的条件:
就如同注释所说,这里可能会引起不必要的唤醒。
propagate 指的是资源数,很好理解>0张后面的人自然可以准备候补。
后面两个h.waitStatus < 0:
前者为旧head的状态。小于0(PROPAGATE,在releaseShare方法(并发调用)中有可能设置为该值),说明有多余的车票出现了
后者为新head < 0,那么其实无论是PROPAGATE 还是SINGAL状态,其后一位都可以尝试后不了
这里可以这么理解:既然有可能马上轮到我,或者车票可能有多余的话,我就可以通知这些人进行尝试候补了​,这些人获取车票的机会很大

释放资源(其中有人退票或者下车了,就又有空余的车票了):

 public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) { // 下车或者退票(用于子类重写)
            doReleaseShared(); // 通知还在排队的人准备候补
            return true;
        }
        return false;
    }

## 需要通知后面的人候补,或者标记可能多张车票释放了
private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not,                                                                                                                                                                                                                                           */
        for (;;) {
            Node h = head; 
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {  // 唤醒状态,表明后面排队的需要被唤醒了
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); // 唤醒后面排队的人
                }
                else if (ws == 0 &&  // 即上一段compareAndSetWaitStatus(h, Node.SIGNAL, 0),然后其他人又被调用了该方法
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 将其状态设置为可传播,使其后续(被唤醒的人再获取车票失败后,可以再尝试获取一次,详情见shouldParkAfterFailedAcquire方法)
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

对于doReleaseShared简单说明下,为什么有compareAndSetWaitStatus(h, 0, Node.PROPAGATE)这个逻辑呢?因为共享模式下,可能会有多个人同时退票,这时候,按照候补队列FIFO的原则,那只能通知第一个人可以去候补了,但是这个候补的人可能没获取票(可能被特殊人先获取车票)。由于之前是多个人同时退票的(在候补失败后,这个时候又有人退票了),那么对于这次没有获取到车票的人,再获取一次车票是很有可能会成功的。所以说Node.PROPAGATE这个状态表示就像是,你在尝试候补失败的过程中又有人退票了,所以你再发起一次候补,成功机会 很大。

相关文章

网友评论

    本文标题:用火车购票的方式打开 AQS同步器(二)

    本文链接:https://www.haomeiwen.com/subject/fxaksktx.html