美文网首页
深入理解AQS(二)- 共享模式

深入理解AQS(二)- 共享模式

作者: Elvis_wty | 来源:发表于2020-04-06 18:18 被阅读0次

    共享锁与独占锁

    独占锁被某个线程持有时,其他线程只能等待当前线程释放后才能去竞争锁,而且只有一个线程能竞争锁成功。

    共享锁是可以被共享的,它可以被多个线程同时持有。如果一个线程获取共享锁成功,那么其他等待的线程也会去获取共享锁,而且获取大概率会成功。共享锁典型的有ReadWriteLock、CountdownLatch。

    AQS共享锁的实现

    共享锁获取

    1、acquireShared

    共享锁的acquireShared

        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
        
        protected int tryAcquireShared(int arg) {
            throw new UnsupportedOperationException();
        }
    

    独占锁的acquire

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    
    

    这边共享锁的acquiredShared方法其实与独占锁的acquire方法类似,tryAcquireShared都是需要子类去实现,区别是独占锁的tryAcquire返回的是boolean,而共享锁的tryAcquired方法返回的是int:

    • 如果该值<0,则代表获取共享锁失败
    • 如果该值=0,则代表获取共享锁成功,但随后其他线程获取共享锁会失败
    • 如果该值>0,则代表获取共享锁成功,并且后续线程获取共享锁也可能成功

    这边可以看下CountDownLatch中tryAcquiredShared的实现

            protected int tryAcquireShared(int acquires) {
                return (getState() == 0) ? 1 : -1;
            }
    

    state在AQS中是一个状态标识,具体的含义可以由子类来定义,在这边state定义成了数量。state等于0,则说明CountDownLatch的计数器为0,返回1成功,否则返回-1失败。

    2、doAcquireShared

        private void doAcquireShared(int arg) {
            //创建一个共享模式节点
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    //获取节点的前继节点
                    final Node p = node.predecessor();
                    if (p == head) {
                        //如果前继节点为投节点,则尝试获取锁
                        int r = tryAcquireShared(arg);
                        //r>=0则说明获取锁成功
                        if (r >= 0) {
                            //设置当前节点为头节点,并向后继节点传播
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    

    这部分逻辑和独占锁的addWaiter和acquireQueued的逻辑大体相同,只是把独占锁的逻辑addWaiter包括selfInterrupt都移到了一个方法中。
    区别有:

    • 独占锁中是setHead(node)而共享锁中是setHeadAndPropagate(node, r)

    shouldParkAfterFailedAcquire,parkAndCheckInterrupt等方法之前在独占锁中已经有过介绍了,这边就不再一一赘述了。

    3、setHeadAndPropagate

        private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; 
            //设置获取锁的节点为头节点
            setHead(node);
            //propagate>0说明当前节点已经获取锁成功,后继节点需要被唤醒
            //头节点waitStatus<0需要唤醒,即头节点不为初始和Canceled
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                if (s == null || s.isShared())
                    doReleaseShared();
            }
        }
    

    setHeadAndPropagate方法主要是为了设置获取锁的节点为头节点,并接下去直接唤醒后继节点。

    这边if (s == null || s.isShared()) 为何s==null也要继续唤醒后继节点呢?我的理解是这样子的:

                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
    

    addWaiter中添加新节点到队列尾部,但是设置2个指针的指向时并不是原子操作。极端情况下,在判断s==null的时候新加入的节点只prev指针指向了前继节点,而前继节点的next指针还没有指向新加入的节点,所以node.next==null这个判断并不能说明当前节点的后继节点一定为空。isShared判断后继线程是否为共享模式,如果不是共享模式则直接跳过。当然如果node.next==null满足后出现的后继线程是独占锁,那独占锁获取是获取不到的因为锁已经被占用,只有共享模式下才能获取锁。

    此时新加入的节点并没有执行park被挂起,当进入doReleaseShared后调用后继节点的unPark方法,即使后面在parkAndCheckInterrupt中调用了park方法,该线程也不会被阻塞挂起,这样就可以避免一次阻塞操作。而且doReleaseShared中唤醒后继节点unparkSuccessor方法是从尾节点向前继节点进行遍历,所以就保证了即使当前节点的next指针指向为空,新加入的节点也能够被唤醒。

    doReleaseShared方法到下面分析释放锁时在看。

    共享锁释放

    1、releaseShared

        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
        
        protected boolean tryReleaseShared(int arg) {
            throw new UnsupportedOperationException();
        }
    

    共享锁tryReleaseShared和和独占锁tryRelease一样都需要子类去实现。

    2、doReleaseShared

    private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    

    一个线程成功获取共享锁后会在2个地方去调用doReleaseShared方法,一个是在获取锁后设置头节点的setHeadAndPropagate方法中;另一个是在释放锁releaseShared成功后调用。

    node->A->B->C->D 队列

    A获取锁成功后,调用doReleaseShared唤醒B,当然保证B线程是共享模式同时也能够获取共享锁。

    node(A)->B->C->D

    B获取共享锁后,尝试去唤醒C,B成为新的头节点。

    node(B)->C->D

    节省了唤醒后继节点的速度,同时可能会出现多个doReleaseShared同时在运行,这边通过cas保证多个线程唤醒一个节点时只有一个线程能成功。

    if (ws == Node.SIGNAL) {
            if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                  continue;            // loop to recheck cases
            unparkSuccessor(h);
     }
    
    

    如果头节点的状态为Node.SIGNAL则说明后继节点是需要被唤醒的;

      else if (ws == 0 &&
              !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
               continue;                // loop on failed CAS
      }
    

    这边这个判断其实是一种极端状态,在最外层if (h != null && h != tail)已经进行了一次判断,保证当前队列至少有2个节点,但是这边2个节点的状态可能出现这样一种情况:

    oldHead->newHead(h节点&waitStatus=0)

    newHead(h节点&waitStatus=0)->tail

    newHead刚成为头节点,而tail节点刚进入成为尾节点,但tail节点并未执行shouldParkAfterFailedAcquire把前继节点修改为SIGNAL,满足!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)状态要在一瞬间执行完shouldParkAfterFailedAcquire把waitStatus修改为SIGNAL,这算是对一种短暂情况下的优化。

    总结

    AQS双向队列其实里面不仅包含了独占锁,也包含了共享锁,这个队列中2种模式是可以同时存在的。

    共享模式的调用其实和独占模式相当类似,唯一不同的是共享模式的锁可以被多个线程共享,而独占模式下锁同一时刻只能被一个线程拥有。

    共享模式下,头节点获取共享锁后可以立即唤醒后继节点,而不用等待获取共享锁后释放再唤醒,唤醒后继线程有2处,一处是获取到共享锁后可以立即唤醒后续的线程,但是后续线程必须是共享模式的线程;第二处是在释放锁后唤醒后继线程,这边我认为释放锁后唤醒的后继线程可以包含独占模式,但是前提是所有的独占模式前面所有的共享模式锁都已经释放。

    相关文章

      网友评论

          本文标题:深入理解AQS(二)- 共享模式

          本文链接:https://www.haomeiwen.com/subject/evobphtx.html