美文网首页JAVA之多线程系列
深入理解AbstractQueuedSynchronizer

深入理解AbstractQueuedSynchronizer

作者: VayneP | 来源:发表于2020-02-22 19:47 被阅读0次

    1. AQS简介

    在同步组件的实现中,AQS是核心部分,同步组件的实现者通过使用AQS提供的模板方法实现同步组件语义。AQS实现了对同步状态的管理,以及对阻塞线程进行排队,等待通知等一些底层的实现处理。AQS的核心包括了这些:同步队列,独占式锁的获取和释放,共享锁的获取和释放以及可中断锁,超时等待锁获取

    独占式锁:

    void acquire(int arg); //独占式获取同步状态,如果获取失败则插入同步队列进行等待
    
    void acquireInterruptibly(int arg):与acquire方法相同,但在同步队列中进行等
    待的时候可以检测中断
    
    boolean tryAcquireNanos(int arg, long nanosTimeout):在acquireInterruptibly
    基础上增加了超时等待功能,在超时时间内没有获得同步状态返回false;
    
    boolean release(int arg):释放同步状态,该方法会唤醒在同步队列中的下一个节点
    
    

    共享式锁:

    void acquireShared(int arg):共享式获取同步状态,与独占式的区别在于同一时刻有多个线程获取同步状态;
    
    void acquireSharedInterruptibly(int arg):在acquireShared方法基础上增加了能响应中断的功能;
    
    boolean tryAcquireSharedNanos(int arg, long nanosTimeout):在acquireSharedInterruptibly
    基础上增加了超时等待的功能;
    
    boolean releaseShared(int arg):共享式释放同步状态
    

    2. 同步队列

    当共享资源被某个线程占有,其他请求该资源的线程将会阻塞,从而进入同步队列。AQS中同步队列通过则是通过链式实现。

    AQS内部静态类Node:

    volatile int waitStatus //节点状态
    volatile Node prev //当前节点/线程的前驱节点
    volatile Node next; //当前节点/线程的后继节点
    volatile Thread thread;//加入同步队列的线程引用
    Node nextWaiter;//等待队列中的下一个节点
    

    节点的状态:

    int CANCELLED = 1//节点从同步队列中取消
    int SIGNAL = -1//后继节点的线程处于等待状态,如果当前节点释放同步状态会通知后继节点,
    使得后继节点的线程能够运行;
    int CONDITION = -2//当前节点进入等待队列中
    int PROPAGATE = -3//表示下一次共享式同步状态获取将会无条件传播下去
    int INITIAL = 0;//初始状态
    

    AQS两个重要成员变量:

    private transient volatile Node head;
    private transient volatile Node tail;
    

    AQS实际上通过头尾指针来管理同步队列,同时实现包括获取锁失败的线程进行入队,释放锁时对同步队列中的线程进行通知等核心方法

    img

    由此可知

    1. 节点的数据结构,即AQS的静态内部类Node,包含节点的等待状态等信息;
    2. 同步队列是一个双向队列,AQS通过持有头尾指针管理同步队列;

    3. 独占锁

    3.1 独占锁的获取---acquire()

    调用lock()方法是获取独占式锁,获取失败就将当前线程加入同步队列。成功则线程执行。而lock()方法实际会调用AQS的aquire()方法,源码如下:

    public final void acquire(int arg) {
            //先看同步状态是否获取成功,如果成功则方法结束返回
            //若失败则先调用addWaiter()方法再调用acquireQueued()方法
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
    }
    

    acquire根据当前获得同步状态成功与否做了两件事情:1. 成功,则方法结束返回,2. 失败,则先调用addWaiter()然后在调用acquireQueued()方法。

    获取同步状态失败,入队操作

    当线程获取独占式锁失败后就会将当前线程加入同步队列,addWaiter()源码如下:

    private Node addWaiter(Node mode) {
            // 1. 将当前线程构建成Node类型
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            // 2. 当前尾节点是否为null?
            Node pred = tail;
            if (pred != null) {
              
                node.prev = pred;
                 // 3. 将当前节点尾插入的方式插入同步队列中
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            // 4. 队列尾节点为null,说明当前线程是第一个加入同步队列进行等待的线程或者第3步执行错误
            enq(node);
            return node;
    }
    

    主要分为2个部分:1. tail为Null,调用enq()方法插入到尾结点;2. tail不为Null,通过compareAndSetTail()插入到尾结点。若compareAndSetTail() 插入失败,则继续在enq()方法中插入。enq()承担两个任务:1. 处理当前同步队列尾节点为null时进行入队操作;2. 如果CAS尾插入节点失败后负责自旋进行尝试。

    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    //1. 构造头结点
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    // 2. 尾插入,CAS操作失败自旋尝试
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
    }
    

    enq()方法 总结:

    1. 在当前线程是第一个加入同步队列时,调用compareAndSetHead(new Node())方法,完成链式队列的头结点的初始化
    2. 自旋不断尝试CAS尾插入节点直至成功为止

    同步队列中的节点(线程)会做什么事情了来保证自己能够有机会获得独占式锁了?acquireQueued()这个方法的作用就是排队获取锁的过程,源码如下:

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true; //获取失败标志,初值为true
            try {
                boolean interrupted = false;//记录线程在队列中获取锁的过程中是否发生过中断
                //死循环,在循环中线程可能会被阻塞0次,1次,或多次,直到获取锁成功才跳出循环,方法返回
                for (;;) {
                    // 1. 获得当前节点的先驱节点
                    final Node p = node.predecessor();
           //2.只有当前结点的前驱是头结点,当前线程才被允许尝试获取锁;只有获取锁成功才会跳出循环方法返回
                    if (p == head && tryAcquire(arg)) {
                      //获取锁成功会将当前结点设为头结点
                        setHead(node);
                        //释放前驱节点
                        p.next = null; // help GC
                        failed = false;
                        return interrupted; //返回是否发生过中断
                    }
              //线程不被允许获取锁或获取失败都会进入下面的方法检查是否自己可以阻塞;被唤醒后记录是否是被中断唤醒的
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())  //如果是被中断唤醒的,会记录下来被中断过
                        interrupted = true;
                }
            } finally {
                if (failed) //线程发生异常则取消获取锁行为
                    cancelAcquire(node);
            }
    }
    

    在第2步中,只有前驱节点是头结点的时候才能够尝试获取同步状态,原因如下:

    1. 头结点表示成功获取到同步状态的节点,而头结点的线程释放了同步状态后,将会唤醒其后继节点,后继节点的线程被唤醒需要检查自己的前驱节点是否是头结点
    2. 维护同步队列的FIFO原则

    shouldParkAfterFailedAcquire() 方法如下:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; // 获取前驱的等待状态
        if (ws == Node.SIGNAL)  //1. 前驱的等待状态已经是SIGNAL,则当前线程可以放心阻塞
            return true;  //表示要阻塞
       if (ws > 0) {  //前驱等待状态为CANCELLED,说明前驱已无效
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);//2. 不断向前寻找状态不为CANCELLED的结点,同时将无效结点链成一个不可达的环,便于GC
            pred.next = node;  //找到状态不为CANCELLED的结点
        } else {//3. 前驱状态是PROGAGATE或0时,将其前驱的状态设为SIGNAL,在再次尝试失败后才阻塞
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;  //表示还要再尝试
    }
    

    第三步将前驱节点的状态设为SIGNAL是表示前驱节点有后继节点。见 release()方法

    3.2 独占锁的释放---release()

    public final boolean release(int arg) {
            if (tryRelease(arg)) {  //尝试释放锁
                Node h = head;
                //如果head的waitStatus为0说明没有后继了,因为如果有后继,它的后继在阻塞前一定会把它的waitStatus设为SIGNAL
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h); //唤醒后继
                return true;
            }
            return false;
    }
    //唤醒后继
    private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;  //node是获取了锁的结点
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);  //唤醒后继前,重置waitStatus为0
            Node s = node.next;  //node的next指针不一定指向其后继,当node的状态为cancelled的时候,其next指向自己
            if (s == null || s.waitStatus > 0) {  //这里的s == null的条件判断不理解(?)
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)  //从后往前找node的后继中第一个没有被取消的结点
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);  //唤醒该结点线程
    }
    

    4. 共享锁

    4.1 共享锁的获取---acquireShared()

    刚开始时,在共享资源允许范围内会有多个线程同时共享该锁,剩下的线程就被加入到等待队列中排队阻塞等待;当持有锁的线程释放锁时,它会唤醒在队列中等待的后继,而这个后继在获取锁之后会继续检查资源的剩余量,如果还有剩余,它会接着唤醒自己的后继。也就是说,共享模式下,线程无论是在获取锁或者释放锁的时候,都可能会唤醒其后继,而且在共享资源允许的条件下引起多个线程被连续唤醒。如果有多个线程同时获取了共享锁,则head指向的那个是CLH队列中最后一个持有锁的线程,其他的都已经出队了。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)  //尝试获取锁,成功返回值大于等于0,失败返回值小于0
            doAcquireShared(arg); //如果失败,则调用doAcquireShared方法获取锁
    }
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED); //线程入队
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor(); //取当前结点的前驱
                if (p == head) {  //前驱为头结点才允许尝试获取锁,这里体现了入队之后获取资源的顺序性,只要入队,就是顺序的了
                    int r = tryAcquireShared(arg); 
                    if (r >= 0) { //获取锁成功
                        setHeadAndPropagate(node, r); //将当前线程设为头,然后可能执行对后继SHARED结点的连续唤醒
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                //获取锁失败,设置前驱waitStatus为SIGNAL,然后阻塞,这个过程与独占模式相同
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)  //获取锁过程中发生异常造成未成功获取,则取消获取
                cancelAcquire(node);
        }
    }
    private void setHeadAndPropagate(Node node, int propagate) { //propagate是资源剩余量,从上面的调用中可以看到
        Node h = head;  //将旧的头结点先记录下来
        setHead(node);  //将当前node线程设为头结点,node已经获取了锁
        //如果资源有剩余量,或者原来的头结点的waitStatus小于0,进一步检查node的后继是否也是共享模式
        if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {
            Node s = node.next; //得到node的后继
            if (s == null || s.isShared())  //如果后继是共享模式或者现在还看不到后继的状态,则都继续唤醒后继线程
                doReleaseShared();
        }
    }
    private void doReleaseShared() {
        for (;;) {
            Node h = head;  //记录下当前的head
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {  //如果head的waitStatus为SIGNAL,一定是它的后继设的,共享模式下要唤醒它的后继
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //先将head的waitStatus设置为0,成功后唤醒其后继
                        continue;        // loop to recheck cases
                    unparkSuccessor(h); //关键,若成功唤醒了它的后继,它的后继就会去获取锁,如果获取成功,会造成head的改变
                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //没有后继结点,设为PROPAGATE
                    continue;           // loop on failed CAS
            }
            if (h == head) //若head发生改变,说明后继成功获取了锁,此时要检查新head的waitStatus,判断是否继续唤醒(下次循环)
                break; //head没有发生改变则停止持续唤醒
        }
    }
    

    4.2 共享锁的释放

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {  //如果释放锁成功
            doReleaseShared();  //启动对后继的持续唤醒
            return true;
        }
        return false;
    }
    private void doReleaseShared() {
        for (;;) {
            Node h = head;  //记录下当前的head
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {  //如果head的waitStatus为SIGNAL,一定是它的后继设的,共享模式下要唤醒它的后继
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) //先将head的waitStatus设置为0,成功后唤醒其后继
                        continue;        // loop to recheck cases
                    unparkSuccessor(h); //关键,若成功唤醒了它的后继,它的后继就会去获取锁,如果获取成功,会造成head的改变
                } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) //没有后继结点,设为PROPAGATE
                    continue;           // loop on failed CAS
            }
            if (h == head) //若head发生改变,说明后继成功获取了锁,此时要检查新head的waitStatus,判断是否继续唤醒(下次循环)
                break; //head没有发生改变则停止持续唤醒
        }
    }
    

    共享锁总结:

    1. 与独占模式的最大不同是,共享模式下,线程无论是对共享锁成功获取还是对资源的释放都可能会引起连续唤醒。独占模式下只有当线程释放锁时才唤醒其后继,而且不会连续唤醒(暂时忽略取消造成的唤醒)

    2. 每次唤醒新的线程,这个线程尝试获取锁,如果获取到了锁,新线程除了将自己设为头结点之外,还会检查是否满足继续唤醒条件,如果满足,则继续唤醒其后继。(这里共享模式的获取没有仔细分析,但是只要大体理解就好)

    3. 在共享模式下,当队列中某个结点的waitStatus为0时,表明它没有后继(因为如果有后继,后继就会把它的waitStatus置为-1了),这时候线程会把它的waitStatus设置为PROPAGATE,表示一旦出现一个新的共享结点连接在该结点后,该结点的共享锁将传播下去。

    相关文章

      网友评论

        本文标题:深入理解AbstractQueuedSynchronizer

        本文链接:https://www.haomeiwen.com/subject/gklrqhtx.html