美文网首页程序员
三.AbstractQueuedSynchronizer研究

三.AbstractQueuedSynchronizer研究

作者: 蜗牛1991 | 来源:发表于2017-09-15 16:30 被阅读0次

    一.简介

    • AbstractQueuedSynchronizer是并发类诸如ReentrantLock、CountDownLatch、Semphore的核心
    • CAS算法是AbstractQueuedSynchronizer的核心

    二.结构

    • 全局参数
        //阻塞队列头
        private transient volatile Node head;
        //阻塞队列尾
        private transient volatile Node tail;
        //锁的参数,默认为0,不为零则已加锁
        private volatile int state;
    
    • 阻塞队列
      static final class Node {
            //共享锁
            static final Node SHARED = new Node();
            //独占锁
            static final Node EXCLUSIVE = null;
           //Node的四种状态
            static final int CANCELLED =  1;
            static final int SIGNAL    = -1;
            static final int CONDITION = -2;
            static final int PROPAGATE = -3;
          //Node的四种状态
            volatile int waitStatus;
           //Node的前后节点
            volatile Node prev;
            volatile Node next;
            Node nextWaiter;
    }
    
    • 唤醒与通知队列
    public class ConditionObject implements Condition, java.io.Serializable {
            private transient Node firstWaiter;
            private transient Node lastWaiter;
            private Node addConditionWaiter() {
                Node t = lastWaiter;
                // If lastWaiter is cancelled, clean out.
                if (t != null && t.waitStatus != Node.CONDITION) {
                    unlinkCancelledWaiters();
                    t = lastWaiter;
                }
                Node node = new Node(Thread.currentThread(), Node.CONDITION);
                if (t == null)
                    firstWaiter = node;
                else
                    t.nextWaiter = node;
                lastWaiter = node;
                return node;
            }
    }
    * 供子类实现方法
    
    image.png

    三.独占模式

    • 获取锁流程:
    image.png
    • 1.试图获取锁(tryAcquire)若失败,则进入acquireQueued方法获取阻塞队列
    • 2.获取阻塞队列首先将该线程加入该阻塞队列(addWaiter):阻塞队列head没有线程,是个空节点,新加入的线程排在队尾
    image.png
    • 3.队列构建好了,下一步就是在必要的时候从队列里面拿出一个Node了(acquireQueued)
      • 独占模式取队列首先获得当前Node的pre节点,只有pre节点为head节点才获取再次尝试获取锁(tryAcquire),若此时获取锁失败或pre节点不为head节点时,让当前线程阻塞。
     private final boolean parkAndCheckInterrupt() {
          //阻塞该线程
          LockSupport.park(this);
         return Thread.interrupted();
     }
    
    • 释放锁流程:
    • 如果tryRelease(arg)后state=0,则释放锁
    1 public final boolean release(int arg) {
    2     if (tryRelease(arg)) {
    3         Node h = head;
    4         if (h != null && h.waitStatus != 0)
    5             unparkSuccessor(h);
    6         return true;
    7     }
    8     return false;
    9 }
    

    四.共享模式

    • 共享模式下的acquire和独占模式下的acquire大部分逻辑差不多,最大的差别在于tryAcquireShared成功之后,独占模式的acquire是直接将当前节点设置为head节点即可,共享模式会执行setHeadAndPropagate方法,独占锁某个节点被唤醒之后,它只需要将这个节点设置成head就完事了,而共享锁不一样,某个节点被设置为head之后,如果它的后继节点是SHARED状态的,那么将继续通过doReleaseShared方法尝试往后唤醒节点,实现了共享状态的向后传播。
    private void setHeadAndPropagate(Node node, int propagate) {
             Node h = head; // Record old head for check below
             setHead(node);
           if(  proagate > 0 || h == null || h.waitStatus < 0) {
             Node s = node.next;
             if (s == null || s.isShared())
                doReleaseShared();
         }
     }
    
     private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    

    五.Condition通知/等待队列

    • await()方法的实现

    image.png
    • 1构建通知/等待队列(addConditionWaiter()):
      先拿到队列(Condition构建出来的也是一个队列)中最后一个等待者lastWaiter,如果lastWaiter是null,说明FIFO队列中没有任何Node,firstWaiter=Node;如果lastWaiter不是null,说明FIFO队列中有Node,原lastWaiter的next指向Node;无论如何,新加入的Node编程lastWaiter,即新加入的Node一定是在最后面。与阻塞对列相比,head不为null。new出来的Node的状态都是CONDITION。
    image.png
    • 2.释放Node的状态(fullyRelease),将state变为0。
    • 3.判断Node是否在AbstractQueuedSynchronizer构建的队列中,不在则用阻塞该线程LockSupport.park(this)
    • 4.要await()必然要先lock(),既然lock()了就表示没有竞争,没有竞争自然也没必要使用volatile+CAS的机制去保证什么。
    • signal()实现原理

      • 将Condition队列变为阻塞对列(AQS对列)
      • 尝试将Node的waitStatus从CONDITION置为0,这一步失败直接返回false
      • 当前节点进入调用enq方法进入AbstractQueuedSynchronizer队列,当前节点通过CAS机制将waitStatus置为SIGNAL
    • 某个await()的节点被唤醒之后并不意味着它后面的代码会立即执行,它会被加入到AbstractQueuedSynchronizer队列的尾部,只有前面等待的节点获取锁全部完毕才能轮到它
       final boolean transferForSignal(Node node) {
            if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
                return false;
            Node p = enq(node);
            int ws = p.waitStatus;
            if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
                LockSupport.unpark(node.thread);
            return true;
        }
    

    参考:再谈AbstractQueuedSynchronizer2:共享模式与基于Condition的等待/通知机制实现

    相关文章

      网友评论

        本文标题:三.AbstractQueuedSynchronizer研究

        本文链接:https://www.haomeiwen.com/subject/iwdssxtx.html