美文网首页
AQS框架最最最简单流程梳理

AQS框架最最最简单流程梳理

作者: YocnZhao | 来源:发表于2021-07-01 20:20 被阅读0次

    AQS框架最最最简单流程梳理

    AQS是Lock的基础,之前一直想自己梳理一下,奈何一直没有静下心来梳理,这里记录并做梳理,文章基于JDK 1.8。

    独占锁的上锁流程

    AQS源码中给了我们一个最常见的互斥锁的代码示例:

    public class MutexCustom implements Lock {
    
        public static final int STATUS_ORIGIN = 0;
        public static final int STATUS_TARGET = 1;
        private Sync sync;
    
        public MutexCustom() {
            sync = new Sync();
        }
    
        public static class Sync extends AbstractQueuedSynchronizer {
            @Override
            protected boolean tryAcquire(int arg) {
                if (compareAndSetState(STATUS_ORIGIN, STATUS_TARGET)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            @Override
            protected boolean tryRelease(int arg) {
                setState(STATUS_ORIGIN);
                setExclusiveOwnerThread(null);
                return true;
            }
    
            @Override
            protected boolean isHeldExclusively() {return true;}
    
            public Condition newCondition() {return new ConditionObject();}
    
            public int getCurrState() {return getState();}
        }
    
        @Override
        public void lock() {sync.acquire(STATUS_TARGET);}
    
        @Override
        public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(STATUS_TARGET);}
    
        @Override
        public boolean tryLock() {return sync.tryAcquire(STATUS_TARGET);}
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(STATUS_TARGET, unit.toNanos(time));
        }
    
        @Override
        public void unlock() {sync.release(STATUS_TARGET);}
    
        @Override
        public Condition newCondition() {return sync.newCondition();}
    
        public int getState() {return sync.getCurrState();}
    }
    

    我们先来关注最基础的最基础的最基础的锁的功能,也就是acquire获取锁release释放锁

    acquire获取锁
        public final void acquire(int arg) {
            // Node.EXCLUSIVE为null
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    这里有三个主要的方法:

    1. tryAcquire尝试获取锁,得到true,也就是获取到锁直接返回,也就是不会走里面的方法体,直接执行临界区的代码了。
    2. tryAcquire返回false,执行addWaiter,顾名思义,把自己加到等待的nodeList最后面
    3. acquireQueued

    首先明确锁是怎么生效的,如果这3个方法都很快的返回了结果,不管返回的是true还是false,都会继续往下执行,区别只不过是执不执行selfInterrupt()方法罢了,并没有影响当前线程继续往后执行后续方法,也就是临界区代码。
    如果这三个方法里面有某一个方法没有很快的返回结果,而是把自己困在了一个while或者for(;;)循环里面,或者使用LockSupport.park()方法,那么这个线程就阻塞住了,不能继续往后执行后续的临界区代码,那这个时候就实现了阻塞。
    在合理的逻辑下竞争和释放这个阻塞的代码就实现了锁。

    那究竟是在哪个方法里面阻塞住了呢?我们挨个方法去看:

    1.1. tryAcquire
    // 需要被重写,给出获取资源逻辑
    // 比如最简单的互斥锁,只需要在state为0的时候compareAndSetState(0,1)成功返回true就好了。
        protected boolean tryAcquire(int arg) {
            throw new UnsupportedOperationException();
        }
    

    这个没有什么好说的,既然AQS是实现Lock的基础框架,那肯定就要把逻辑代码空出来让开发者自己去实现,返回true表示竞争到了锁,可以执行临界区代码,反之没有竞争到锁,需要等待锁的释放并重新竞争。

    1.2. addWaiter
        private Node addWaiter(Node mode) {
            // mode如果是Node.EXCLUSIVE则为null,so,node的next现在为null
            Node node = new Node(Thread.currentThread(), mode);
            // 尝试快速插入到最尾端,也就是把新创建的node放到tail的next,然后用node取代tail。
            // tail是全局变量,初始化为null
            Node pred = tail;
            // 前提是tail不为空
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            // 如果tail为空则直接走enq方法。
            enq(node);
            return node;
        }
    
        // 其实比快速插入不一样的就是如果没有tail,把tail赋值成head,然后再继续append
        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    // 初始化状态tail为null,给head设置一个new Node(),同时把tail也指向这个head
                    // tail\head同为全局变量,初始化都为null
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    看着注释理一遍:
    addWaiter方法顾名思义,就是把自己的node插入到链表的最尾端,加入抢占资源的排队序列,等待抢占资源。
    需要注意的是,tail / head都是全局变量,都是初始化为null,刚开始运行的时候tail / head都为null,如果tail为null,会走enq方法,做了两件事

    1. tail为null,先把head初始化为一个new Node(),同时暂时把tail赋值为head。
    2. 把当前node赋值为head的next,同时当前node作为tail。
      注意返回值是t,而不是当前node,也就是返回node的preNode,是t,上述情况下当前链表有两个值head->tail,返回的应该是是head而不是tail。当然这里调用enq方法并没有使用它的返回值。addWaiter方法返回值是当前node。

    所以从空链表初始化结束后,链表里应该有两个节点,而不是只有一个head节点,保证了工作节点能获取到一个pre节点。

    compareAndSetHead和compareAndSetTail这些CAS方法都不是阻塞的,都是Unsafe方法实现,所谓乐观锁,不用关心脏数据,只需关心成功或者失败。

    1.3. acquireQueued

    这里附上一个Node状态的表格:

    类型 说明
    CANCELLED 1 当前节点由于超时或者中断被取消,节点进入这个状态以后将保持不变。 注:这是唯一大于0的值,很多判断逻辑会用到这个特征
    SIGNAL -1 后继节点会依据此状态值来判断自己是否应该阻塞、当前节点的线程如果释放了同步状态或者被取消、会通知后继节点、后继节点会获取锁并执行(当一个节点的状态为SIGNAL时就意味着其后置节点应该处于阻塞状态)
    CONDITION -2 当前节点正处在等待队列中,在条件达成前不能获取锁。
    PROPAGATE -3 当前节点获取到锁的信息需要传递给后继节点,共享锁模式使用该值
    INITIAL 0 节点初始状态,默认值,表示当前节点在sync同步阻塞队列中,等待着获取锁
    // 放入等待队列
        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        // 如果前一个节点是head并且这时获取到了临界资源,把当前node设置为head,head的next置为null
                        // 两个条件:
                        //          1. node是head的next节点,也就是轮到它了。
                        //          2. tryAcquire(arg)得到了资源,也就是获取到了锁。
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // 注意哈,这里是&&,也就是shouldParkAfterFailedAcquire方法返回true才会执行parkAndCheckInterrupt,两个方法同时返回true才会执行interrupted = true
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
        // 检测失败的时候是否需要阻塞自己,我要睡觉了,等人来叫醒我再检测是不是需要排队。
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            // 注意,ws是上一个节点的status
            int ws = pred.waitStatus;
            // SINGAL: 此node的后续node通过park把阻塞住了,所以当前node在release或者取消acquire的时候必须unpark一下后续的node。
            // (当一个节点的状态为SIGNAL时就意味着其后置节点应该处于阻塞状态)
            if (ws == Node.SIGNAL)
                // 前置节点已经设置了SIGNAL,可以阻塞我的线程了,后续检测到Node.SIGNAL会唤醒我的线程
                return true;
            if (ws > 0) {
                // 前置节点被设置为cancel了,跳过pre,我来当pre, ^_^
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
                // 检测到自己是INITAL或者PROPAGATE,把pre的status设置为SIGNAL,继续循环
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    
        //park睡觉了,睡醒了之后检测是否需要中断。
        private final boolean parkAndCheckInterrupt() {
            LockSupport.park(this);
            return Thread.interrupted();
        }
    

    注释很清晰了,梳理一下:

    acquireQueued方法顾名思义,尝试获取锁,获取不到就进入队列等待。

    1. acquire处在一个for(;;)死循环中, 获取锁需要两个条件:
      1.1. 前置节点是head,也就是轮到我自己获取锁了
      1.2. tryAcquire(arg)==true,也就是竞争到了锁
    2. 没必要一直尝试获取锁,把自己标记为需要唤醒,同时调用park阻塞,告诉predecessor,需要的时候unpark我。

    LockSupport.park(this);之后当前线程会阻塞,直到有unpark操作才会继续往后执行。所以在for(;;)死循环中并没有一直尝试获取锁,在获取不到之后会阻塞自己,并把node的状态标记为SIGNAL。后续看到node的状态为SIGNAL,在自己的锁释放后会调用unpark唤醒自己继续执行for(;;)中没有执行完的部分,继续尝试tryAcquire获取锁,这时就能获取到锁并结束循环了。
    这就是最基础的获取锁的功能。

    release释放锁

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                // head不为空,并且status不是初始状态,则需要唤醒unpark一下下一个节点。否则下个节点还没有阻塞,不需要unpark唤醒。
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    
        // 需要unpark唤醒下一个节点
        private void unparkSuccessor(Node node) {
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            // 需要unpark的Thread被当前node的继任node持有,通常情况下是下一个node,但是有可能是cancel状态或者是null,
            // 从尾部逆序往前遍历以找到状态不是canceled的继任者。
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            // 唤醒下个节点
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    释放锁的流程看起来就简单了一些。tryRelease成功释放锁之后,找到下个节点,唤醒他。

    至于为什么需要从后往前遍历而不是从前往后遍历:

        private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    //看这一块的代码
                    node.prev = t;
                    // ①
                    if (compareAndSetTail(t, node)) {
                        // ②
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    这段代码是往tail插入新的next节点,使用CAS保证安全,如果当前线程执行到了②,但是这时有一个新线程执行到了①,新线程只执行了把node的pre指向了tail,这时tail的next仍然为null,这时tail变了再执行CAS就会失败了。
    这种情况下从后往前遍历是能够找到的,所以是安全的。

    参考链接:
    https://www.zhihu.com/question/50724462?sort=created
    https://blog.csdn.net/anlian523/article/details/106448512/
    https://blog.csdn.net/java_lifeng/article/details/102482634

    相关文章

      网友评论

          本文标题:AQS框架最最最简单流程梳理

          本文链接:https://www.haomeiwen.com/subject/ehlsultx.html