美文网首页
Java 源码分析-AbstractQueuedSynchron

Java 源码分析-AbstractQueuedSynchron

作者: 琼珶和予 | 来源:发表于2018-05-09 19:05 被阅读0次

      之前在看ReentrantLock的源码时,发现它的内部使用AbstractQueuedSynchronizer类实现同步,但是当时对AbstractQueuedSynchronizer理解不了,主要是单纯的看源代码容器懵逼。最近在《Java 并发编程的艺术》书,书上对AbstractQueuedSynchronizer做了详细的介绍,从而帮助自己理解了这个类。今天,在这里将我对AbstractQueuedSynchronizer的理解记录下来。
      本文主要参考资料:

      1.方腾飞、魏鹏、程晓明的《Java 并发编程的艺术》

    1.AbstractQueuedSynchronizer的介绍

      队列同步器AbstractQueuedSynchronizer,是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量state来表示同步状态,通过内置的队列来完成线程的排队工作。
      同步器的主要使用方式是继承,子类通过继承同步器并且实现它的抽象方法来管理同步状态。同自定义组件里面,子类推荐被定义为静态内部类,这一点我们可以从ReentrantLock里面可以看出来。
      简而言之,如果想要定义一个类似于ReentrantLock的锁,在锁的内部,我们可以聚合AbstractQueuedSynchronizer,关于锁的获取锁和释放锁,都由AbstractQueuedSynchronizer来真正操作。可以这样理解二者的关系:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现细节;同步器面向的是锁的实现者,它简化了锁的实现方式。

    2.使用AbstractQueuedSynchronizer定义一个简易的锁

    public class Mutex implements Lock {
    
        private static class Sync extends AbstractQueuedSynchronizer{
            @Override
            protected boolean isHeldExclusively() {
                return getState() == 1;
            }
    
            @Override
            protected boolean tryAcquire(int arg) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            @Override
            protected boolean tryRelease(int arg) {
                if(getState() == 0){
                    throw new IllegalMonitorStateException();
                }
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
            public Condition newCondition(){
                return new ConditionObject();
            }
        }
    
        private final Sync sync = new Sync();
    
        @Override
        public void lock() {
            sync.acquire(1);
        }
    
        @Override
        public void lockInterruptibly() throws InterruptedException {
            sync.acquireInterruptibly(1);
        }
    
        @Override
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    
        @Override
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return sync.tryAcquireNanos(1, unit.toNanos(time));
        }
    
        @Override
        public void unlock() {
            sync.release(1);
        }
    
        @Override
        public Condition newCondition() {
            return sync.newCondition();
        }
    }
    

      上面就是我们使用AbstractQueuedSynchronizer来实现的一个同步组件。首先,我们会发现,在这个自定义的同步组件中,定义了一个静态内部类Sync,继承于AbstractQueuedSynchronizer,然后在里面实现了一些方法。在这里实现的方法中,是真正操作state字段。前面也说了,在AbstractQueuedSynchronizer的内部使用了一个state来表示同步状态。
      同时我们还发现Mutex实现了Lock接口,实现了Lock接口里面的方法。这些方法相当于是对外公开的,面向使用者的。当程序员使用这个自定义同步组件时,通过调用这个方法来达到线程同步的目的。

    3.AbstractQueuedSynchronizer的分析

      接下来我们将分析AbstractQueuedSynchronizer是如何完成线程同步的,其中包括:同步队列、独占式同步状态的获取和释放、共享式同步状态的获取和释放以及超时获取同步状态等方面。

    (1).Node

      在说同步队列之前,我们先说一下Node这个类,因为在同步队列里面,存储每个元素就是Node元素,所以对Node这个类必须有一定的了解。
      首先,我们来看看Node节点有哪些的属性。

            static final Node SHARED = new Node();
            static final Node EXCLUSIVE = null;
    
            static final int CANCELLED =  1;
            static final int SIGNAL    = -1;
            static final int CONDITION = -2;
            static final int PROPAGATE = -3;
    
            volatile int waitStatus;
    
            volatile Node prev;
    
            volatile Node next;
    
            volatile Thread thread;
    
            Node nextWaiter;
    

      我们从Node的成员变量里面,我们可以看到同步队列是一个双向队列。其中next表示队列的下一个节点,而prev表示队列的上一个节点。
      waitStatus字段表示的是当前的Node的状态,这个包含四种状态,分别是:CANCELLED,SIGNAL,CONDITION,PROPAGATE,INITIAL。详细解释如下:

    状态 描述
    CANCELLED 值为1,由于同步队列中等待的线程等待超时或者是被中断,需要从同步队列中取消等待,节点进入该状态不在变换。
    SIGNAL 值为-1,后继节点处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。
    CONDITION 值为-2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal之后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取。
    PROPAGATE 值为-3,表示下一次共享式同步状态的获取将会无条件的传播下去。
    INITIAL 值为0,初始状态。

      Node内部封装的信息差不多就是这些,现在我们已经对Node已经有了一个大概的了解,现在准备讲解一下同步队列了。

    (2).同步队列

      我们知道,AbstractQueuedSynchronizer内部实现了一个同步队列来完成线程同步的排队。从AbstractQueuedSynchronizer的源码,我们可以看出,AbstractQueuedSynchronizer内部有两个成员变量用来这个队列的队头和队尾。
      AbstractQueuedSynchronizer的tail属性表示这个同步队列的队尾,head表示这个同步队列的队头。然后,我们可以根据Node的信息绘制出同步队列的结构图:



      AbstractQueuedSynchronizer内部的两个节点引用,一个指向头结点,一个指向尾节点。当一个线程获取同步状态,其他线程将无法获取到同步状态,转而被封装成为一个节点并且加入到同步队列中。在加入这个队列时,必须保证线程安全,因为获取不到同步状态的线程肯定有多个,所以AbstractQueuedSynchronizer提供了一个基于CAS方式设置节点的方法:compareAndSetTail(Node expect, Node update),它需要传递当前线程“认为”的的尾节点和当前节点,只有设置成功后,当前这个线程所在的这个节点才算是成功加入到同步队列里面去。
      而设置头结点则不需要保证线程安全,因为只有一个线程在操作setHead方法。为什么只有一个线程在设置头结点呢?首先,我们得理解一个东西,在同步队列里面头结点表示成功获取同步状态的线程所在的节点;在设置新的头结点时,表示当前有一个线程已经成功获取到了同步状态了,其中都被阻塞了,所以只有一个线程在设置头结点,不需要保证线程安全。

    (3).独占式同步状态的获取和释放

      通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是说由于线程获取同步状态失败后进入了同步队列中,后续对这个线程进行中断操作时,该线程不会从同步队列中移出。

        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

      上面的acquire方法里面主要进行三步操作:
      1.调用tryAcquire方法来尝试获取同步状态。
      2.调用addWaiter方法将该节点添加到同步队列的尾部。
      3.调用acquireQueued方法使得该节点以“死循环”的方式来获取同步状态。
      其中,调用的tryAcquire方法实际上是我们在Sync里面重写的tryAcquire方法,如下:

            protected boolean tryAcquire(int arg) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    

      在tryAcquire方法里面先尝试将state更新为1,如果更新失败的话,返回false;反之,则表示当前的线程获取同步状态,即获到了锁。
      在acquire方法里面,我们可以看到如果尝试获取同步状态失败的话,会调用addWaiter方法,我们来看看addWaiter方法究竟为我们做了什么。

        private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            //尝试快速的将当前的Node添加到同步队列中
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            //如果上面的操作失败了,将会采取无限循环的操作来保证肯定添加成功
            enq(node);
            return node;
        }
    

      在addWaiter方法里面,我们知道,首先会尝试快速的将Node添加同步队列中去;如果快速添加失败的话,会调用enq方法来去保证肯定添加成功。
      当成功将Node添加到同步队列中去之后,会调用acquireQueued方法去不断的尝试获取同步状态。

        final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

      在acuqireQueued方法里面,当前线程在”死循环“中不断的尝试获取同步状态,并且只有前驱节点为头结点才能够尝试获取同步状态。也就是说,头结点释放了同步状态,首先准备获取同步状态的是头结点的后继节点。
      如果一个节点获取同步状态失败或者不符合获取状态的条件,就会进入parkAndCheckInterrupt方法里面进行进行等待。
      当一个线程执行完毕自己的操作之后,需要释放同步状态,会调用了release方法来释放同步状态。

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

      release方法的基本步骤也是非常的简单,先是tryRelease方法去尝试释放同步状态,如果释放的成功的话,会调用unparkSuccessor方法去唤醒头结点的后继节点。

    (4).共享式同步状态的获取和释放

      说完了独占式同步状态的获取和释放,相信大家对它应该有了一个大概的理解。现在我们对共享式同步状态的获取和释放进行简单的分析。
      我们先举一个简单的例子,什么是共享式。假设有一个文件,一个进程在对这个这个文件进行读取操作的,那么其他进程只能对这个文件进行读取操作,不能进行写操作,这个就是共享式;如果这个文件被进行一个进程进行写操作,那么其他的所有的操作都会被拒绝,这个写操作就是独占式操作。
      之前,我们在分析Node的成员变量时,SHARED变量就是表示当前的Node获取共享式同步状态,EXCLUSIVE表示获取独占式同步状态。
      现在,我们来看看获取共享式同步状态的过程。

        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    

      我们通过调用acquireShared来获取一个共享式的同步状态,在acquireShared方法里面,调用了tryAcquireShared去尝试获取共享式的同步状态,当返回值大于等于0时,表示能够获取到同步状态;反之则不能获取同步状态。
      当获取同步状态失败,会调用doAcquireShared方法,我们来在这个方法里面都在干嘛。

        private void doAcquireShared(int arg) {
            final Node node = addWaiter(Node.SHARED);
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head) {
                        int r = tryAcquireShared(arg);
                        if (r >= 0) {
                            setHeadAndPropagate(node, r);
                            p.next = null; // help GC
                            if (interrupted)
                                selfInterrupt();
                            failed = false;
                            return;
                        }
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

      跟独占式差不多,当获取共享式的同步状态失败时,会在doAcquireShared方法里面进行自旋,如果当前节点的前驱节点为头条节点时,尝试获取同步状态,如果返回值为大于等于0时,表示此次获取同步状态成功并且从自旋状态中退出。
      我们也来看看释放共享式同步状态的操作。

        public final boolean releaseShared(int arg) {
            if (tryReleaseShared(arg)) {
                doReleaseShared();
                return true;
            }
            return false;
        }
    

      在releaseShared方法里面首先尝试释放同步状态,如果释放成功的话,调用doReleaseShared方法对后续线程进行唤醒操作。
      它与独占式的区别是,在tryReleaseShared方法里面必须保证同步状态被安全释放,一般是通过循环和CAS来保证的,因为使用同步状态的操作可能来自多个线程。

    (5).独占式超时获取同步状态

      通过调用doAcquireNanos(int arg, long nanosTimeout)方法可以超时获取同步状态,即在指定的时间内获取同步状态,如果获取成功的话则返回true;反之,则返回false。该方法提供了传统Java同步操作(比如synchronized关键字)所不具备的特性。
      在Java 5之前,当一个线程获取不到锁而被阻塞在synchronized那里,对该线程进行中断操作,此时该线程的中断标志位会被修改,但是线程依旧会阻塞在synchronized上,等待着获取锁。在Java5中,AbstractQueuedSynchronizer提供了acquireInterruptibly(int arg)方法,这个方法在等待获取同步状态时,如果当前的线程被中断了,会立刻返回,并且抛出InterruptedException。
      超时获取同步状态过程可以被视作响应中断获取同步状态过程的”增强版“,doAcquireNanos方法在支持响应中断的基础上,增加了超时获取的特性。

        private boolean doAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (nanosTimeout <= 0L)
                return false;
            final long deadline = System.nanoTime() + nanosTimeout;
            final Node node = addWaiter(Node.EXCLUSIVE);
            boolean failed = true;
            try {
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return true;
                    }
                    nanosTimeout = deadline - System.nanoTime();
                    if (nanosTimeout <= 0L)
                        return false;
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        nanosTimeout > spinForTimeoutThreshold)
                        LockSupport.parkNanos(this, nanosTimeout);
                    if (Thread.interrupted())
                        throw new InterruptedException();
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

      对于超时获取同步状态,关键在于睡眠时间的计算。为了得到及时的响应,nanosTimeOut = deadline - System.nanoTime(),如果nanosTimeOut小于等于0时,表示时间已经到了;反之则表示时间还未到,需要睡眠nanosTimeOut。
      在这种情况下获取同步状态跟独占式情况下获取比较类似,只不过这里需要注意几个地方:
      1.当nanosTimeout > spinForTimeoutThreshold时才进行睡眠,因为非常短的超时等待无法做到十分精确,如果此时还要进行超时等待,会让nanosTimeout从整体上表现的反而不精确。
      2.如果当前的线程被中断过,就会抛出InterruptedException异常。

    4.总结

       AbstractQueuedSynchronizer的实现原理最重要的一部分就是同步队列,不管是在获取同步状态还是释放状态,最终还是体现在对同步队列的维护。
       说实话,在这之前,我也看过AbstractQueuedSynchronizer类的源代码,但是当时是狗咬乌龟--无从下口。现在借助《Java 并发编程的艺术》对AbstractQueuedSynchronizer有了一定的了解,所以强烈推荐这本书!!!

    相关文章

      网友评论

          本文标题:Java 源码分析-AbstractQueuedSynchron

          本文链接:https://www.haomeiwen.com/subject/ufcirftx.html