AQS

作者: 今年五年级 | 来源:发表于2019-10-21 11:49 被阅读0次

    流程图

    未命名文件 (7).png

    入口

    直接new一个ReentrantLock,可以发现创建了一个非公平锁

        public ReentrantLock() {
            sync = new NonfairSync();
        }
    

    而这个非公平锁继承了这个sync

    static final class NonfairSync extends Sync {
            private static final long serialVersionUID = 7316153563782823691L;
    
            /**
             * Performs lock.  Try immediate barge, backing up to normal
             * acquire on failure.
             */
            final void lock() {
                if (compareAndSetState(0, 1))
                    setExclusiveOwnerThread(Thread.currentThread());
                else
                    acquire(1);
            }
    
            protected final boolean tryAcquire(int acquires) {
                return nonfairTryAcquire(acquires);
            }
        }
    

    即ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的,Sync类继承自AQS

    abstract static class Sync extends AbstractQueuedSynchronizer {
    }
    

    Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁)

    FairSync

    调用公平锁的lock方法

    static final class FairSync extends Sync {
            final void lock() {
                acquire(1);
            }
    // ...
        }
    

    AbstractQueuedSynchronizer# acquire

        public final void acquire(int arg) {
            if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    ReentrantLock.FairSync# tryAcquire

    尝试获取锁,返回值为true,代表获取成功,可能有两种情况:

    1. 没有现存等待锁
    2. 重入锁,线程本来就持有锁,可以理所当然直接获取
    protected final boolean tryAcquire(int acquires) {
                final Thread current = Thread.currentThread();
                int c = getState();
                if (c == 0) {
    // 1. 到这里说明此事没有线程池有锁,但是因为是公平锁,需要先看看是否CLH同步队列中有线程在等待
                    if (!hasQueuedPredecessors() &&
    // 2. 如果没有线程在等待,则CAS获取锁,成功了就获取了
    // 没有CAS成功的话,说明刚才尝试CAS的时候,另一个线程也在CAS,并且另一个线程抢先获得了锁,则直接走到最下面返回false
                        compareAndSetState(0, acquires)) {
    // 3. 获取到锁以后,通知大家,现在是我获取到了锁
                        setExclusiveOwnerThread(current);
                        return true;
                    }
                }
    // 这里说明已经有线程获取到锁了,且获取到锁的线程是当前线程
                else if (current == getExclusiveOwnerThread()) {
                    int nextc = c + acquires;
                    if (nextc < 0)
                        throw new Error("Maximum lock count exceeded");
                    setState(nextc);
                    return true;
                }
                return false;
            }
    

    如果没有获取到锁,则直接继续进入AQS的代码,去进入等待队列

    image.png

    AbstractQueuedSynchronizer# addWaiter

    /**
         * Creates and enqueues node for current thread and given mode.
         *
         * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
         * @return the new node
         */
        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    

    AbstractQueuedSynchronizer# enq

        private Node enq(final Node node) {
    // 自旋的方式入队
            for (;;) {
                Node t = tail;
    // CLH同步队列未初始化
                if (t == null) { // Must initialize
    // 初始化一个空的头结点
                    if (compareAndSetHead(new Node()))
    // 让头指针和尾指针指向这个空节点,这个初始化执行完,因为没有return,就继续自旋
                        tail = head;
                } else {
    // 初始化完以后进入这里,这里不断通过CAS尝试,将自己设置为新的尾结点,不成功的话,就继续自旋,继续让前驱指针指向新的尾结点
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
        }
    

    执行完上面的操作,开始准备执行acquireQueued这个方法

    image.png

    关键词

    1. AQS维护的同步队列是一个FIFO先进先出队列
    2. CLH同步队列中的节点的waitStatus等待状态默认是0

    CLH同步队列中节点获取锁步骤

    假设当前有两个线程,线程A和线程B,线程A已经获得锁,线程B获取锁失败,构建成Node节点进入CLH同步队列,同时通过enq()方法以后,CLH队列中如下图


    image.png
    final boolean acquireQueued(final Node node, int arg) {
        // 这个时候node节点是上图中的tail节点,也就是线程B所在的节点,arg=1
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                // 查看当前node节点的前置节点p
                if (p == head && tryAcquire(arg)) {
                   // 如果前置节点是头部节点,B线程进行尝试获取锁
                   // 如果!是如果!
                   // 尝试获取成功了,也就是A线程执行完成,然后设置当前当前可执行的节点是B线程
                    setHead(node);
                    // 把node节点的线程设置为null,然后head指到node上
                    p.next = null; // help GC
                    // 回收操作
                    failed = false;
                    // 返回,然后将执行的将会是B线程
                    return interrupted;
                }
                // 按照我们的线程ABC的样例,上面的if肯定不会执行
                // 1、因为即使node的前节点是head,但是也无法获取到锁,因为一直被A线程占有者
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    // 如果shouldParkAfterFailedAcquire返回true,则挂起,否则不挂起线程
                    interrupted = true;
                // 2、第一次循环设置了p节点状态位-1,然后B线程循环至这里被挂起
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    
    // 返回true,挂起,返回false则不挂起
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        
        if (ws == Node.SIGNAL)
            // 前节点的状态=-1,返回true,直接挂起即可
            return true;
        if (ws > 0) {
            // 前置节点状态有>0,说明前驱节点取消了排队,移除掉该节点
            do {
                node.prev = pred = pred.prev;
                // 拆分为2步
                // pred = pred.prev
                // node.prev = pred
            } while (pred.waitStatus > 0);
            // 当前节点开始往前扫描,从双向队列中除去>0的节点,直到<=0
            pred.next = node;
        } else {
            // 利用CAS设置前节点状态位-1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    上面代码acquireQueued中如果线程B获取锁失败,则判断是否挂起B,通过B的前驱节点head的等待状态,默认为0,然后将其设置为-1,在第二次获取失败的时候(因此锁还在被线程A占用),直接shouldParkAfterFailedAcquire返回true,并将B线程挂起,直到线程A释放锁,调用unpark唤醒CLH同步队列中第一个等待的有效节点B,此时线程B从836行醒过来,继续执行837行返回,并最终在accquireQueued中获得锁

    image.png

    节点入同步队列变化

    1. 如果线程A获取到锁,B线程在acquireQueued方法的时候,线程B初始化了head头结点,此时head==tail,并且由于此时的waitStatus没设置,因此java默认设置waitStatus==0
    image.png
    1. 线程B入队


      image.png

    我们可以看到在线程B入队列以后,此时head结点的waitStatus发生了变化。在
    shouldParkAfterFailedAcquire被执行的时候,线程B将前驱结点,即head结点的waitStatus设置为-1

    1. 如果线程C此时再进来,直接插到线程B后面,此时线程C的waitStatus==0
    image.png

    然后C同样需要执行acquireQueued()。此时的节点是线程C,其前驱节点是线程B,不是head,线程C直接运行到shouldParkAfterFailedAcquire方法中然后通过CAS方法设置B节点waitStatus为-1,并且之后又经过1次循环操作,最后返回true,此时队列为

    image.png

    总结:enq方法让新的获取锁失败线程进队,acquireQueued方法让新的获取锁失败线程节点的前驱节点的等待状态为-1,并且挂起该新的获取锁失败线程

    waitStatus中signal(-1)的意思是:代表后继节点需要被唤醒,即waitStatus记录的不是当前线程节点的状态,而是后继节点的状态。每个node入队的时候,都会将前驱节点的等待状态设置为SIGNAL,然后阻塞,等待被前驱节点唤醒

    释放锁

    ReentrantLock# unlock

    public void unlock() {
            sync.release(1);
        }
    

    AbstractQueuedSynchronizer# release

       public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
    // 如果头节点存在,且头节点的waitStatus不为0,即说明后面有节点需要唤醒
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    AbstractQueuedSynchronizer# tryRelease

            protected final boolean tryRelease(int releases) {
    // 当前的permit许可数量 - 要释放的数量
                int c = getState() - releases;
                if (Thread.currentThread() != getExclusiveOwnerThread())
                    throw new IllegalMonitorStateException();
                boolean free = false;
    // 释放许可以后,不再有线程持有锁的话
                if (c == 0) {
                    free = true;
                    setExclusiveOwnerThread(null);
                }
                setState(c);
                return free;
            }
    

    执行完回到下面代码这里,开始执行unparkSuccessor

    image.png

    AbstractQueuedSynchronizer# unparkSuccessor(唤醒后继节点)

    // 参数是头节点,从上图可以看到
      private void unparkSuccessor(Node node) {
            /*
             说明有后继节点需要唤醒,唤醒前,先清除-1的等待状态
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /*
        // 下面的代码就是唤醒后继节点,但是有可能后继节点取消了等待(waitStatus==1)那么就从队尾往前找,找到waitStatus<=0的所有节点中排在最前面的
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            if (s != null)
     // 唤醒线程
                LockSupport.unpark(s.thread);
        }
    

    节点出同步队列变化

    同步队列最开始没有元素的时候,构造了空的头节点,后面构建的新的获取锁失败线程节点等在该空节点后,之前持有锁的线程释放锁以后,唤醒该空的头节点后面的第一个线程B,并且清除该头节点,将唤醒线程所在的节点设置为新的头节点,当唤醒线程B执行结束释放锁的时候,此时清空的之前B节点改造的头节点信息,此时队列中仅仅存在一个C节点,并设置C节点为新的头节点,C节点线程C开始执行

    image.png

    最后,当同步队列没有线程等待的时候,仅仅剩下了一个thread为空,head,tail指针都指向的空节点,之前的一系列操作等于初始化了该同步队列,下一次再来线程的时候,直接不用进enq,直接入队列CAS-tail

    相关文章

      网友评论

        本文标题:AQS

        本文链接:https://www.haomeiwen.com/subject/ldftvctx.html