美文网首页
详解java并发包源码之AQS独占方法源码分析

详解java并发包源码之AQS独占方法源码分析

作者: 小刀爱编程 | 来源:发表于2018-11-18 17:52 被阅读0次

    AQS 的实现原理

    学完用 AQS 自定义一个锁以后,我们可以来看一下刚刚使用过的方法的实现。

    分析源码的时候会省略一些不重要的代码。

    AQS 的实现是基于一个 FIFO 队列的,每一个等待的线程被封装成Node存放在等待队列中,头结点是空的,不存储信息,等待队列中的节点都是阻塞的,并且在每次被唤醒后都会检测自己的前一个节点是否为头结点,如果是头节点证明在这个线程之前没有在等待的线程,就尝试着去获取共享资源。

    AQS 的继承关系

    AQS 继承了AbstractOwnableSynchronizer,我们先分析一下这个父类。

    public abstract class AbstractOwnableSynchronizer

        implements java.io.Serializable {

        protected AbstractOwnableSynchronizer() { }

        /**

        * 独占模式下的线程

        */

        private transient Thread exclusiveOwnerThread;

        /**

        * 设置线程,只是对线程的 set 方法

        */

        protected final void setExclusiveOwnerThread(Thread thread) {

            exclusiveOwnerThread = thread;

        }

        /**

        * 设置线程,对线程的 get 方法

        */

        protected final Thread getExclusiveOwnerThread() {

            return exclusiveOwnerThread;

        }

    }

    父类非常简单,持有一个独占模式下的线程,然后就只剩下对这个线程的 get 和 set 方法。

    AQS的内部类

    AQS 是用链表队列来实现线程等待的,那么队列肯定要有节点,我们先从节点讲起。

    Node 类,每一个等待的线程都会被封装成 Node 类

    Node 的域

    public class Node {

        int waitStatus;

        Node prev;

        Node next;

        Thread thread;

        Node nextWaiter;

    }

    waitStatus:等待状态

    prev:前驱节点

    next:后继节点

    thread:持有的线程

    nextWaiter:condiction 队列中的后继节点

    Node 的 status:

    Node 的状态有四种:

    CANCELLED,值为 1,表示当前的线程被取消,被打断或者获取超时了

    SIGNAL,值为 -1,表示当前节点的后继节点包含的线程需要运行,也就是 unpark;

    CONDITION,值为 -2,表示当前节点在等待 condition,也就是在 condition 队列中;

    PROPAGATE,值为 -3,表示当前场景下后续的 acquireShared 能够得以执行;

    取消状态的值是唯一的正数,也是唯一当排队排到它了也不要资源而是直接轮到下个线程来获取资源的

    AQS 中的方法源码分析

    acquire

    这个方法执行了:

    tryAcquire

    public final void acquire(int arg) {

        if (!tryAcquire(arg) &&

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

    }

    看到上面的 tryAcquire 返回 false 后就会调用addWaiter新建节点加入等待队列中。参数 EXCLUSIVE 是独占模式。

    private Node addWaiter(Node mode) {

        Node node = new Node(Thread.currentThread(), mode);

        // 拿到尾节点,如果尾节点是空则说明是第一个节点,就直接入队就好了

        Node pred = tail;

        if (pred != null) {

            node.prev = pred;

            if (compareAndSetTail(pred, node)) {

                pred.next = node;

                return node;

            }

        }

        // 如果尾节点不是空的,则需要特殊方法入队

            enq(node);

        return node;

    }

    在addWaiter方法创建完节点后,调用 enq 方法,在循环中用 CAS 操作将新的节点入队。

    因为可能会有多个线程同时设置尾节点,所以需要放在循环中不断的设置尾节点。

    private Node enq(final Node node) {

        for (;;) {

            Node t = tail;

            // 查看尾节点

            if (t == null) { // Must initialize

                // 尾节点为空则设置为刚刚创建的节点

                if (compareAndSetHead(new Node()))

                    tail = head;

            } else {

                // 尾节点

                node.prev = t;

                if (compareAndSetTail(t, node)) {

                    t.next = node;

                    return t;

                }

            }

        }

    }

    在这里,节点入队就结束了。

    那么我们回来前面分析的方法,

    public final void acquire(long arg) {

        if (!tryAcquire(arg) &&

            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

            selfInterrupt();

    }

    刚刚分析完了addWaiter方法,这个方法返回了刚刚创建并且加入的队列。现在开始分析acquireQueued方法。

    final boolean acquireQueued(final Node node, int arg) {

        boolean failed = true;

        try {

            boolean interrupted = false;

            // 在循环中去获取锁

            for (;;) {

                // 拿前一个节点

                final Node p = node.predecessor();

                // 如果前一个节点是头结点则尝试着去获取锁

                if (p == head && tryAcquire(arg)) {

                    // 拿到锁后把这个节点设为头结点,这里 setHead 会把除了 next 以外的数据清除

                    setHead(node);

                    p.next = null; // help GC

                    failed = false;

                    return interrupted;

                }

                // 这个方法查看在获取锁失败以后是否中断,如果否的话就调用

                // parkAndCheckInterrupt 阻塞方法线程,等待被唤醒

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    interrupted = true;

            }

        } finally {

            if (failed)

                cancelAcquire(node);

        }

    }

    acquireInterruptibly

    因为很像所以顺便来看一下acquireInterruptibly所调用的方法:在此我向大家推荐一个架构学习交流裙。交流学习裙号:821169538,里面会分享一些资深架构师录制的视频录像

    private void doAcquireInterruptibly(int arg)

        throws InterruptedException {

        final Node node = addWaiter(Node.EXCLUSIVE);

        boolean failed = true;

        try {

            for (;;) {

                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null; // help GC

                    failed = false;

                    return;

                }

                // 只有这一句有差别,获取失败了并且检测到中断位被设为 true 直接抛出异常

                if (shouldParkAfterFailedAcquire(p, node) &&

                    parkAndCheckInterrupt())

                    throw new InterruptedException();

            }

        } finally {

            if (failed)

                cancelAcquire(node);

        }

    }

    acquireNanos

    再来看一下有限时间的,当获取超时以后会将节点 Node 的状态设为 cancel,设置为取消的用处在后面的 release 方法中会有体现。

    private boolean doAcquireNanos(int arg, long nanosTimeout)

            throws InterruptedException {

        if (nanosTimeout <= 0L)

            return false;

        final long deadline = System.nanoTime() + nanosTimeout;

        final Node node = addWaiter(Node.EXCLUSIVE);

        boolean failed = true;

        try {

            for (;;) {

                final Node p = node.predecessor();

                if (p == head && tryAcquire(arg)) {

                    setHead(node);

                    p.next = null;

                    failed = false;

                    return true;

                }

                nanosTimeout = deadline - System.nanoTime();

                if (nanosTimeout <= 0L)

                    return false;

                if (shouldParkAfterFailedAcquire(p, node) &&

                    nanosTimeout > spinForTimeoutThreshold)

                    LockSupport.parkNanos(this, nanosTimeout);

                if (Thread.interrupted())

                    throw new InterruptedException();

            }

        } finally {

            if (failed)

                cancelAcquire(node);

        }

    }

    总结一下过程

    release

    这个方法首先去调用了我们实现的 tryRelease,当结果返回成功的时候,拿到头结点,调用 unparkSuccessor 方法来唤醒头结点的下一个节点。在此我向大家推荐一个架构学习交流裙。交流学习裙号:821169538,里面会分享一些资深架构师录制的视频录像 

    public final boolean release(long arg) {

        if (tryRelease(arg)) {

            Node h = head;

            if (h != null && h.waitStatus != 0)

                unparkSuccessor(h);

            return true;

        }

        return false;

    }

    private void unparkSuccessor(Node node) {

        int ws = node.waitSatus;

        // 因为已经获取过锁,所以将状态设设为 0。失败也没所谓,说明有其他的线程把它设为0了

        if (ws < 0)

            compareAndSetWaitStatus(node, ws, 0);

        /*

        * 一般来说头结点的下一个节点是在等待着被唤醒的,但是如果是取消的或者意外的是空的,

        * 则向后遍历直到找到没有被取消的节点

        *

        */

        Node s = node.next;

        // 为空或者大于 0,只有 cancel 状态是大于 0 的

        if (s == null || s.waitStatus > 0) {

            s = null;

            for (Node t = tail; t != null && t != node; t = t.prev)

                if (t.waitStatus <= 0)

                    s = t;

        }

        if (s != null)

            LockSupport.unpark(s.thread);

    }

    相关文章

      网友评论

          本文标题:详解java并发包源码之AQS独占方法源码分析

          本文链接:https://www.haomeiwen.com/subject/ipvgfqtx.html