美文网首页
Java 中队列同步器 AQS(AbstractQueuedSy

Java 中队列同步器 AQS(AbstractQueuedSy

作者: mghio | 来源:发表于2020-06-11 22:00 被阅读0次

    前言

    Java 中通过 来控制多个线程对共享资源的访问,使用 Java 编程语言开发的朋友都知道,可以通过 synchronized 关键字来实现锁的功能,它可以隐式的获取锁,也就是说我们使用该关键字并不需要去关心锁的获取和释放过程,但是在提供方便的同时也意味着其灵活性的下降。例如,有这样的一个场景,先获取锁 A,然后再获取锁 B,当锁 B 获取到之后,释放锁 A 同时获取锁 C,当获取锁 C 后,再释放锁 B 同时获取锁 D,依次类推,像这种比较复杂的场景,使用 synchronized 关键字就比较难实现了。
    Java SE 5 之后,新增加了 Lock 接口和一系列的实现类来提供和 synchronized 关键字一样的功能,它需要我们显示的进行锁的获取和释放,除此之外还提供了可响应中断的锁获取操作以及超时获取锁等同步特性。JDK 中提供的 Lock 接口实现类大部分都是聚合一个同步器 AQS 的子类来实现多线程的访问控制的,下面我们看看这个构建锁和其它同步组件的基础框架——队列同步器 AQS(AbstractQueuedSynchronizer)

    AQS 基础数据结构

    同步队列

    队列同步器 AQS(下文简称为同步器)主要是依赖于内部的一个 FIFO(first-in-first-out)双向队列来对同步状态进行管理的,当线程获取同步状态失败时,同步器会将当前线程和当前等待状态等信息封装成一个内部定义的节点 Node,然后将其加入队列,同时阻塞当前线程;当同步状态释放时,会将同步队列中首节点唤醒,让其再次尝试去获取同步状态。同步队列的基本结构如下:

    AQS_QUEUE.png
    队列节点 Node

    同步队列使用同步器中的静态内部类 Node 用来保存获取同步状态的线程的引用、线程的等待状态、前驱节点和后继节点。

    AQS_inner_class_node.png

    同步队列中 Node 节点的属性名称和具体含义如下表所示:

    属性类型和名称 描述
    volatile int waitStatus 当前节点在队列中的等待状态
    volatile Node prev 前驱节点,当节点加入同步队列时被赋值(使用尾部添加方式)
    volatile Node next 后继节点
    volatile Thread thread 获取同步状态的线程
    Node nextWaiter 等待队列中的后继节点,如果当前节点是共享的,则该字段是一个 SHARED 常量

    每个节点线程都有两种锁模式,分别为 SHARED 表示线程以共享的模式等待锁,EXCLUSIVE 表示线程以独占的方式等待锁。同时每个节点的等待状态 waitStatus 只能取以下表中的枚举值:

    枚举值 描述
    SIGNAL 值为 -1,表示该节点的线程已经准备完毕,等待资源释放
    CANCELLED 值为 1,表示该节点线程获取锁的请求已经取消了
    CONDITION 值为 -2,表示该节点线程等待在 Condition 上,等待被其它线程唤醒
    PROPAGATE 值为 -3,表示下一次共享同步状态获取会无限进行下去,只在 SHARED 情况下使用
    0 值为 0,初始状态,初始化的默认值
    同步状态 state

    同步器内部使用了一个名为 stateint 类型的变量表示同步状态,同步器的主要使用方式是通过继承,子类通过继承并实现它的抽象方法来管理同步状态,同步器给我们提供了如下三个方法来对同步状态进行更改。

    方法签名 描述
    protected final int getState() 获取当前同步状态
    protected final void setState(int newState) 设置当前同步状态
    protected final boolean compareAndSetState(int expect, int update) 使用 CAS 设置当前状态,该方法能够保证状态设置的原子性

    在独享锁中同步状态 state 这个值通常是 0 或者 1(如果是重入锁的话 state 值就是重入的次数),在共享锁中 state 就是持有锁的数量。

    独占式同步状态获取与释放

    同步器中提供了 acquire(int arg) 方法来进行独占式同步状态的获取,获取到了同步状态也就是获取到了锁,该方法源码如下所示:

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    

    方法首先会调用 tryAcquire 方法尝试去获取锁,查看方法的源码可以发现,同步器并未对该方法进行实现(只是抛出一个不支持操作异常 UnsupportedOperationException),这个方法是需要后续同步组件的开发人员自己去实现的,如果方法返回 true 则表示当前线程成功获取到锁,调用 selfInterrupt() 中断当前线程(PS:这里留给大家一个问题:为什么获取了锁以后还要中断线程呢?),方法结束返回,如果方法返回 false 则表示当前线程获取锁失败,也就是说有其它线程先前已经获取到了锁,此时就需要把当前线程以及等待状态等信息添加到同步队列中,下面来看看同步器在线程未获取到锁时具体是如何实现。
    通过源码发现,当获取锁失败时,会执行判断条件与操作的后半部分 acquireQueued(addWaiter(Node.EXCLUSIVE), arg),首先指定锁模式为 Node.EXCLUSIVE 调用 addWaiter 方法,该方法源码如下:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    

    通过方法参数指定的锁模式(共享锁 or 独占锁)和当前线程构造出一个 Node 节点,如果同步队列已经初始化,那么首先会进行一次从尾部加入队列的尝试,使用 compareAndSetTail 方法保证原子性,进入该方法源码可以发现是基于 sun.misc 包下提供的 Unsafe 类来实现的。如果首次尝试加入同步队列失败,会再次调用 enq 方法进行入队操作,继续跟进 enq 方法源码如下:

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
    

    通过其源码可以发现和第一次尝试加入队列的代码类似,只是该方法里面加了同步队列初始化判断,使用 compareAndSetHead 方法保证设置头节点的原子性,同样它底层也是基于 Unsafe 类,然后外层套了一个 for (;;) 死循环,循环唯一的退出条件是从队尾入队成功,也就是说如果从该方法成功返回了就表示已经入队成功了,至此,addWaiter 执行完毕返回当前 Node 节点。然后以该节点作为 acquireQueued 方法的入参继续进行其它步骤,该方法如下所示:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    可以看到,该方法本质上也是通过一个死循环(自旋)去获取锁并且支持中断,在循环体外面定义两个标记变量,failed 标记是否成功获取到锁,interrupted 标记在等待的过程中是否被中断过。方法首先通过 predecessor 获取当前节点的前驱节点,当当前节点的前驱节点是 head 头节点时就调用 tryAcquire 尝试获取锁,也就是第二个节点则尝试获取锁,这里为什么要从第二个节点才尝试获取锁呢?是因为同步队列本质上是一个双向链表,在双向链表中,第一个节点并不存储任何数据是虚节点,只是起到一个占位的作用,真正存储数据的节点是从第二个节点开始的。如果成功获取锁,也就是 tryAcquire 方法返回 true 后,将 head 指向当前节点并把之前找到的头节点 p 从队列中移除,修改是否成功获取到锁标记,结束方法返回中断标记。
    如果当前节点的前驱节点 p 不是头节点或者前驱节点 p 是头节点但是获取锁操作失败,那么会调用 shouldParkAfterFailedAcquire 方法判断当前 node 节点是否需要被阻塞,这里的阻塞判断主要是为了防止长时间自旋给 CPU 带来非常大的执行开销,浪费资源。该方法源码如下:

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
              * This node has already set status asking a release
              * to signal it, so it can safely park.
              */
            return true;
        if (ws > 0) {
            /*
              * Predecessor was cancelled. Skip over predecessors and
              * indicate retry.
              */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
              * waitStatus must be 0 or PROPAGATE.  Indicate that we
              * need a signal, but don't park yet.  Caller will need to
              * retry to make sure it cannot acquire before parking.
              */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    方法参数为当前节点的前驱节点以及当前节点,主要是靠前驱节点来判断是否需要进行阻塞,首先获取到前驱节点的等待状态 ws,如果节点状态 wsSIGNAL,表示前驱节点的线程已经准备完毕,等待资源释放,方法返回 true 表示可以阻塞,如果 ws > 0,通过上文可以知道节点只有一个状态 CANCELLED(值为 1) 满足该条件,表示该节点线程获取锁的请求已经取消了,会通过一个 do-while 循环向前查找 CANCELLED 状态的节点并将其从同步队列中移除,否则进入 else 分支,使用 compareAndSetWaitStatus 原子操作将前驱节点的等待状态修改为 SIGNAL,以上这两种情况都不需要进行阻塞方法返回 false
    当经过判断后需要阻塞的话,也就是 compareAndSetWaitStatus 方法返回 true 时,会通过 parkAndCheckInterrupt 方法阻塞挂起当前线程,并返回当前线程的中断标识。方法如下:

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    

    线程阻塞是通过 LockSupport 这个工具类实现的,深入其源码可以发现它底层也是基于 Unsafe 类实现的。如果以上两个方法都返回 true 的话就更新中断标记。这里还有一个问题就是什么时候会将一个节点的等待状态 waitStatus 修改为 CANCELLED 节点线程获取锁的请求取消状态呢?细心的朋友可能已经发现了,在上文贴出的 acquireQueued 方法源码中的 finally 块中会根据 failed 标记来决定是否调用 cancelAcquire 方法,这个方法就是用来将节点状态修改为 CANCELLED 的,方法的具体实现留给大家去探索。至此 AQS 独占式同步状态获取锁的流程就完成了,下面通过一个流程图来看看整体流程:

    AQS_acquire.png

    下面再看看独占式锁释放的过程,同步器使用 release 方法来让我们进行独占式锁的释放,其方法源码如下:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    

    首先调用 tryRelease 方法尝试进行锁释放操作,继续跟进该方法发现同步器只是抛出了一个不支持操作异常 UnsupportedOperationException,这里和上文独占锁获取中 tryAcquire 方法是一样的套路,需要开发者自己定义锁释放操作。

    AQS_tryrelease.png

    通过其 JavaDoc 可以得知,如果返回 false,则表示释放锁失败,方法结束。该方法如果返回 true,则表示当前线程释放锁成功,需要通知队列中等待获取锁的线程进行锁获取操作。首先获取头节点 head,如果当前头节点不为 null,并且其等待状态不是初始状态(0),则解除线程阻塞挂起状态,通过 unparkSuccessor 方法实现,该方法源码如下:

    private void unparkSuccessor(Node node) {
        /*
          * If status is negative (i.e., possibly needing signal) try
          * to clear in anticipation of signalling.  It is OK if this
          * fails or if status is changed by waiting thread.
          */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
        /*
          * Thread to unpark is held in successor, which is normally
          * just the next node.  But if cancelled or apparently null,
          * traverse backwards from tail to find the actual
          * non-cancelled successor.
          */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    首先获取头节点的等待状态 ws,如果状态值为负数(Node.SIGNAL or Node.PROPAGATE),则通过 CAS 操作将其改为初始状态(0),然后获取头节点的后继节点,如果后继节点为 null 或者后继节点状态为 CANCELLED(获取锁请求已取消),就从队列尾部开始寻找第一个状态为非 CANCELLED 的节点,如果该节点不为空则使用 LockSupportunpark 方法将其唤醒,该方法底层是通过 Unsafe 类的 unpark 实现的。这里需要从队尾查找非 CANCELLED 状态的节点的原因是,在之前的获取独占锁失败时的入队 addWaiter 方法实现中,该方法如下:

    AQS_unparkSuccessor.png

    假设一个线程执行到了上图中的 ① 处,② 处还没有执行,此时另一个线程恰好执行了 unparkSuccessor 方法,那么就无法通过从前向后查找了,因为节点的后继指针 next 还没赋值呢,所以需要从后往前进行查找。至此,独占式锁释放操作就结束了,同样的,最后我们也通过一个流程图来看看整个锁释放的过程:

    AQS_release.png

    独占式可中断同步状态获取

    同步器提供了 acquireInterruptibly 方法来进行可响应中断的获取锁操作,方法实现源码如下:

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    

    方法首先检查当前线程的中断状态,如果已中断,则直接抛出中断异常 InterruptedException 即响应中断,否则调用 tryAcquire 方法尝试获取锁,如果获取成功则方法结束返回,获取失败调用 doAcquireInterruptibly 方法,跟进该方法如下:

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    仔细观察可以发现该方法实现源码和上文中 acquireQueued 方法的实现基本上类似,只是这里把入队操作 addWaiter 放到了方法里面了,还有一个区别就是当在循环体内判断需要进行中断时会直接抛出异常来响应中断,两个方法的对比如下:

    AQS_acquirequeued_interruptibly_compare.png

    其它步骤和独占式锁获取一致,流程图大体上和不响应中断的锁获取差不多,只是在最开始多了一步线程中断状态检查和循环是会抛出中断异常而已。

    独占式超时获取同步状态

    同步器提供了 tryAcquireNanos 方法可以超时获取同步状态(也就是),该方法提供了之前 synchronized 关键字不支持的超时获取的特性,通过该方法我们可以在指定时间段 nanosTimeout 内获取锁,如果获取到锁则返回 true,否则,返回 false。方法源码如下:

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }
    

    首先会调用 tryAcquire 方法尝试获取一次锁,如果获取锁成功则立即返回,否则调用 doAcquireNanos 方法进入超时获取锁流程。通过上文可以得知,同步器的 acquireInterruptibly 方法在等待获取同步状态时,如果当前线程被中断了,会抛出中断异常 InterruptedException 并立刻返回。超时获取锁的流程其实是在响应中断的基础上增加了超时获取的特性,doAcquireNanos 方法的源码如下:

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    由以上方法实现源码可以看出,针对超时获取这里主要实现思路是:先使用当前时间加上参数传入的超时时间间隔 deadline 计算出超时的时间点,然后每次进行循环的时候使用超时时间点 deadline 减去当前时间得到剩余的时间 nanosTimeout,如果剩余时间小于 0 则证明当前获取锁操作已经超时,方法结束返回 false,反如果剩余时间大于 0。
    可以看到在里面执行自旋的时候和上面独占式同步获取锁状态 acquireQueued 方法那里是一样的套路,即当当前节点的前驱节点为头节点时调用 tryAcquire 尝试获取锁,如果获取成功则返回。

    AQS_acquireQueued_doAcquireNanos_compare.png

    除了超时时间计算那里不同外,还有个不同的地方就是在超时获取锁失败之后的操作,如果当前线程获取锁失败,则判断剩余超时时间 nanosTimeout 是否小于 0,如果小于 0 则表示已经超时方法立即返回,反之则会判断是否需要进行阻塞挂起当前线程,如果通过 shouldParkAfterFailedAcquire 方法判断需要挂起阻塞当前线程,还要进一步比较超时剩余时间 nanosTimeoutspinForTimeoutThreshold 的大小,如果小于等于 spinForTimeoutThreshold 值(1000 纳秒)的话,将不会使当前线程进行超时等待,而是再次进行自旋过程。
    加后面这个判断的主要原因在于,在非常短(小于 1000 纳秒)的时间内的等待无法做到十分精确,如果这时还进行超时等待的话,反而会让我们指定 nanosTimeout 的超时从整体上给人感觉反而不太精确,因此,在剩余超时时间非常短的情况下,同步器会再次自旋进行超时获取锁的过程,独占式超时获取锁整个过程如下所示:

    AQS_tryAcquireNanos_flow.png

    共享式同步状态获取与释放

    共享锁顾名思义就是可以多个线程共用一个锁,在同步器中使用 acquireShared 来获取共享锁(同步状态),方法源码如下:

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
    

    首先通过 tryAcquireShared 尝试获取共享锁,该方法是一个模板方法在同步器中只是抛出一个不支持操作异常,需要开发人员自己去实现,同时方法的返回值有三种不同的类型分别代表三种不同的状态,其含义如下:

    1. 小于 0 表示当前线程获取锁失败
    2. 等于 0 表示当前线程获取锁成功,但是之后的线程在没有锁释放的情况下获取锁将失败,也就是说这个锁是共享模式下的最后一把锁了
    3. 大于 0 表示当前线程获取锁成功,并且还有剩余的锁可以获取

    当方法 tryAcquireShared 返回值小于 0 时,也就是获取锁失败,将会执行方法 doAcquireShared,继续跟进该方法:

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    方法首先调用 addWaiter 方法封装当前线程和等待状态为共享模块的节点并将其添加到等待同步队列中,可以发现在共享模式下节点的 nextWaiter 属性是固定值 Node.SHARED。然后循环获取当前节点的前驱节点,如果前驱节点是头节点的话就尝试获取共享锁,如果返回值大于等于 0 表示获取共享锁成功,则调用 setHeadAndPropagate 方法,更新头节点同时如果有可用资源,则向后传播,唤醒后继节点,接下来会检查一下中断标识,如果已经中断则中断当前线程,方法结束返回。如果返回值小于 0,则表示获取锁失败,需要挂起阻塞当前线程或者继续自旋获取共享锁。下面看看 setHeadAndPropagate 方法的具体实现:

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        /*
            * Try to signal next queued node if:
            *   Propagation was indicated by caller,
            *     or was recorded (as h.waitStatus either before
            *     or after setHead) by a previous operation
            *     (note: this uses sign-check of waitStatus because
            *      PROPAGATE status may transition to SIGNAL.)
            * and
            *   The next node is waiting in shared mode,
            *     or we don't know, because it appears null
            *
            * The conservatism in both of these checks may cause
            * unnecessary wake-ups, but only when there are multiple
            * racing acquires/releases, so most need signals now or soon
            * anyway.
            */
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
    

    首先将当前获取到锁的节点设置为头节点,然后方法参数 propagate > 0 时表示之前 tryAcquireShared 方法的返回值大于 0,也就是说当前还有剩余的共享锁可以获取,则获取当前节点的后继节点并且后继节点是共享节点时唤醒节点去尝试获取锁,doReleaseShared 方法是同步器共享锁释放的主要逻辑。


    同步器提供了 releaseShared 方法来进行共享锁的释放,方法源码如下所示:

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
    

    首先调用 tryReleaseShared 方法尝试释放共享锁,方法返回 false 代表锁释放失败,方法结束返回 false,否则就表示成功释放锁,然后执行 doReleaseShared 方法,进行唤醒后继节点并检查它是否可以向后传播等操作。继续跟进该方法如下:

    private void doReleaseShared() {
            /*
            * Ensure that a release propagates, even if there are other
            * in-progress acquires/releases.  This proceeds in the usual
            * way of trying to unparkSuccessor of head if it needs
            * signal. But if it does not, status is set to PROPAGATE to
            * ensure that upon release, propagation continues.
            * Additionally, we must loop in case a new node is added
            * while we are doing this. Also, unlike other uses of
            * unparkSuccessor, we need to know if CAS to reset status
            * fails, if so rechecking.
            */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                            !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
    

    可以看到和独占式锁释放不同的是,在共享模式下,状态同步和释放可以同时执行,其原子性由 CAS 来保证,如果头节点改变了也会继续循环。每次共享节点在共享模式下唤醒时,头节点都会指向它,这样就可以保证可以获取到共享锁的所有后续节点都可以唤醒了。

    如何自定义同步组件

    JDK 中基于同步器实现的一些类绝大部分都是聚合了一个或多个继承了同步器的类,使用同步器提供的模板方法自定义内部同步状态的管理,然后通过这个内部类去实现同步状态管理的功能,其实这从某种程度上来说使用了 模板模式。比如 JDK 中可重入锁 ReentrantLock、读写锁 ReentrantReadWriteLock、信号量 Semaphore 以及同步工具类 CountDownLatch 等,其源码部分截图如下:

    AQS_use_in_jdk_examples.png

    通过上文可以知道,我们基于同步器可以分别自定义独占锁同步组件和共享锁同步组件,下面以实现一个在同一个时刻最多只允许 3 个线程访问,其它线程的访问将被阻塞的同步工具 TripletsLock 为例,很显然这个工具是共享锁模式,主要思路就是去实现一个 JDk 中的 Lock 接口来提供面向使用者的方法,比如,调用 lock 方法获取锁,使用 unlock 来对锁进行释放等,在 TripletsLock 类内部有一个自定义同步器 Sync 继承自同步器 AQS,用来对线程的访问和同步状态进行控制,当线程调用 lock 方法获取锁时,自定义同步器 Sync 先计算出获取到锁后的同步状态,然后使用 Unsafe 类操作来保证同步状态更新的原子性,由于同一时刻只能 3 个线程访问,这里我们可以将同步状态 state 的初始值设置为 3,表示当前可用的同步资源数量,当有线程成功获取到锁时将同步状态 state 减 1,有线程成功释放锁时将同步状态加 1,同步状态的取值范围为 0、1、2、3,同步状态为 0 时表示没有可用同步资源,这个时候如果有线程访问将被阻塞。下面来看看这个自定义同步组件的实现代码:

    /**
     * @author mghio
     * @date: 2020-06-13
     * @version: 1.0
     * @description:
     * @since JDK 1.8
     */
    public class TripletsLock implements Lock {
    
      private final Sync sync = new Sync(3);
    
      private static final class Sync extends AbstractQueuedSynchronizer {
        public Sync(int state) {
          setState(state);
        }
    
        Condition newCondition() {
          return new ConditionObject();
        }
    
        @Override
        protected int tryAcquireShared(int reduceCount) {
          for (; ;) {
            int currentState = getState();
            int newState = currentState - reduceCount;
            if (newState < 0 || compareAndSetState(currentState, newState)) {
              return newState;
            }
          }
        }
    
        @Override
        protected boolean tryReleaseShared(int count) {
          for (; ;) {
            int currentState = getState();
            int newState = currentState + count;
            if (compareAndSetState(currentState, newState)) {
              return true;
            }
          }
        }
      }
    
      @Override
      public void lock() {
        sync.acquireShared(1);
      }
    
      @Override
      public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
      }
    
      @Override
      public boolean tryLock() {
        return sync.tryAcquireShared(1) > 0;
      }
    
      @Override
      public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
      }
    
      @Override
      public void unlock() {
        sync.releaseShared(1);
      }
    
      @Override
      public Condition newCondition() {
        return sync.newCondition();
      }
    }
    

    下面启动 20 个线程测试看看自定义同步同步工具类 TripletsLock 是否达到我们的预期。测试代码如下:

    /**
     * @author mghio
     * @date: 2020-06-13
     * @version: 1.0
     * @description:
     * @since JDK 1.8
     */
    public class TripletsLockTest {
      private final Lock lock = new TripletsLock();
      private final DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    
      @Test
      public void testTripletsLock() {
        // 启动 20 个线程
        for (int i = 0; i < 20; i++) {
          Thread worker = new Runner();
          worker.setDaemon(true);
          worker.start();
        }
    
        for (int i = 0; i < 20; i++) {
          second(2);
          System.out.println();
        }
      }
    
      private class Runner extends Thread {
        @Override
        public void run() {
          for (; ;) {
            lock.lock();
            try {
              second(1);
              System.out.println(dateFormat.format(new Date()) + " ----> " + Thread.currentThread().getName());
              second(1);
            } finally {
              lock.unlock();
            }
          }
        }
      }
    
      private static void second(long seconds) {
        try {
          TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
      }
    }
    

    测试结果如下:

    AQS_TripletsLock_Test_Result.png

    从以上测试结果可以发现,同一时刻只有三个线程可以获取到锁,符合预期,这里需要明确的是这个锁获取过程是非公平的。

    总结

    本文主要是对同步器中的基础数据结构、独占式与共享式同步状态获取与释放过程做了简要分析,由于水平有限如有错误之处还请留言讨论。队列同步器 AbstractQueuedSynchronizerJDK 中很多的一些多线程并发工具类的实现基础框架,对其深入学习理解有助于我们更好的去使用其特性和相关工具类。


    参考文章

    Java并发编程的艺术
    Java Synchronizer - AQS Learning
    从 ReentrantLock 的实现看 AQS 的原理及应用
    The java.util.concurrent Synchronizer Framework

    相关文章

      网友评论

          本文标题:Java 中队列同步器 AQS(AbstractQueuedSy

          本文链接:https://www.haomeiwen.com/subject/zfsptktx.html