美文网首页
JAVA 多线程与高并发学习笔记(十四)——AQS核心原理

JAVA 多线程与高并发学习笔记(十四)——AQS核心原理

作者: 简单一点点 | 来源:发表于2022-09-14 10:46 被阅读0次

    基于 CAS自旋实现的轻量级锁在恶性自旋时会消费大量的CPU资源。解决这个问题有2种方案:分散操作热点和使用队列削峰。JUC并发包使用的是队列削峰的方案解决CAS性能问题,并提供了一个基于双向队列的削峰基类——抽象基础类AbstractQueuedSyncronizer(抽象同步器类,简称为AQS)。

    AQS的内部队列

    AQS是CLH队列的一个变种,原理和CLH类似。AQS队列内部维护的是一个FIFO双向链表,其中的每个节点其实是由线程封装的,当线程争抢锁失败后会封装成节点加入AQS队列中:当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点。

    AQS的内部结构可以参考下图。

    AQS内部结构.png

    AQS核心成员

    AQS基本模板模式实现。AQS为锁获取、锁释放的排队和出队过程提供了一系列的模板方法。由于JUC的显式锁种类丰富,因此AQS将不同锁的具体操作抽取为钩子方法,供各类锁去实现。

    状态标志位

    AQS 中维持了一个单一的 volatile 修饰的状态信息是 state,AQS使用int类型的state标志锁的状态,可以理解为锁的同步状态。

    // 同步状态
    private volatile int state;
    

    state 因为使用了 volatile 保证了操作的可见性,AQS提供了 getState()setState() 来获取和设置同步状态。

    // 获取同步状态
    protected final int getState() {
        return state;
    }
    
    // 设置同步状态
    protected final void setState(int newState) {
        state = neWState;
    }
    
    // 通过CAS设置同步状态
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.comapreAndSwapInt(this, stateOffset, expect, update);
    }
    

    由于 setState() 无法保证原子性,因此AQS给我们提供了 compareAndSetState() 方法利用底层 Unsafe 的 CAS机制来实现原子性。

    AbstractQueueSynchronizer 继承了 AbstarctOwnableSynchronizer,这个基类只有一个变量叫 exclusiveOwnerThread,表示当前占用该锁的线程,并且提供了相应的 get()set() 方法。具体如下:

    public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    
        // 表示当前占用该锁的线程
        private transient Thread exlusiveOwnerThread;
    
        ...
    }
    

    队列节点类

    AQS是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。节点类型通过内部类 Node 定义。

    static final class Node {
    
        // 节点等待状态值:取消状态
        static final int CANCELLED = 1;
        // 节点等待状态值:标识后继线程处于等待状态
        static final int SIGNAL = -1;
        // 节点等待状态值:标识当前线程正在进行条件等待
        static final int CONDITION = -2;
        // 节点等待状态值:标识下一次共享锁的acquireShared操作需要无条件传播
        static final int PROPAGATE = -3;
        // 节点状态
        // 普通的同步节点初始值为0,条件等待节点的初始值为CONDITION(-2)
        volatile int waitState;
        // 节点所对应的线程,为抢锁线程或者条件等待线程
        volatile Thread thread;
        // 前驱节点
        volatile Node prev;
        // 后继节点
        volatile Node next;
        // 此属性指向下一个条件等待节点
        Node nextWaiter;
    }
    

    waitStatus 属性

    每个节点与等待线程关联,每个节点维护一个状态 waitStatus,它的各个常量值具体如下:

    • CANCELLED(1)。1表示该线程节点已经释放(超时、中断),已取消的节点不会再阻塞,不会参与竞争,会一直保持取消状态。
    • SIGNAL(-1)。表示其后继的节点处于等待状态,当前节点对应的线程如果释放了同步状态或者被取消,就会通知后继节点,使后继节点的线程得以运行。
    • CONDITION(-2)。表示该线程在条件队列中阻塞,表示节点在等待队列中。当持有锁的线程调用了CONDITION的signal() 方法之后,节点会从该CONDITION的等待队列转移到该锁的同步队列上,去竞争锁。
    • PROPAGATE(-3)。表示下一个线程获取共享锁后,自己的共享状态会被无条件地传播下去,因为共享锁可能出现同时有N个锁可以用。
    • 0。表示当前节点处于初始状态。

    thread成员

    Node 的 thread 成员用来存放进入AQS队列中的线程引用,Node的nextWaiter成员用来执行自己的后继等待节点。

    抢占类型常量标识

    Node 节点还定义了两个抢占类型常量标识,SHARED和EXCLUSIVE,具体如下:

    // 标识节点在抢占共享锁
    static final Node SHARED = new Node();
    
    // 表示节点在抢占独占锁
    static final Node EXCLUSIVE = null;
    
    

    FIFO双向同步队列

    AQS通过内置的FIFO双向队列来完成线程的排队工作,内部通过节点 head 和 tail 记录队首和队尾元素,元素的节点类型为 Node 类型,具体如下:

    /* 首节点的引用 */
    private transient volatile Node head;
    
    /* 尾节点的引用 */
    private transient volatile Node tail;
    

    AQS 的首尾节点都是懒加载的,需要的时候才真正创建。只有在线程竞争失败的情况下,有新线程加入同步队列时,AQS才创建一个head节点。尾节点只在有新线程阻塞时才被创建。

    JUC显式锁和AQS的关系

    AQS是一个同步器,它实现了锁的基本抽象功能,支持独占锁与共享锁两种方式,该类是使用模板模式来实现的,成为构建锁和同步器的框架,使用该类可以简单且高效地构造出应用广泛的同步器。

    JUC中的显式锁、线程同步工具等,内部都使用了AQS作为等待队列。

    AQS中的模板模式

    模板模式

    模板模式是类的行为模式。准备一个抽象类,将部分的逻辑以具体方法的形式实现,然后声明一些抽象方法来迫使子类实现剩余的逻辑。不同的子类提供不同的方式实现这些抽象方法,从而对剩余的逻辑有不同的实现。

    模板模式的关键在于:父类提供框架性的公共逻辑,子类提供个性化的定制逻辑。

    模板方法(Template Method)也被称为骨架方法,主要定义了整个方法需要实现的业务操作的算法框架。

    钩子方法(Hook Method)是被模板方法的算法框架所调用的,由子类提供具体的实现方法。

    AQS的模板流程

    AQS定义了两种资源共享方式:

    • Exclusive(独占锁):只有一个线程能占有锁资源。
    • Share(共享锁):多个线程可以同时占有锁资源。

    AQS 为不同的资源共享方式提供了不同的模板流程。

    AQS中的钩子方法

    自定义同步器时,AQS中需要重写的钩子方法大致如下:

    • tryAcquire(int):独占锁钩子,尝试获取资源,若成功则返回true。
    • tryRelease(int):独占锁钩子,尝试释放资源,若成功则返回true。
    • tryAcquireShared(int):共享锁钩子,尝试获取资源,负数表示失败,0表示成功,但没有剩余可用资源,正数表示成功,且有剩余资源。
    • tryReleaseShared(int):共享锁钩子,尝试释放资源,若成功则返回true。
    • isHeldExclusively():独占锁钩子,判断该线程是否正在独占资源。

    以上钩子的默认实现会抛出 UnsupportOperationException 异常。

    通过AQS实现一把简单的独占锁

    本部分模拟 ReentrantLock 实现一个简单的独占锁,真实的 ReentrantLock 要复杂很多。

    
    public class SimpleMockLock implements Lock {
        private final Sync sync = new Sync();
        @Override
        public void lock() {
            sync.acquire(1);
        }
    
        @Override
        public void unlock() {
            sync.release(1);
        }
        
    
        private static class Sync extends AbstractQueuedSynchronizer {
    
            // 钩子方法
            protected boolean tryAcquire(int arg) {
                // CAS更新状态值为1
                if(compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            // 钩子方法
            protected boolean tryRelease(int arg) {
                if(Thread.currentThread() != getExclusiveOwnerThread()) {
                    throw new IllegalMonitorStateException();
                }
                
                if(getState() == 0) {
                    throw new IllegalMonitorStateException();
                }
                
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
        }
    }
    

    AQS抢占锁的原理

    下面通过AQS中的方法讲解一下AQS抢占锁的原理。

    AQS模板方法:acquire

    acquire 是AQS 封装好的获取资源的公共入口,它是AQS提供的利用独占的方式获取资源的方法,编码实现如下:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    acquire 至少执行一次 tryAcquire 钩子方法。若调用成功,则 acquire 直接返回,表示已经抢到锁,若不成功,则将线程加入等待队列。

    钩子实现: tryAcquire

    tryAcquire 是需要实现的钩子方法,可以参照我们前面实现的 SimpleMockLock。

    直接入队:addWaiter

    在 acquire 模板方法中,如果钩子方法 tryAcquire 尝试获取同步状态失败的话,就构造同步节点,通过 addWaiter 方法将该节点加入同步队列的队尾。

    private Node addWaiter(Node mode) {
        // 创建新节点
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        // 加入队列尾部,将目前的队列tail作为自己的前驱节点pred
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // 现场时通过AQS方式修改尾节点为最新的节点
            // 如果修改陈宫,将节点加入队列的尾部
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 第一次尝试添加尾部失败,意味着有并发抢锁发生,需要进行自旋
        enq(node);
        return node;
    }
    

    addWaiter 方法中,首先需要构造一个 Node 对象,其中有两个参数,第一个是当前线程,第二个是Node节点,可能值为 SHARED 或 EXCLUSIVE。

    自旋入队:enq

    addWaiter 第一次尝试在尾部加节点失败,意味着有并发抢锁发生,需要进行自旋,enq 方法通过 CAS 自旋将节点添加到队列尾部。

    /**
     * 这里进行了循环,如果此时存在tail,就执行添加新队尾的操作
     * 如果依然不存在,就把当前线程作为 head 节点
     * 插入节点后,调用acquireQueued() 进行阻塞
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                // 队列为空,初始化尾节点和头节点为新节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    
    /**
     * CAS 操作head指针,仅仅被enq()调用
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
    
    /**
     * CAS 操作tail指针,仅仅被enq()调用
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }
    

    自旋抢占:acquireQueued

    在节点入队之后,启动自旋抢锁的流程,aquireQueued方法的主要逻辑:当前Node节点线程在死循环中不断获取同步状态,并且不断在前驱节点上自旋,只有当前驱节点是头节点时才能尝试获取锁,原因是:

    1. 头结点是成功获取同步状态(锁)的节点,而头节点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头节点。
    2. 维护同步队列的FIFO原则,节点进入同步队列之后,就进入了自旋的过程,每个节点都在不断地执行for死循环。
    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                // 自选检查当前节点的前驱节点是否为头节点,才能获取锁
                for (;;) {
                    // 获取节点的前驱节点
                    final Node p = node.predecessor();
                    // 节点中的线程循环地检查自己的前驱节点是否为head节点
                    // 前驱节点是head时,进一步调用子类的 tryAcquire 实现
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // 检查前一个节点的状态,预判当前获取锁失败的线程是否要挂起
                    // 如果需要挂起,调用 parkAndCheckInterrupt 方法挂起当前线程,直到被唤醒
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true; // 若两个操作都是true,则置true
                }
            } finally {
                // 如果等待过程中没有成功获取资源,
                // 那么取消节点在队列中的等待
                if (failed)
                    // 取消等待,将当前节点从队列中移除
                    cancelAcquire(node);
            }
        }
    

    为了不浪费资源,acquireQueued自旋过程中会阻塞线程,等待被前驱节点唤醒后才启动循环。如果成功就返回,否则执行 shouldParkAfterFailedAcquire、parkAndCheckInterrupt来达到阻塞的效果。

    挂起预判:shouldParkAfterFailedAcquire

    shouldParkAfterFailedAcquire 方法的主要功能是,将当前节点的有效前驱节点(不是CANCELLED类型的节点)找到,并且将有效前驱节点的状态设置为SIGNAL,之后返回true代表当前线程可以马上被阻塞了。

    具体分为三种情况:

    1. 如果前驱节点的状态为-1(SIGNAL),说明前驱的等待标志已设好,返回true表示设置完毕。
    2. 如果前驱节点的状态为1(CANCELLED),说明前驱节点本身不再等待了,需要跨越这些节点,然后找到一个有效节点,再把当前节点和这个有效节点的唤醒关系建立好,调整前驱节点的next指针为自己。
    3. 如果其他情况:-3、-2、0,那么通过CAS尝试设置前驱节点为SIGNAL,表示只要前驱节点释放锁,当前节点就可以抢占锁了。
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus; // 获取前驱节点的状态
        if (ws == Node.SIGNAL)
            /*
                * This node has already set status asking a release
                * to signal it, so it can safely park.
                */
            return true;
        if (ws > 0) {
            /*
                * Predecessor was cancelled. Skip over predecessors and
                * indicate retry.
                */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
                * waitStatus must be 0 or PROPAGATE.  Indicate that we
                * need a signal, but don't park yet.  Caller will need to
                * retry to make sure it cannot acquire before parking.
                */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    线程挂起:parkAndCheckInterrupt

    parkAndCheckInterrupt 的主要任务是暂停当前线程,具体如下:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // 调用park()使线程进入waiting状态
        return Thread.interrupted(); // 如果被唤醒,查看自己是否已经被中断
    }
    

    AQS 会把所有等待的线程构成一个阻塞等待队列,当一个线程执行完 lock.unlock(),会激活其后继节点,通过 LockSupport.unpark(postThread) 完成后继线程的唤醒。

    AQS 的两个关键点:节点的入队和出队

    理解AQS的一个关键点是掌握节点的入队和出队。

    节点的自旋入队

    节点在第一次入队失败后,就会开始自旋入队,分为以下两种情况:

    1. 如果AQS的队列非空,新节点入队的插入位置在队列的尾部,并且通过CAS方式插入,插入之后AQS的tail将指向新的尾节点。

    2. 如果AQS的队列为空,新节点入队时,AQS 通过 CAS 方法将新节点设置为头节点 head,并将 tail 指针指向新节点。

    private Node enq(final Node node) {
        for (;;) { // 自旋入队
            Node t = tail;
            if (t == null) { 
                // 队列为空,初始化尾节点和头节点为新节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 如果队列不为空,将新节点插入队列尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    节点的出队

    节点的出队算法在 acquireQueued() 方法中,这是一个模板方法,acquireQueued() 方法不断在前驱节点上自旋(for 循环),如果前驱节点是头节点并且当前线程使用钩子方法 tryAcquire() 获得了锁,就移除头节点,将当前节点设置为头节点。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            // 中断标记
            boolean interrupted = false;
            for (;;) {
                // 获取当前线程节点的前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点是头节点,则使当前线程尝试获取锁资源(tryAcquire方法忘了回头看第五步)
                if (p == head && tryAcquire(arg)) {
                    // 如果当前程线程获取锁资源成功,则将当前线程节点设置为头节点
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 根据前驱节点p的等待状态判断是否要将当前线程阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 生成CANCELLED状态节点,并唤醒节点
            if (failed)
                cancelAcquire(node);
        }
    }
    

    节点加入队列尾部后,如果其前驱节点不是头节点,通常情况下,该新节点所绑定的线程会被无限期阻塞,而不会去执行无效循环,从而导致CPU资源的浪费。

    对于公平锁,头节点一般是占用锁的节点,在释放锁时,会唤醒其后稷街店所绑定的线程,后继节点的线程被唤醒后会重新执行自旋抢锁逻辑。

    AQS 释放锁的原理

    AQS 释放锁分成 3 个部分:

    release 模板方法

    首先会调用 AQS 的 release 模板方法,代码如下:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    可以看到其中主要有 2 个方法,一个是 tryRelease 钩子方法,该方法会尝试释放当前线程持有的资源,由子类根据具体业务提供具体实现,执行成功后返回 true

    另一个方法是 unparkSucessor(),用来唤醒后继节点,代码如下。

    private void unparkSuccessor(Node node) {
        // 获取节点状态,释放锁的节点,也就是头节点
        int ws = node.waitStatus;
        // 若头节点状态小于0,则将其置为0,表示初始状态
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        Node s = node.next; // 找到后头的一个节点
        if (s == null || s.waitStatus > 0) {
            // 如果新节点已经被取消
            s = null;
            // 从队列尾部开始,往前去找醉千年的一个 waitStatus 小于0的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒后继节点对应的线程
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    ReentrantLock 抢锁流程

    下面分别对 ReentrantLock 的公平锁和非公平锁的实现进行讲述。

    ReentrantLock 非公平锁的抢占流程

    总体流程图如下:

    reentrantlock1.png

    非公平锁的同步器子类

    ReentrantLock 为非公平锁实现了一个内部的同步器—— NofairSync,器显式锁获取方法 lock() 的源码如下:

    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
    
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    

    首先用一个CAS操作判断 state 是不是0(表示当前锁未被占用),如果是0就把它置为1,并且设置当前线程为该锁的独占线程,表示获取锁成功。当多个线程同时尝试占用一个锁时,CAS操作只能保证一个线程操作成功,剩下的要排队。

    非公平体现在:如果占用锁的线程刚刚释放锁,state 置为0,而排队等待锁 的线程还未唤醒,新来的线程就直接抢占了该锁,那么久“插队”了。

    非公平抢占的钩子方法:tryAcquire

    如果非公平抢占没有成功,非公平锁的 lock 汇之星模板方法 acquire,首先会调用钩子方法 tryAcquire,非公平抢占的钩子方法实现如下:

    static final class NonfairSync extends Sync {
        ...
    
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }
    
     abstract static class Sync extends AbstractQueuedSynchronizer {
       
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 先直接获取锁的状态
            int c = getState();
            if (c == 0) {
                // 如果内部队列首节点的线程执行晚了,它会将锁的state置为0
                // 当前抢锁线程的下一步就是直接进行抢占
                // 发现state是空的,就直接拿来加锁使用
                if (compareAndSetState(0, acquires)) {
                    // 1.利用CAS自旋方式确认当前state确实为0,然后设置acquire(1)
                    setExclusiveOwnerThread(current);
                    // 设置当前执行的线程,直接返回true
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                // 2.当前线程和执行中的线程是同一个,也就意味着可重入操作
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                // 表示当前锁被1个线程重复获取了nextc次
                return true;
            }
            // 否则就返回false,表示没有成功获取当前锁,进入排队过程
            return false;
        }
        ...
    }
    

    其核心思想是当前线程尝试获取锁的时候,如果发现锁的状态为0,则直接尝试将锁拿过来,而不会考虑其他排队节点。

    ReentrantLock 公平锁的抢占流程

    ReentrantLock 公平锁的抢占流程如下:

    reetrantlock2.png

    公平锁的同步器子类

    ReentrantLock 为公平锁实现了一个内部的同步器——FairSync,其显式锁获取方法 lock 的代码如下:

    static final class FairSync extends Sync {
            
        final void lock() {
            acquire(1);
        }
        ...
    }
    

    其核心思想是通过 AQS 模板方法进入队列入队操作。

    公平抢占的钩子方法:tryAcquire

    公平锁的 lock 会执行模板方法 acquire,该方法首先会调用钩子方法 tryAcquire,其实现如下:

    static final class FairSync extends Sync {
        ...
    
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState(); // 锁状态
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }
    

    公平锁的钩子方法中,首先判断是否有后继节点,如果有,并且当前线程不是锁的占有线程,钩子方法就返回 false,模板方法会进入排队的执行流程。

    是否有后继节点的判断

    FairSync 是否有后继节点的判断代码如下:

    public final boolean hasQueuedPredecessors() {
        Node t = tail; 
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
    

    执行场景大致如下:

    1. h != t 不成立的时候,说明 h 头节点,t 尾节点要么是同一节点。要么都是 null,此时返回 false,表示没有后继节点。

    2. h != t 成立的时候,进一步检查 head.next 是否为 null,如果为 null,返回 true,这种场景在有其他线程第一次正在入队时可能会出现。

    3. 如果 h != t 成立,head.next != null,判断 head.next 是不是当前线程,如果是就返回 false,不是就返回true

    AQS 条件队列

    Condition 是 JUC 用来代替传统 Objectwait/notify 线程间通信与写作机制的新组件,它更加的高效。

    Condition 基本原理

    ConditionObject 类是实现条件队列的关键,每个 ConditionObject 对象都维护一个单独的条件等待队列。每个 ConditionObject 对应一个条件队列,它记录该队列的头结点和尾节点。

    public class ConditionObject implements Condition, java.io.Serializable {
            
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
    }
    

    一个 Condition 对象是一个单条件的等待队列.

    condtionObject.png

    在一个显式锁上,我们可以创建多个等待任务队列,这点和内置锁不同,Java 内置锁上只有唯一的一个等待队列。比如,我们可以使用 newCondition 创建两个等待队列,具体如下:

    private Lock lock = new ReentrantLock();
    //创建第一个等待队列
    private Condition firstCond = lock.newCondition();
    //创建第二个等待队列
    private Condition secondCond = lock.newCondition();
    
    condition.png

    await等待方法原理

    当线程调用 await 方法时,说明当前线程的节点为当前AQS队列的头节点,正好处于占有锁的状态,await 方法需要把该线程从 AQS 队列挪到 Condition 等待队列里。

    在 await 方法将当前线程挪动到 Condition 等待队列后,还会唤醒 AQS 同步队列中的 head 节点的下一个节点,方法代码如下:

     public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter(); // 1
        int savedState = fullyRelease(node); // 2
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) { // 3
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) &&  interruptMode != THROW_IE) // 4
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // 5
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }
    

    整体流程如下:

    1. 执行 await 时,会新创建一个节点并放入 Condition 队列尾部。
    2. 然后释放锁,并唤醒AQS同步队列中的头节点的后一个节点。
    3. 然后执行 while 循环,将该节点的线程阻塞,直到该节点离开等待队列,重新回到同步队列成为同步节点后,线程才退出 while 循环。
    4. 退出循环后,开始调用 acquireQueued() 不断尝试拿锁。
    5. 拿到锁后,会清空 Condition 队列中被取消的节点。

    创建一个新节点并放入 Condition 队列尾部的工作由 addCondtionWaiter 方法完成。

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // 如果尾节点取消,重新定位尾节点
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        // 创建一个新Node,作为等待节点
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        // 将新Node加入等待队列
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }
    

    signal唤醒方法原理

    线程在某个 ConditionObject 对象上调用 signal 方法后,等待队列中的 firstWaiter 会被加入同步队列中,等待节点被唤醒,流程如下:

    public final void signal() {
        // 如果当前线程不是持有该锁的线程,就抛出异常
        if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
        Node first = firstWaiter;
        if (first != null)
            doSignal(first); // 唤醒节点
    }
    
    // 执行唤醒
    private void doSignal(Node first) {
        do {
            // 出队的代码写的很巧妙
            // first出队,firstWaiter头部指向下一个节点,自己的nextWaiter
            if ( (firstWaiter = first.nextWaiter) == null)
                // 如果第二个节点为空,则尾部也为空
                lastWaiter = null;
            // 将原来头部first的后继置空,help for GC
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                    (first = firstWaiter) != null);
    }
    
    // 将被唤醒的节点转移到同步队列
    final boolean transferForSignal(Node node) {
    
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
    
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
    

    signal 方法的整体流程如下:

    1. 通过 enq() 方法自旋讲条件队列中的头节点放入 AQS 同步队列尾部,并获取它在 AQS 队列中的前驱节点。

    2. 如果前驱节点的状态是取消状态,或者设置前驱节点为 Signal 状态失败,就唤醒当前节点的线程,否则节点在同步队列的尾部,参与排队。

    3. 同步队列中的线程被唤醒后,表示重新获取了显式锁,然后继续执行 condition.wait() 语句后面的临界区代码。

    AQS实际应用

    JUC 的同一架构如下图所示。

    juc总体架构.png

    AQS 建立在 CAS 原子操作和 volatile 可见性变量的基础之上,为上层的显式锁、同步工具类、阻塞队列、线程池、并发容器、Future异步工具提供线程之间同步的基础设施。所以,AQS在JUC框架使用很广泛。

    相关文章

      网友评论

          本文标题:JAVA 多线程与高并发学习笔记(十四)——AQS核心原理

          本文链接:https://www.haomeiwen.com/subject/rnvkortx.html