美文网首页
Java 源码分析-AbstractQueuedSynchron

Java 源码分析-AbstractQueuedSynchron

作者: 琼珶和予 | 来源:发表于2018-05-09 19:05 被阅读0次

  之前在看ReentrantLock的源码时,发现它的内部使用AbstractQueuedSynchronizer类实现同步,但是当时对AbstractQueuedSynchronizer理解不了,主要是单纯的看源代码容器懵逼。最近在《Java 并发编程的艺术》书,书上对AbstractQueuedSynchronizer做了详细的介绍,从而帮助自己理解了这个类。今天,在这里将我对AbstractQueuedSynchronizer的理解记录下来。
  本文主要参考资料:

  1.方腾飞、魏鹏、程晓明的《Java 并发编程的艺术》

1.AbstractQueuedSynchronizer的介绍

  队列同步器AbstractQueuedSynchronizer,是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量state来表示同步状态,通过内置的队列来完成线程的排队工作。
  同步器的主要使用方式是继承,子类通过继承同步器并且实现它的抽象方法来管理同步状态。同自定义组件里面,子类推荐被定义为静态内部类,这一点我们可以从ReentrantLock里面可以看出来。
  简而言之,如果想要定义一个类似于ReentrantLock的锁,在锁的内部,我们可以聚合AbstractQueuedSynchronizer,关于锁的获取锁和释放锁,都由AbstractQueuedSynchronizer来真正操作。可以这样理解二者的关系:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式。

2.使用AbstractQueuedSynchronizer定义一个简易的锁

public class Mutex implements Lock {

    private static class Sync extends AbstractQueuedSynchronizer{
        @Override
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if(getState() == 0){
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        public Condition newCondition(){
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}

  上面就是我们使用AbstractQueuedSynchronizer来实现的一个同步组件。首先,我们会发现,在这个自定义的同步组件中,定义了一个静态内部类Sync,继承于AbstractQueuedSynchronizer,然后在里面实现了一些方法。在这里实现的方法中,是真正操作state字段。前面也说了,在AbstractQueuedSynchronizer的内部使用了一个state来表示同步状态。
  同时我们还发现Mutex实现了Lock接口,实现了Lock接口里面的方法。这些方法相当于是对外公开的,面向使用者的。当程序员使用这个自定义同步组件时,通过调用这个方法来达到线程同步的目的。

3.AbstractQueuedSynchronizer的分析

  接下来我们将分析AbstractQueuedSynchronizer是如何完成线程同步的,其中包括:同步队列、独占式同步状态的获取和释放、共享式同步状态的获取和释放以及超时获取同步状态等方面。

(1).Node

  在说同步队列之前,我们先说一下Node这个类,因为在同步队列里面,存储每个元素就是Node元素,所以对Node这个类必须有一定的了解。
  首先,我们来看看Node节点有哪些的属性。

        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;

        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;

        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;

  我们从Node的成员变量里面,我们可以看到同步队列是一个双向队列。其中next表示队列的下一个节点,而prev表示队列的上一个节点。
  waitStatus字段表示的是当前的Node的状态,这个包含四种状态,分别是:CANCELLED,SIGNAL,CONDITION,PROPAGATE,INITIAL。详细解释如下:

状态 描述
CANCELLED 值为1,由于同步队列中等待的线程等待超时或者是被中断,需要从同步队列中取消等待,节点进入该状态不在变换。
SIGNAL 值为-1,后继节点处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。
CONDITION 值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal之后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取。
PROPAGATE 值为-3,表示下一次共享式同步状态的获取将会无条件的传播下去。
INITIAL 值为0,初始状态。

  Node内部封装的信息差不多就是这些,现在我们已经对Node已经有了一个大概的了解,现在准备讲解一下同步队列了。

(2).同步队列

  我们知道,AbstractQueuedSynchronizer内部实现了一个同步队列来完成线程同步的排队。从AbstractQueuedSynchronizer的源码,我们可以看出,AbstractQueuedSynchronizer内部有两个成员变量用来这个队列的队头和队尾。
  AbstractQueuedSynchronizer的tail属性表示这个同步队列的队尾,head表示这个同步队列的队头。然后,我们可以根据Node的信息绘制出同步队列的结构图:



  AbstractQueuedSynchronizer内部的两个节点引用,一个指向头结点,一个指向尾节点。当一个线程获取同步状态,其他线程将无法获取到同步状态,转而被封装成为一个节点并且加入到同步队列中。在加入这个队列时,必须保证线程安全,因为获取不到同步状态的线程肯定有多个,所以AbstractQueuedSynchronizer提供了一个基于CAS方式设置节点的方法:compareAndSetTail(Node expect, Node update),它需要传递当前线程“认为”的的尾节点和当前节点,只有设置成功后,当前这个线程所在的这个节点才算是成功加入到同步队列里面去。
  而设置头结点则不需要保证线程安全,因为只有一个线程在操作setHead方法。为什么只有一个线程在设置头结点呢?首先,我们得理解一个东西,在同步队列里面头结点表示成功获取同步状态的线程所在的节点;在设置新的头结点时,表示当前有一个线程已经成功获取到了同步状态了,其中都被阻塞了,所以只有一个线程在设置头结点,不需要保证线程安全。

(3).独占式同步状态的获取和释放

  通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是说由于线程获取同步状态失败后进入了同步队列中,后续对这个线程进行中断操作时,该线程不会从同步队列中移出。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

  上面的acquire方法里面主要进行三步操作:
  1.调用tryAcquire方法来尝试获取同步状态。
  2.调用addWaiter方法将该节点添加到同步队列的尾部。
  3.调用acquireQueued方法使得该节点以“死循环”的方式来获取同步状态。
  其中,调用的tryAcquire方法实际上是我们在Sync里面重写的tryAcquire方法,如下:

        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

  在tryAcquire方法里面先尝试将state更新为1,如果更新失败的话,返回false;反之,则表示当前的线程获取同步状态,即获到了锁。
  在acquire方法里面,我们可以看到如果尝试获取同步状态失败的话,会调用addWaiter方法,我们来看看addWaiter方法究竟为我们做了什么。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        //尝试快速的将当前的Node添加到同步队列中
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        //如果上面的操作失败了,将会采取无限循环的操作来保证肯定添加成功
        enq(node);
        return node;
    }

  在addWaiter方法里面,我们知道,首先会尝试快速的将Node添加同步队列中去;如果快速添加失败的话,会调用enq方法来去保证肯定添加成功。
  当成功将Node添加到同步队列中去之后,会调用acquireQueued方法去不断的尝试获取同步状态。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  在acuqireQueued方法里面,当前线程在”死循环“中不断的尝试获取同步状态,并且只有前驱节点为头结点才能够尝试获取同步状态。也就是说,头结点释放了同步状态,首先准备获取同步状态的是头结点的后继节点。
  如果一个节点获取同步状态失败或者不符合获取状态的条件,就会进入parkAndCheckInterrupt方法里面进行进行等待。
  当一个线程执行完毕自己的操作之后,需要释放同步状态,会调用了release方法来释放同步状态。

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

  release方法的基本步骤也是非常的简单,先是tryRelease方法去尝试释放同步状态,如果释放的成功的话,会调用unparkSuccessor方法去唤醒头结点的后继节点。

(4).共享式同步状态的获取和释放

  说完了独占式同步状态的获取和释放,相信大家对它应该有了一个大概的理解。现在我们对共享式同步状态的获取和释放进行简单的分析。
  我们先举一个简单的例子,什么是共享式。假设有一个文件,一个进程在对这个这个文件进行读取操作的,那么其他进程只能对这个文件进行读取操作,不能进行写操作,这个就是共享式;如果这个文件被进行一个进程进行写操作,那么其他的所有的操作都会被拒绝,这个写操作就是独占式操作。
  之前,我们在分析Node的成员变量时,SHARED变量就是表示当前的Node获取共享式同步状态,EXCLUSIVE表示获取独占式同步状态。
  现在,我们来看看获取共享式同步状态的过程。

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

  我们通过调用acquireShared来获取一个共享式的同步状态,在acquireShared方法里面,调用了tryAcquireShared去尝试获取共享式的同步状态,当返回值大于等于0时,表示能够获取到同步状态;反之则不能获取同步状态。
  当获取同步状态失败,会调用doAcquireShared方法,我们来在这个方法里面都在干嘛。

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  跟独占式差不多,当获取共享式的同步状态失败时,会在doAcquireShared方法里面进行自旋,如果当前节点的前驱节点为头条节点时,尝试获取同步状态,如果返回值为大于等于0时,表示此次获取同步状态成功并且从自旋状态中退出。
  我们也来看看释放共享式同步状态的操作。

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

  在releaseShared方法里面首先尝试释放同步状态,如果释放成功的话,调用doReleaseShared方法对后续线程进行唤醒操作。
  它与独占式的区别是,在tryReleaseShared方法里面必须保证同步状态被安全释放,一般是通过循环和CAS来保证的,因为使用同步状态的操作可能来自多个线程。

(5).独占式超时获取同步状态

  通过调用doAcquireNanos(int arg, long nanosTimeout)方法可以超时获取同步状态,即在指定的时间内获取同步状态,如果获取成功的话则返回true;反之,则返回false。该方法提供了传统Java同步操作(比如synchronized关键字)所不具备的特性。
  在Java 5之前,当一个线程获取不到锁而被阻塞在synchronized那里,对该线程进行中断操作,此时该线程的中断标志位会被修改,但是线程依旧会阻塞在synchronized上,等待着获取锁。在Java5中,AbstractQueuedSynchronizer提供了acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,如果当前的线程被中断了,会立刻返回,并且抛出InterruptedException。
  超时获取同步状态过程可以被视作响应中断获取同步状态过程的”增强版“,doAcquireNanos方法在支持响应中断的基础上,增加了超时获取的特性。

    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

  对于超时获取同步状态,关键在于睡眠时间的计算。为了得到及时的响应,nanosTimeOut = deadline - System.nanoTime(),如果nanosTimeOut小于等于0时,表示时间已经到了;反之则表示时间还未到,需要睡眠nanosTimeOut。
  在这种情况下获取同步状态跟独占式情况下获取比较类似,只不过这里需要注意几个地方:
  1.当nanosTimeout > spinForTimeoutThreshold时才进行睡眠,因为非常短的超时等待无法做到十分精确,如果此时还要进行超时等待,会让nanosTimeout从整体上表现的反而不精确。
  2.如果当前的线程被中断过,就会抛出InterruptedException异常。

4.总结

   AbstractQueuedSynchronizer的实现原理最重要的一部分就是同步队列,不管是在获取同步状态还是释放状态,最终还是体现在对同步队列的维护。
   说实话,在这之前,我也看过AbstractQueuedSynchronizer类的源代码,但是当时是狗咬乌龟--无从下口。现在借助《Java 并发编程的艺术》对AbstractQueuedSynchronizer有了一定的了解,所以强烈推荐这本书!!!

相关文章

网友评论

      本文标题:Java 源码分析-AbstractQueuedSynchron

      本文链接:https://www.haomeiwen.com/subject/ufcirftx.html