美文网首页
2. AbstractQueuedSynchronizer

2. AbstractQueuedSynchronizer

作者: shallowinggg | 来源:发表于2019-03-15 23:10 被阅读0次

队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁和其他同步组件的基础框架。它使用了一个int成员变量来表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3个方法(getState()setState(int)compareAndSetState(int,int))来进行操作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类,同步器本身没有实现任何同步接口,它仅仅定义了若干同步状态获取和释放的方法供自定义同步组件使用,同步器既支持独占式获取同步状态,也支持共享式获取同步状态。当以独占式模式获取时,其他线程尝试获取会失败,而以共享式模式获取时,多个线程获取则可能会成功。

同步器是实现锁(也可以是任何同步组件)的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器是面向锁的实现者的,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。

使用

为了使用此类作为同步组件的基础,需要重新定义以下方法,通过使用getState()setState(int)compareAndSetState(int,int)方法来检查和修改同步状态:

方法名称 描述
tryAcquire(int) 独占式获取同步状态,实现该方法需要查询当前状态把那个且判断同步状态是否符合预期,然后再进行CAS设置同步状态
tryRelease(int) 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
tryAcquireShared(int) 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之,获取失败
tryReleaseShared(int) 共享式释放同步状态
isHeldExclusively() 同步器是否被在独占模式下被线程占用

每个方法的默认实现是抛出UnsupportedOperationException异常。这个方法的实现必须是线程安全的,简短的并且非阻塞的。定义这些方法是此类唯一支持的操作,其他方法被声明为final因为它们不能被独立改变。

    > line: 1115
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean tryReleaseShared(int arg) {
        throw new UnsupportedOperationException();
    }

    protected boolean isHeldExclusively() {
        throw new UnsupportedOperationException();
    }

你可能也发现了从AbstractOwnableSynchronizer类中继承的方法,这些方法对于追踪线程获取独占式同步器很有用。你应该去使用它们,这确保了监视器和错误处理组件能帮助用户确定哪个线程持有了锁。

public abstract class AbstractOwnableSynchronizer
    implements java.io.Serializable {

    /** Use serial ID even though all fields transient. */
    private static final long serialVersionUID = 3737899427754241961L;

    protected AbstractOwnableSynchronizer() { }

    /**
     * 独占模式同步器的持有线程
     */
    private transient Thread exclusiveOwnerThread;

    /**
     * 设置同步器的拥有者。如果传入的参数为null,表示当前没有线程占有同步器
     * 这个方法没有强加任何同步措施,例如synchronized或者volatile访问
     */
    protected final void setExclusiveOwnerThread(Thread thread) {
        exclusiveOwnerThread = thread;
    }

    /**
     * 返回最后一次使用setExclusiveOwnerThread设置的线程,如果没有设置过,
     * 则返回null。这个方法没有强加任何同步措施,例如synchronized或者volatile访问
     */
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

这个类基于内部的FIFO队列,独占式同步器的核心如下:

Acquire:
    while (!tryAcquire(arg)) {
        如果还未入队,那么将此线程入队;
        可能会将当前线程阻塞;
    }

Release:
    if (tryRelease(arg))
        唤醒第一个等待线程;

共享模式与此相似。

示例

这里是一个不可重入的独占锁类,使用0代表解锁状态,1代表加锁状态。虽然一个不可重入锁不强制要求记录当前持有同步器的线程,但是这个类这样做的原因是使得使用者更容易监视。

class Mutex implements Lock, java.io.Serializable {

    // Our internal helper class
    private static class Sync extends AbstractQueuedSynchronizer {
        // Acquires the lock if state is zero
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // Releases the lock by setting state to zero
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // Reports whether in locked state
        public boolean isLocked() {
            return getState() != 0;
        }

        public boolean isHeldExclusively() {
            // a data race, but safe due to out-of-thin-air guarantees
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        // Provides a Condition
        public Condition newCondition() {
            return new ConditionObject();
        }

        // Deserializes properly
        private void readObject(ObjectInputStream s)
                throws IOException, ClassNotFoundException {
            s.defaultReadObject();
            setState(0); // reset to unlocked state
        }
    }

    // The sync object does all the hard work. We just forward to it.
    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isLocked();
    }

    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

这里是一个类似java.util.concurrent.CountDownLatch的栅栏类,除了它只需要释放一个信号。因为栅栏是非独占式的,它将使用共享式获取和释放方法。

class BooleanLatch {

    private static class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() {
            return getState() != 0;
        }

        protected int tryAcquireShared(int ignore) {
            return isSignalled() ? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignore) {
            setState(1);
            return true;
        }
    }

    private final Sync sync = new Sync();

    public boolean isSignalled() {
        return sync.isSignalled();
    }

    public void signal() {
        sync.releaseShared(1);
    }

    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
}

从上面的实现类中可以看出使用AQS框架能够大大降低一个可靠的自定义同步器的实现门槛。

AQS实现分析

接下来从实现角度分析同步器是如何完成线程同步状态的管理,主要包括:同步队列、独占式同步状态获取与释放、共享式同步状态获取与释放以及超时获取同步状态等同步器的核心数据结构和模板方法。

同步队列

同步器依赖内部的同步队列来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待信息等状态狗造成一个节点并将其增加到同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

同步队列中的节点用来保存获取同步状态失败的线程引用、等待状态以及前驱后继结点,节点的属性如下所示:

    static final class Node {
        /** 标记节点在共享式模式下等待 */
        static final Node SHARED = new Node();
        /** 标记节点在独占式模式下等待 */
        static final Node EXCLUSIVE = null;

        /** waitStatus value 表示线程已经被取消获取共享状态 */
        static final int CANCELLED =  1;
        /** waitStatus value 表示后继节点需要被唤醒 */
        static final int SIGNAL    = -1;
        /** waitStatus value 表示节点在condition上等待 */
        static final int CONDITION = -2;
        /**
         * waitStatus value 表示下一个共享式节点需要被无条件传递
         */
        static final int PROPAGATE = -3;

        /**
         * 状态域,只能有以下几种值:
         *   SIGNAL:     这个节点的后续节点(或者即将成为后继节点)被阻塞,
         *               所以当前节点必须在它释放共享状态或取消获取共享状态时唤醒它的后继节点。
         *               为了避免竞争,acquire方法必须首先表示它们需要一个信号,然后重新进行原子获取,
         *               如果失败,则被阻塞

         *   CANCELLED:  这个节点由于超时或者中断而被取消。节点不会离开这个状态。
         *               特别的,有cancelled节点的线程将不会再次被阻塞。
         *   CONDITION:  这个节点现在处于condition等待队列中。
         *               它不会作为同步队列节点使用直到它的状态被设置为0并且被转移到同步队列中
         *              (使用0这个值与这个域的其他使用没有任何关系,简化了机制)。
         *   PROPAGATE:  一次共享式同步状态释放应当被传递给其他节点。
         *               这只会在doReleaseShared()方法中被设置来确保传递进行。
         *   0:          初始值
         *
         * 这个值被安排为数字主要是简化使用。肺腑值意味着节点不需要被通知。
         * 因此,大多数代码不需要只为了通知而去检查特定的值。
         *
         * 此成员变量对于普通同步节点初始化为0,对于condition节点初始化为CONDITION。
         * 它使用CAS操作进行修改,或者如果可能的话,使用volatile写。
         */
        volatile int waitStatus;

        /**
         * 前驱结点。当节点入队时被设置,出队时被设为null以帮助 gc。
         * 当然,当前驱结点被取消,我们快速循环搜索整个队列来寻找
         * 一个未被取消的节点,它总是会存在因为头节点不会被取消:
         * 一个节点成为头节点只会因为成功获取到了同步状态。一个被取消
         * 的线程将不会再继续及进行获取同步状态,并且它只会取消自己的节点
         */
        volatile Node prev;

        /**
         * 后继节点。当节点入队时被设置,删除被取消的节点时被修改,出队
         * 时被设置为null以帮助 gc。入队操作不会设置前驱结点的next域直到
         * 入队成功,所以发现next域的值为null不一定代表此节点为最后一个
         * 节点。被取消的节点的next域被设置为指向它自己。
         */
        volatile Node next;

        /**
         * 插入此节点的线程。在构造函数中初始化,使用完后设置为null。
         */
        volatile Thread thread;

        /**
         * 等待队列中的后继节点。如果当前节点是共享的,那么这个字段
         * 将是一个SHARED常量。因为等待队列只有当处于独占模式时才能
         * 访问,所以我们只需要一个简单的链接队列来保存节点,当它们在
         * condition上等待时。它们再次尝试获取同步状态会被转移到同步队列。
         * 因为condition只可以是独占式的,所以我们使用一个特殊值来表示
         * 共享模式。
         */
        Node nextWaiter;

        /**
         * 如果这个节点在共享模式等待,则返回true。
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 返回前驱结点,如果为null则抛出 NullPointerException 异常。
         * 当前驱结点不为null时使用。null检查可以去掉,但是存在可以帮助VM。 
         * @return the predecessor of this node
         */
        final Node predecessor() {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        /** Establishes initial head or SHARED marker. */
        Node() {}

        /** Constructor used by addWaiter. */
        Node(Node nextWaiter) {
            this.nextWaiter = nextWaiter;
            THREAD.set(this, Thread.currentThread());
        }

        /** Constructor used by addConditionWaiter. */
        Node(int waitStatus) {
            WAITSTATUS.set(this, waitStatus);
            THREAD.set(this, Thread.currentThread());
        }

        /** CASes waitStatus field. */
        final boolean compareAndSetWaitStatus(int expect, int update) {
            return WAITSTATUS.compareAndSet(this, expect, update);
        }

        /** CASes next field. */
        final boolean compareAndSetNext(Node expect, Node update) {
            return NEXT.compareAndSet(this, expect, update);
        }

        final void setPrevRelaxed(Node p) {
            PREV.set(this, p);
        }

        // VarHandle 机制。将上面字段使用VarHandle封装,只是为了提高代码性能。      
        // 这是jdk提供的一种底层操作机制,java8前为Unsfe类。
        private static final VarHandle NEXT;
        private static final VarHandle PREV;
        private static final VarHandle THREAD;
        private static final VarHandle WAITSTATUS;
        static {
            try {
                MethodHandles.Lookup l = MethodHandles.lookup();
                NEXT = l.findVarHandle(Node.class, "next", Node.class);
                PREV = l.findVarHandle(Node.class, "prev", Node.class);
                THREAD = l.findVarHandle(Node.class, "thread", Thread.class);
                WAITSTATUS = l.findVarHandle(Node.class, "waitStatus", int.class);
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    }

节点是构成同步队列的基础,同步器拥有首节点head和尾节点tail,没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的基本结构如下:

    > line: 563      处于源代码中的位置
    /**
     * 同步队列的头节点,延迟初始化。除了初始化以外,只能通过setHead方法
     * 来修改。注意:如果头节点存在,它的waitStatus被保证不会是CANCELLED 
     */
    private transient volatile Node head;

    /**
     * 同步队列的尾节点,延迟初始化。只会通过enq方法增加新节点才会被改变。 
     */
    private transient volatile Node tail;

    /**
     * 同步状态
     */
    private volatile int state;
    
    > line: 2293
    // VarHandle 机制。将上面三个字段使用VarHandle封装,只是为了提高代码性能。      
    private static final VarHandle STATE;
    private static final VarHandle HEAD;
    private static final VarHandle TAIL;

    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            STATE = l.findVarHandle(AbstractQueuedSynchronizer.class, "state", int.class);
            HEAD = l.findVarHandle(AbstractQueuedSynchronizer.class, "head", Node.class);
            TAIL = l.findVarHandle(AbstractQueuedSynchronizer.class, "tail", Node.class);
        } catch (ReflectiveOperationException e) {
            throw new Error(e);
        }

        // Reduce the risk of rare disastrous classloading in first call to
        // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
        Class<?> ensureLoaded = LockSupport.class;
    }

试想一下,当一个线程成功获取了同步状态,其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证是线程安全的,因为同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect, Node update),它需要传入一个当前线程“认为”的尾节点和当前节点,然后进行CAS更新。

    > line: 2325
    /**
     * CASes tail field.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return TAIL.compareAndSet(this, expect, update);
    }

同步器将节点加入到同步队列的过程如下:

同步队列遵循FIFO,首节点是获取同步状态的节点,首节点的线程在释放同步状态时,会唤醒后继节点,而后继节点会在成功获取同步状态时将自己设为头节点:

设置头节点是通过获取同步状态成功的节点来完成的,由于只有一个线程能够获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证。

    > line: 674
    /**
     * Sets head of queue to be node, thus dequeuing. Called only by
     * acquire methods.  Also nulls out unused fields for sake of GC
     * and to suppress unnecessary signals and traversals.
     *
     * @param node the node
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

独占式同步状态的获取与释放

通过调用同步器的acquire(int)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续如果对线程进行中断操作,线程不会从同步队列中移除。

> line: 1236
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果获取同步状态的过程中被中断了,那么acquireQueued返回后就将自己中断
        selfInterrupt();
}

> line: 873
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用tryAcquire(int)尝试获得锁,如果获取锁失败,构造一个独占型结点并增加到队尾。
然后循环CAS获取锁,如果前驱结点为head并且此时tryAcquire(int)成功,获取到锁,设置头节点为此节点。如果失败,调用shouldParkAfterFailedAcquire()方法判断,如果返回true则阻塞当前线程,否则循环再次获取。
note: 前驱结点waitStatus为SIGNAL返回true,为其他则CAS设置为SIGNAL并返回false,即第二次调用一般返回true,即每个线程被构造结点后有两次机会尝试获取锁,失败则被阻塞。如果发生了异常,则取消此节点。

  1. 首先调用tryAcquire(int)尝试获得锁,如果获取锁失败,构造一个独占型结点并增加到队尾
> line: 650
private Node addWaiter(Node mode) {
    //以独占模式构造一个节点
    Node node = new Node(mode);

    for (;;) {
        Node oldTail = tail;
        //获取尾节点,如果为null则初始化同步队列(对应前面注释中的延迟初始化)
        if (oldTail != null) {
            //设置prev结点
            node.setPrevRelaxed(oldTail);
            //CAS设置尾节点
            if (compareAndSetTail(oldTail, node)) {
                oldTail.next = node;
                return node;
            }
        } else {
            initializeSyncQueue();
        }
    }
}

> line: 2316
private final void initializeSyncQueue() {
    Node h;
    //使用CAS设置头节点,此时head和tail指向同一节点
    if (HEAD.compareAndSet(this, null, (h = new Node())))
        tail = h;
}

> line: 561   Node#setPrevRelaxed(Node p)
final void setPrevRelaxed(Node p) {
        PREV.set(this, p);
}

上面使用compareAndSetTail(oldTail, node)方法来去报节点能够被线程安全添加。试想一下:如果使用一个普通的LinkedList来维护节点之间的关系,那么当一个线程获取了同步状态,而其他多个线程调用tryAcquire(int)获取同步状态失败而被并发添加到LinkedList中时,LinkedList将难以保证Node的正确添加,最终的结果可能时节点数量有偏差,而且顺序也是混乱的。

  1. 然后循环CAS获取锁,如果前驱结点为head并且此时tryAcquire(int)成功,获取到锁,设置头节点为此节点。如果失败,调用shouldParkAfterFailedAcquire()方法判断,如果返回true则阻塞当前线程,否则循环再次获取。
> line: 904
final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) {
            // 获取前驱结点
            final Node p = node.predecessor();
            // 如果前驱结点为头节点并且tryAcquire(int)成功
            if (p == head && tryAcquire(arg)) {
                // 将自己设置为头节点
                setHead(node);
                p.next = null; // help GC
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        //如果在获取同步状态时出现异常,则取消此节点
        cancelAcquire(node);
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

> line: 842
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 前驱结点已经被设置为SIGNAL,要求线程释放同步状态时通知此节点,
         * 所以它可以被安全的阻塞了。
         * 如果线程第一次获取同步状态失败,它前驱节点的waitStatus就会被设置为SIGNAL,
         * 如果第二次再次失败,它将会被阻塞。
         */
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 新构造的结点的waitStatus默认为0,将前驱节点的waitStatus CAS设置为SIGNAL。
         */
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

> line: 882
private final boolean parkAndCheckInterrupt() {
    // 阻塞当前线程
    LockSupport.park(this);
    // 返回线程的中断标志位
    return Thread.interrupted();
}

> line: 789  
private void cancelAcquire(Node node) {
    // 如果此节点不存在则忽略
    if (node == null)
        return;
    node.thread = null;

    // 跳过前面被取消的结点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext是前面第一个不需要删除的结点。下面的CAS如果失败了,
    // 也就是我们和其他的cancel或signal操作竞争时失败了,那么更后面
    // 的操作就没必要执行了。
    Node predNext = pred.next;

    // 将waitStatus设置为CANCELLED
    // 可以使用volatile写代替CAS。此步完成后,其他结点能够跳过此节点。
    // 在此之前不受其他线程的干扰。
    node.waitStatus = Node.CANCELLED;

    // 如果是尾节点,则移除自己
    if (node == tail && compareAndSetTail(node, pred)) {
        pred.compareAndSetNext(predNext, null);
    } else {
        // 如果它的后继节点需要唤醒,尝试将其设置为前驱节点的next,
        // 如此便可以唤醒它。否则,我们就直接唤醒它。
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && pred.compareAndSetWaitStatus(ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                pred.compareAndSetNext(predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

> 看完cancelAcquire的源码后,我们回头再看shouldParkAfterFailedAcquire剩余部分
> line: 842
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;

    // 如果前驱结点的waitStatus为CANCELLED,那么跳过前面连续的CANCELLED结点,
    // 寻找到一个waitStatus<=0的结点,并将其next指向此节点。
    // 因为cancelAcquire正常执行完CANCELLED结点的next会指向自己,当pred
    // 的next不再指向它时,将不再会有对象引用它,下一此gc时这些CACCELLED节点就会被回收
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
    }
    return false;
}

acquireQueued(final Node node, int arg)方法中,当前线程在循环中尝试获取同步状态,而只有前驱结点时头节点时才能尝试获取同步状态,这是为什么?

  1. 头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱结点是否为头节点。
  2. 维护同步队列的FIFO原则。该方法中,节点自旋获取同步状态的行为如下:

由于非头节点线程的前驱节点出队或者被中断线程会从等待状态返回,随后检查自己的前驱节点是否为头节点,如果是则尝试获取同步状态。可以看到节点与节点之间在循环检查的过程中基本不相互通信,而是简单的判断自己的前驱是否为头节点,这就使得节点的释放规则符合FIFO,并且也便于对过早通知的处理(过早通知是指前驱节点不是头节点的线程被意外唤醒或者因为被中断而唤醒)。

acquire(int)方法调用流程如下所示:

可中断获取

> line: 1256
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    // 如果在尝试获取同步状态前便被中断了,那么立刻抛出InterruptedException
    // note:其他线程对其调用Thread#interrupt()方法,此线程还能继续运行,
    //       只是中断标志位被设置为true。
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

> line: 929
private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    //构造节点并增加到同步队列中
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return;
            }
            // 如果在获取同步状态的过程中被中断了,那么抛出InterruptedException
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

可以看出此方法和acquire(int)方法几乎一样,除了线程如果被中断会立刻抛出InterruptedException异常,而不是只被记录下来。

超时获取

> line: 1281
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

> line: 957
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 如果超时时间<=0,立刻返回
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                return true;
            }
            // 如果超时时间结束,那么就取消此节点获取同步状态并返回false
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            // 如果剩余时间大于1000ns,则线程睡眠
            // 否则,线程进入快速的自旋中,尝试获取同步状态,
            // 提高获取同步状态的可能。同时,1000ns以下的睡眠
            // 无法保证精准的执行。
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

static final long SPIN_FOR_TIMEOUT_THRESHOLD = 1000L;

public static void parkNanos(Object blocker, long nanos) {
    if (nanos > 0) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        U.park(false, nanos);
        setBlocker(t, null);
    }
}

可以看出此版本与acquire(int)也没有太大区别,只是增加了中断响应和超时控制。

释放同步状态

当前线程获取同步状态并执行了相应逻辑后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用relaase(int)方法可以释放同步状态,该方法释放了同步状态后,会唤醒其后继节点。

>line: 1299
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 唤醒后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

> line: 685
private void unparkSuccessor(Node node) {
    /*
     * 如果waitStatus为负值(可能需要唤醒后继节点),尝试清除它。
     * 如果这个CAS失败了或者waitStatus被等待线程改变也可以接受。
     */
    int ws = node.waitStatus;
    if (ws < 0)
        node.compareAndSetWaitStatus(ws, 0);

    /*
     * 需要被唤醒的线程存储在后继节点中,一般都是下一个节点。
     * 但是如果下一个节点被取消了或者是null,那么从队列尾部开始
     * 遍历寻找没有被取消的后继节点。
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        // 作用是解除对下一个被取消节点的引用,使其能够被gc
        s = null;
        for (Node p = tail; p != node && p != null; p = p.prev)
            if (p.waitStatus <= 0)
                s = p;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

分析完独占式同步状态获取和释放过程后,做一个总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被增加到同步队列中并在队列中自旋(只有两次尝试的机会,都失败就会被阻塞,防止浪费CPU资源);移除队列的条件是前驱结点为头节点并且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int)方法释放同步状态,然后唤醒头节点的后继节点。

共享式同步状态获取与释放

共享式与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态,例如文件读写,可以有多个线程同时读文件,但是只能有一个线程写文件。

通过同步器的acquireShared(int)方法可以共享式获取同步状态:

> line: 1320
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

> line: 992
private void doAcquireShared(int arg) {
    // 增加一个SHARED节点
    final Node node = addWaiter(Node.SHARED);
    boolean interrupted = false;
    try {
        for (;;) {
            final Node p = node.predecessor();
            // 前驱结点为头节点时尝试获取同步状态
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 设置头节点并尝试唤醒后继节点
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node))
                interrupted |= parkAndCheckInterrupt();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    } finally {
        if (interrupted)
            selfInterrupt();
    }
}

> line: 755
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * 如果满足以下条件之一,尝试唤醒下一个节点:
     * 1. propagate>0,表示还有同步状态可供获取
     * 2. waitStatus<0(在setHead()方法调用前后被其他操作修改),
     *    表示唤醒后继节点。注意:PROPAGATE在shouldParkAfterFailedAcquire
     *     中可能会被更改为SIGNAL
     * 并且下一个节点在共享模式下等待,或者为null 。
     * 
     * 这些检查可能会导致不必要的线程唤醒,当时只有在多个线程正在
     * 竞争acquires/releases时会发生,所以大部分情况都需要立刻唤醒
     * 后面的共享节点。
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

> line: 717
private void doReleaseShared() {
    /*
     * 确保释放同步状态传递,即使有其他正在进行中的acquires/releases。
     * 如果waitStatus为SIGNAL使用unparkSuccessor(head)唤醒后继节点。
     * 但是如果不是,那么设置waitStatus为PROPAGATE来确保释放传递
     * 继续下去。另外,当我们这样做时必须循环进行防止一个新节点被增加。
     * 同时,不像unparkSuccessor在其他地方的使用,我们需要直到CAS是否
     * 失败,如果是那么进入下一个循环重新进行。
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            // 当前只有一个线程获取了同步状态,没有其他线程尝试获取,
            // 所以waitStatus为0,CAS修改为PROPAGATE确保传递继续,因为
            // 后续节点获取同步状态失败时会调用shouldParkAfterFailedAcquire
            // 方法将其设置为SIGNAL。
            else if (ws == 0 &&
                     !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

acquireShared(int)方法中,同步器调用tryAcquireShared(arg)方法尝试获取同步状态,tryAcquireShared(arg)方法返回值为int,当返回值大于等于0时,表示能够获取到同步状态。因此,在共享式获取的自旋过程中,成功获取同步状态并退出自旋的条件就是tryAcquireShared(arg)方法返回值大于等于0。

可中断获取

> line: 1338
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

> line: 1022
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

与独占式获取一样,可中断获取版本能够响应中断,当线程被中断后,立刻抛出InterruptedException异常。

超时获取

> line: 1362
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 ||
        doAcquireSharedNanos(arg, nanosTimeout);
}

> line: 1053
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L) {
                cancelAcquire(node);
                return false;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } catch (Throwable t) {
        cancelAcquire(node);
        throw t;
    }
}

超时获取和独占式超时获取也无差别,不再详细赘述。

超时获取同步状态的流程如下:

共享同步状态释放

> line: 1379
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

doReleaseShared(int)方法前面已经分析过。该方法释放同步状态后,将会唤醒处于等待状态的节点。

剩余的一些监视方法以及辅助方法不属于核心机制部分,不对其进行分析,读者有兴趣可自行查看源码,相信看懂上述部分后理解这些方法也不困难。同时,关于Condition的部分将在后面部分讲解,下面先编写一个自定义同步组件巩固上面的分析。

自定义组件 —— TwinsLock

在前面几节中,已经对同步器AQS进行了主要功能实现的分析,本节通过编写一个自定义同步组件来加深对同步器的理解。

设计一个同步工具:该工具在同一时刻,只允许最多两个线程同时访问,超时两个线程的访问将被阻塞。

首先,确定访问模式。TwinsLock能够同一时刻支持多个线程访问,显然是一个共享式访问,因此需要AQS提供的acquiredShared(int)方法等和shared相关的方法,这就要求TwinsLock必须重新tryAcquiredShared(int)tryReleaseShared(int)方法,这样才能保证同步器的共享式同步状态的获取和释放方法得以执行。

其次,定义资源数。TwinsLock同一时刻允许最多两个线程访问,表明同步资源数为2,这样可以设置初始状态为2,当一个线程进行获取时,status减1,线程释放,status加1,状态的合法范围是0、1、2。其中0表示当前已经有两个线程获取了同步资源,此时如果还有其他线程尝试获取同步状态,只能被阻塞。在同步状态改变时,需要使用compareAndSet(int, int)方法做原子性保障。

最后,组合自定义同步器。

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class TwinsLock implements Lock {
    private final Sync sync = new Sync(2);

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) >= 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }

    private static class Sync extends AbstractQueuedSynchronizer {
        private volatile int count;

        Sync(int count) {
            if(count <= 0) {
                throw new IllegalArgumentException("count must large than 0");
            }
            this.count = count;
            setState(count);
        }

        @Override
        protected int tryAcquireShared(int arg) {
            for(;;) {
                int current = getState();
                int newCount = current - arg;
                if(newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
            for(;;) {
                int current = getState();
                int newCount = current + arg;
                if(newCount > count) {
                    throw new IllegalMonitorStateException("Unexpected unlock");
                }
                if(compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }
}

在上述示例中,TwinsLock实现了Lock接口,提供了面向使用者的接口,使用者调用lock()方法获取锁,随后调用unlock()方法释放锁。TwinsLock同时包含了一个自定义同步器Sync,该同步器面向线程访问和同步状态控制。

同步器作为一个桥梁,连接线程访问以及同步状态控制等底层技术与不同并发组件(如Lock、CountDownLatch等)的接口语义。

下面编写一个测试验证TwinsLock能否正常工作。

public class TwinsLockTest {
    @Test
    public void test() {
        final Lock lock = new TwinsLock();
        class Worker implements Runnable {
            @Override
            public void run() {
                lock.lock();
                try {
                    SleepUtils.sleep(1);
                    System.out.println(Thread.currentThread().getName());
                    SleepUtils.sleep(1);
                } finally {
                    lock.unlock();
                }
            }
        }
        for (int i = 0; i < 10; ++i) {
            Thread worker = new Thread(new Worker(), "Worker-" + i);
            worker.setDaemon(true);
            worker.start();
        }
        for (int i = 0; i < 10; ++i) {
            SleepUtils.sleep(1);
            System.out.println();
        }
    }
}

输出如下:

Worker-2
Worker-0

Worker-1
Worker-5

Worker-3
Worker-4

Worker-8
Worker-9

Worker-6
Worker-7

可以看出线程名称成对输出,也就是同一时刻最多只有两个线程能够获取到锁,TwinsLock可以按预期正常工作。

Condition 接口

任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object)上,主要包括wait()wait(long)notify()notifyAll()方法,这些方法与synchronized关键字配合,能够实现等待/通知机制。Condition接口也提供了类似Object的监视器方法,与Lock配合也可以实现等待/通知机制,但是这两者在使用方式以及功能特性上有所区别。

通过对比Object的监视器方法和Condition接口,可以更详细地了解Condition的特性。

对比项 Object Monitor Methods Conditon
前置条件 获取对象的锁 调用Lock.lock()获取锁
调用Lock.newCondtion()获取Condition对象
调用方式 直接调用,如object.wait() 直接调用,如condition.await()
等待队列个数 一个 多个
当前线程释放锁并进入等待状态 支持 支持
当前线程释放锁并进入等待状态
后响应中断
不支持 支持
当前线程释放锁并进入超时等待状态 支持 支持
当前线程释放锁并进入等待状态到将来某个时间 不支持 支持
唤醒等待队列的一个线程 支持 支持
唤醒等待队列的全部线程 支持 支持

相关文章

  • 2. AbstractQueuedSynchronizer

    队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁和其他同步组件的基础...

  • 2. AbstractQueuedSynchronizer(二)

    Conditon接口与示例 Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取...

  • AQS

    AbstractQueuedSynchronizer源码 AbstractQueuedSynchronizer(A...

  • 1.AbstractQueuedSynchronizer源码分析

    AbstractQueuedSynchronizer AbstractQueuedSynchronizer是什么,...

  • 9.J.U.C之AQS

    一、介绍AQS是J.U.C的核心,AQS是AbstractQueuedSynchronizer的简写。 2.利用一...

  • AbstractQueuedSynchronizer实现分析

    1.什么是AbstractQueuedSynchronizer?2.同步队列中的节点(Node)3.独占式同步状态...

  • AbstractQueuedSynchronizer

    理解多线程的并发锁,可结合多进程的分布式锁(如Zookeeper的互斥锁、读写锁的实现原理),本质是相通的 介绍 ...

  • AbstractQueuedSynchronizer

    AbstractQueuedSynchronizer ReentrantLock主要内部通过Sync来完成锁的实现...

  • AbstractQueuedSynchronizer

    AQS简介 JUC将AQS定位为一个模板方法,它是一个抽象类,不可被实例化,它的设计之处是为了让子类通过继承的方式...

  • AbstractQueuedSynchronizer

    提供了实现阻塞锁和相关同步器依靠先入先出(FIFO)等待队列(信号量,事件等)的框架. 此类设计对于大多数种类的依...

网友评论

      本文标题:2. AbstractQueuedSynchronizer

      本文链接:https://www.haomeiwen.com/subject/dlgupqtx.html