AQS详解

作者: edolovee | 来源:发表于2018-03-17 10:07 被阅读0次

    简介

    AQS(AbstractQueuedSynchronizer) 是通过FIFO队列实现的一套框架,用于阻塞锁以及同步器比如semaphores、countdownlatch、lock的实现均依赖它。它通过一个原子的int值实现的同步,子类可以通过保护方法改变该int变量的值。
    它提供了两种模式,分别为独占模式和共享模式,独占模式只允许被一个线程获取到而共享模式则允许多个线程获取。本文主要介绍的几个方法就是这两种模式的体现。

    独占模式

    实现方式:一个线程获取到锁并释放后,后续线程才能获取到锁,在队列中表示为当头结点将锁释放后会将后继结点唤醒

    1. void acquire(int arg)
    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
    }
    # tryAcquire 为具体实现,就是对state变量的操作
    
    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    #前置节点
                    final Node p = node.predecessor();
                    #前置节点为头节点并且资源可以获取到
                    if (p == head && tryAcquire(arg)) {
                        #将当前节点设置为头节点,前置节点不再维护了
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                     #shouldParkAfterFailedAcquire作用就是将前置节点状态置为SIGNAL,下面详细分析,parkAndCheckInterrupt是将当前线程阻塞
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            if (ws == Node.SIGNAL)
               #节点状态为SIGNAL时返回
                return true;
            if (ws > 0) {
               # 将取消的节点跳过
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
               #前置节点状态设置为SIGNAL
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
    

    举例说明,当有两个线程A、B调用tryAcquire方法时,A、B按照顺序依次入队新来的始终排在队尾,只要A还没有获取到,那么A肯定是阻塞的,它的后继节点也肯定是阻塞的
    画个图表示下:


    image.png
    1. boolean release(int arg)
    public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    #tryRelease为具体实现,也是对state变量的操作
    

    它的实现也是比较简单就是当资源拿到时,直接将链表的头部节点的后继节点唤醒,唤醒代码如下:

    private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            if (ws < 0)
                #头节点状态置为0
                compareAndSetWaitStatus(node, ws, 0);
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    当后继节点被唤醒后,那么该节点对应的线程会继续执行acquireQueued方法中的for循环,这时当拿到资源后,那么就会将该节点设置为头节点,返回成功了


    image.png

    共享模式

    实现方式:一个线程获取到锁后,会通知后继结点,后继结点获取到锁后继续向后传播,这样就能使得多个线程获取到同一个锁

    1. void acquireShared(int arg)
     public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    # tryAcquire 为具体实现,就是对state变量的操作
    #与acquireQueued的不同之处在于,该方法加入的节点类型为SHARED,另外就是在获取到资源>0时,该方法会将做传播操作
    private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            #r>0当前节点置为头节点,并传播,下面细说
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    
    private void setHeadAndPropagate(Node node, int propagate) {
            Node h = head; 
            #当前节点设置为头节点
            setHead(node);
            #当获取资源数大于0 或者头部不存在或者之前的以及当前的头节点状态小于0,则当前节点的后继节点需要被唤醒
            if (propagate > 0 || h == null || h.waitStatus < 0 ||
                (h = head) == null || h.waitStatus < 0) {
                Node s = node.next;
                #后继节点为共享类型
                if (s == null || s.isShared())
                    doReleaseShared();
            }
    
    #作用就是将头节点状态从SIGNAL置为0,并唤醒后继节点,
    #再将头节点状态置为PROPAGATE,这样当后继节点被唤醒后,
    #它获取到资源后把自己置为头结点,继续重复之前动作唤醒后继节点
    private void doReleaseShared() {
            for (;;) {
                Node h = head;
                if (h != null && h != tail) {
                    int ws = h.waitStatus;
                    if (ws == Node.SIGNAL) {
                        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                            continue;            // loop to recheck cases
                        unparkSuccessor(h);
                    }
                    else if (ws == 0 &&
                             !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                        continue;                // loop on failed CAS
                }
                if (h == head)                   // loop if head changed
                    break;
            }
        }
    
    1. boolean releaseShared(int arg)
      上边已经介绍

    过程如下图所示
    初始阶段有3个线程都来调用acquireShared时,状态如下


    image.png

    当A获取到资源时,则


    image.png

    相关文章

      网友评论

          本文标题:AQS详解

          本文链接:https://www.haomeiwen.com/subject/ikihqftx.html