美文网首页
深入解析AbstractQueuedSynchronizer源码

深入解析AbstractQueuedSynchronizer源码

作者: hcy0411 | 来源:发表于2018-09-03 18:11 被阅读0次

    AbstractQueuedSynchronizer是juc包下面解决资源竞争的基础,功能主要包括三部分:
    第一部分Condition监视器,已在Condition源码解析文章中做了分析。
    第二部分独占模式,ReentrantLock,ReentrantReadWriteLock的写锁都是基于AbstractQueuedSynchronizer的独占锁实现的。
    第三部分共享模式,ReentrantReadWriteLock的读锁,CountDownLatch,Semaphore都是基于AbstractQueuedSynchronizer的共享锁实现的。
    AbstractQueuedSynchronizer实现由两个队列实现,一个Sync queue。一个Condition Queue,Condition Queue再第一篇文章中已介绍。
    Sync queue是实现共享锁和独占锁的基础。
    本文分析独占锁实现。

    AbstractQueuedSynchronizer 类变量

        // 记录Sync Queue 的头
        private transient volatile Node head;
    
        /**
         * 记录Sync Queue 的尾部
         * method enq to add new wait node.
         */
        private transient volatile Node tail;
    
        // 同步状态,lock,计数器,信号箱的资源都是维护这个状态,类似于资源总数
        private volatile int state;
    

    Node类

    该类是两个队列实现的关键,Sync queue,Condition Queue里的节点都是Node节点

    static final class Node {
    
           /**
           SHARED 和EXCLUSIVE 主要区分共享模式和独占模式
           **/
            static final Node SHARED = new Node();
    
            static final Node EXCLUSIVE = null;
            // 取消状态
            static final int CANCELLED =  1;
           // 该状态说明后继节点需要被唤醒,独占模式
            static final int SIGNAL    = -1;
           // 该状态说明节点是Condition下的节点,参考Condition解析
            static final int CONDITION = -2;
            // 该状态用于共享模式,无条件传播唤醒
            static final int PROPAGATE = -3;
            // waitStatus 表示node节点的状态,0初始结束状态
            volatile int waitStatus;
           // Sync Queue是双向链表
            volatile Node prev;
            volatile Node next;
            // node线程
            volatile Thread thread;
            // Condition queue 单向链表
            Node nextWaiter;
    
            final boolean isShared() {
                return nextWaiter == SHARED;
            }
    
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }
    
            Node() {    // Used to establish initial head or SHARED marker
            }
    
            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }
    
            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
        }
    

    独占获取资源方法

    独占获取资源的入口如下:

       // 忽略中断异常
        public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
       // 抛出中断异常
        public final void acquireInterruptibly(int arg)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            if (!tryAcquire(arg))
                doAcquireInterruptibly(arg);
        }
        // 超时获取,并抛出中断异常
        public final boolean tryAcquireNanos(int arg, long nanosTimeout)
                throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            return tryAcquire(arg) ||
                doAcquireNanos(arg, nanosTimeout);
        }
    

    tryAcquire留给工具自己去实现,用于判断是否满足获取资源要求
    本文重点分析acquireQueued,doAcquireInterruptibly,doAcquireNanos实现。

    acquireQueued方法

    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    // 独占模式一次只能唤醒一个节点,即唤醒head的后继节点,这里判断节点是否是head的后继节点
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        // 如果是head的后继节点,说明没有竞争,直接把改节点设置成head节点,然后返回中断状态
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    // 如果不是head的后继节点,说明存在竞争,需要封装改节点,然后等待被唤醒
                    if (shouldParkAfterFailedAcquire(p, node) &&
                            parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    

    shouldParkAfterFailedAcquire

        /**
         * 该方法检查是否满足节点被封装的要求
         * 即pred的状态是否是SIGNAL,SIGNAL代表后继节点需要被唤醒
         * 不满足要求,修改满足要求,返回false,再来一次
         */
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            // 满足要求,返回true
            if (ws == Node.SIGNAL)
                return true;
            // 大于0表示节点的前驱节点被取消,需要跳过已被取消的所有节点
            if (ws > 0) {
    
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            } else {
    
                // 否则把前驱节点设置成SIGNAL状态
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    parkAndCheckInterrupt

        // 该方法封装阻塞线程,runnable->waiting,被唤醒之后检查是否是中断唤醒
        private final boolean parkAndCheckInterrupt() {
            // 这里阻塞,线程状态由runnable变成waiting状态,直到被唤醒
            LockSupport.park(this);
            // 被唤醒之后,检查是否是中断唤醒
            return Thread.interrupted();
        }
    

    cancelAcquire

    //因异常取消获取节点
    private void cancelAcquire(Node node) {
            // Ignore if node doesn't exist
            if (node == null)
                return;
            node.thread = null;                 // 1. 线程引用清空
            Node pred = node.prev;
            while (pred.waitStatus > 0)       // 2.  若前继节点是 CANCELLED 的, 则也一并清除
                node.prev = pred = pred.prev;
            Node predNext = pred.next;         // 3. 这里的 predNext也是需要清除的(只不过在清除时的 CAS 操作需要 它)
            node.waitStatus = Node.CANCELLED; // 4. 设置成清除状态
            if (node == tail && compareAndSetTail(node, pred)) { // 5. 若需要清除额节点是尾节点, 则直接 设置pred为尾节点,并删除predNext 
                compareAndSetNext(pred, predNext, null);   
            } else {
                int ws;
                if (pred != head &&
                        ((ws = pred.waitStatus) == Node.SIGNAL || // 6. 如果pred没被取消,设置pred的waitStatus==SIGNAL 表示后继节点需要唤醒
                                (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && 
                        pred.thread != null) {
                    Node next = node.next;
                    if (next != null && next.waitStatus <= 0) // 7. next.waitStatus <= 0 表示 next 是个一个想要获取lock的节点
                        compareAndSetNext(pred, predNext, next);
                } else {
                    unparkSuccessor(node); // 若 pred 是头节点, 直接唤醒下node的next节点。
                }
                node.next = node; // help GC
            }
        }
    

    以上是独占方式获取资源的过程,其中doAcquireInterruptibly,doAcquireNanos两方法实现和acquireQueued实现区别不是很大,支持抛出中断异常,和超时等待,不做具体分析。

    release

        public final boolean release(int arg) {
            if (tryRelease(arg)) {
                Node h = head;
                // 如果head不为空,并列有后继节点需要被唤醒,直接唤醒后继节点
                if (h != null && h.waitStatus != 0)
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    unparkSuccessor

    private void unparkSuccessor(Node node) {
    
        int ws = node.waitStatus;
        // 唤醒后继节点之前,先把node设置成最终状态0
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
    
    
        // 拿到node的后继节点,如果被取消,设置成null,遍历,直到拿到需要被唤醒的节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        //不为空,唤醒
        if (s != null)
            LockSupport.unpark(s.thread);
    }
    

    以上就是独占模式下获取资源和释放资源的过程,之后会继续分析共享模式实现。

    相关文章

      网友评论

          本文标题:深入解析AbstractQueuedSynchronizer源码

          本文链接:https://www.haomeiwen.com/subject/yzahwftx.html