美文网首页Java多线程
J.U.C之AQS:源码解析:独占式同步

J.U.C之AQS:源码解析:独占式同步

作者: 贪睡的企鹅 | 来源:发表于2019-04-30 16:06 被阅读0次

    获取独占式同步状态

    acquire

        /**
         * 独占式获取同步状态,如果当前线程获取同步状态成功则直接返回,
         * 如果获取失败则线程阻塞,并插入同步队列进行.等待调用release
         * 释放同步状态时,重新尝试获取同步状态。成功则返回,失败则阻塞等待下次release
         */
        public final void acquire(int arg) {
            /**
             *子类实现tryAcquire能否获取的独占式同步状态
             *如果返回true则获取同步状态成功方法直接返回
             *如果返回false则获取同步状态失败进入if语句
             */
            if (!tryAcquire(arg) &&
                    /** addWaiter创建一个独占式节点node,添加到同步队列尾部. */
                    /** acquireQueued自旋,同步队列头部后置第一个节点线程尝试获取同步状态,成功则设置其为head节点.失败则阻塞 */
                    acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    

    addWaiter

    创建节点node(类型是独占式),如果同步队列初始化则将当前节点添加到同步队列尾部,如果同步队列没有初始化则需要调用子函数enq创建一个同步队列并将当前节点添加到同步队列尾部.

          /** 获取同步状态失败,添加节点到同步队列尾部 **/
          private Node addWaiter(Node mode) {
            // 1. 将当前线程构建成Node
            Node node = new Node(Thread.currentThread(), mode);
            // 2. 判断尾节点是否为null,如果为null说明同步队列未初始化
            Node pred = tail;
            if (pred != null) {
                // 2.2 将当前节点插入同步队列尾部
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            // 2.1. 当前同步队列尾节点为null,说明CLH同步队列未初始化,调用enq
            enq(node);
            return node;
    }
    

    pred != null 逻辑图如下,将当前节点插入同步队列尾部(Thread 3表示当前节点)


    image

    pred == null 逻辑图如下,进入enq

    enq

    初始化开始时一个自旋,首先会获取尾部node,并判断是否为null,如果为null说明同步队列需要初始化,进入if语句创建一个空node节点,让首部和尾部节点都指向这个空节点.完成后重新进入自旋.此时按照之前判断尾部节点存在.我们会进入else语句将当前添加节点设置为tail节点.

    private Node enq(final Node node) {
            for (;;) {
                Node t = tail;
                if (t == null) { // Must initialize
                    //1. 构造头节点尾节点指向一个空节点
                    if (compareAndSetHead(new Node()))
                        tail = head;
                } else {
                    // 2. node节点插入同步队列尾,CAS操作失败自旋尝试
                    node.prev = t;
                    if (compareAndSetTail(t, node)) {
                        t.next = node;
                        return t;
                    }
                }
            }
    }
    

    第一次循环进入if

    image

    第二次循环进入else,并return环跳出了这个自悬循环体系

    image

    acquireQueued

    进入自旋,找到CLH头部后置第一个节点,尝试获取同步状态,成功则设置其为新head节点.失败则阻塞.
    总结流程:

    • 1 进入自旋
    • 2 判断当前节点前置节点是否为head节点,如果是尝试调用tryAcquire获取同步状态
    • 3 获取同步状态成功,设置当前节点为head节点,并释放当前节点前置节点的指针,retun跳出自旋.
    • 4 获取同步状态失败,获取同步状态失败,就将当前节点和前驱节点作为参数交给shouldParkAfterFailedAcquire调用,shouldParkAfterFailedAcquire设置要当前节点前驱节点node等待状态到-1返回false,自旋第二次进入时返回true,到parkAndCheckInterrupt()阻塞当前线程。
    • 如果节点线程被从阻塞中唤醒(可能是线程中断,也可能是当前节点是同步队列后第一个节点。锁被前置节点释放同步状态而唤醒),重新进入自旋步骤1。
    /**
         * 自旋,找到同步队列头部后置第一个节点,尝试获取同步状态,成功则设置其为新head节点.失败则阻塞.
         */
        final boolean acquireQueued(final Node node, int arg) {
            /** 执行是否失败 **/
            boolean failed = true;
            try {
                /** 标识是否被中断 **/
                boolean interrupted = false;
                /** 进入自旋 **/
                for (;;) {
                    /** 1. 获得当前节点的先驱节点  **/
                    final Node p = node.predecessor();
                     /** 如果当前节点的先驱节点是头结点并且成功获取同步状态,即可以获得独占式锁  **/
                    if (p == head && tryAcquire(arg)) {
                         /** 并将当前节点设置为head节点  **/
                        setHead(node);
                         /** 释放当前节前驱节点指针(这里前驱节点也相当于原始的head节点)等待GC回收  **/
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    /** 2.2 获取锁失败,线程阻塞(可响应线程被中断),  如果是中断响应设置interrupted = true;*
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                /** 发生异常失败  **/
                if (failed)
                /** 设置当前节点状态为消息  **/
                    cancelAcquire(node);
            }
        }
    

    shouldParkAfterFailedAcquire

    参数

    • 当前节点和前驱节点

    返回

    • true 调用parkAndCheckInterrupt()阻塞当前线程
    • false 则重新进入acquireQueued自旋

    逻辑

    • 1 如果发现前置节点等待状态waitStatus==1则说明此客户不在等待剔除出队列,返回false,在外层自悬循环 中从node重新向前开始查找。

    • 2 如果发现前置节点等待状态waitStatus==-1 返回true 调用parkAndCheckInterrupt()阻塞当前线程

    • 3 如果发现前置节点等待状态waitStatu==0设置为waitStatus=-1,返回false,则重新进入acquireQueued自旋

        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
            int ws = pred.waitStatus;
            //前置节点状态为-1 返回true 准备直塞当前节点线程
            if (ws == Node.SIGNAL)
                return true;
            //前置节点状态为 1,剔除出队列,在外层自悬循环    中从新开始查找  返回false
            if (ws > 0) {
                do {
                    node.prev = pred = pred.prev;
                } while (pred.waitStatus > 0);
                pred.next = node;
            //前置节点状态为 0,设置为 -1  在外层自悬循环    中从新开始查找  返回false   
            } else {
                compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
            }
            return false;
        }
    

    第一次循环

    是第3种情况,并且这个方法运行后返回false。会当前节点(一般式尾节点)的前置节点等待状态由0变成了-1

    如果添加节点并不是第一个等待的节点(thread2)

    image

    如果添加节点并是第一个等待的节点(thread3)

    image

    第二次循环
    是第2种情况 ,返回true开始阻塞线程,并在方法返回告知是否是中断导致线程唤醒

    parkAndCheckInterrupt

    阻塞当前线程(可响应中断),返回true 表示中断导致线程阻塞被唤醒

        /**
         * 阻塞当前线程(可响应中断),
         * 返回true 表示中断导致线程阻塞被唤醒
         */
        private final boolean parkAndCheckInterrupt() {
            /** 阻塞当前线程(可响应中断)**/
            LockSupport.park(this);
            /** 如果线程是中断从阻塞唤醒返回true **/
            return Thread.interrupted();
        }
    

    整体流程图

    image

    释放独占式同步状态

    入口函数 release

        /**
         * 释放独占式同步入口函数,
         * 参数arg传递给模板方法用来判断释放同步状态
         *
         * 释放同步状态会释放Head节点后置节点中线程从阻塞状态中唤醒
         */
        public final boolean release(int arg) {
            /**
             *子类实现能否释放的独占式同步状态
             *如果返回true则表示释放同步状态准入条件成功进入if语句
             *如果返回false则表示释放同步状态失败返回false
             */
            if (tryRelease(arg)) {
                /** 判断同步队列是否存在等待节点 **/
                Node h = head;
                if (h != null && h.waitStatus != 0)
                    /**
                     * 更新CLH同步队列Head节点的等待状态,将Head节点后置节点中线程从阻塞状态中唤醒
                     */
                    unparkSuccessor(h);
                return true;
            }
            return false;
        }
    

    unparkSuccessor

    更新CLH同步队列Head节点的等待状态,将Head节点后置节点中线程从阻塞状态中唤醒

       /**
         * 更新CLH同步队列Head节点的等待状态,将Head节点后置节点中线程从阻塞状态中唤醒
         */
        private void unparkSuccessor(Node node) {
            /**
             * 将Head节点的等待状态设置为0
             */
            int ws = node.waitStatus;
            if (ws < 0)
                compareAndSetWaitStatus(node, ws, 0);
    
            /** 默认情况下释放的节点为head节点后置节点s. **/
            /**
             * 如果head节点后置节点等待状态为1(取消),从尾节点开始遍历寻找最接近head节点等待状态为-1的节点作为释放节点s
             */
            Node s = node.next;
            if (s == null || s.waitStatus > 0) {
                s = null;
                for (Node t = tail; t != null && t != node; t = t.prev)
                    if (t.waitStatus <= 0)
                        s = t;
            }
            /** 唤醒s节点中线程阻塞,被唤醒节点线程重新进入acquireQueued自旋尝试获取同步状态 **/
            if (s != null)
                LockSupport.unpark(s.thread);
        }
    

    第一步:获取head节点的waitStatus,如果小于0,就通过CAS操作将head节点的waitStatus修改为0,现在是:

    image

    第二步:寻找head节点的下一个节点,如果这个节点的waitStatus小于0,就唤醒这个节点,否则遍历下去,找到第一个waitStatus<=0的节点,并唤醒。

    第三步: 被唤醒节点线程重新进入acquireQueued自旋尝试获取同步状态,如果成功则设置当前节点为head节点.原始head节点出队.

    image

    整体图

    image

    总结

    独占式同步可以看作是一把独占锁,每次仅仅只能由一个程序获的锁

    总体流程如下

    尝试锁失败 --> 进入等待队列排队  -->  阻塞当前线程 --> 当等待队列排到自己被唤醒 --> 尝试获取锁(可能被其他线程插队而导致获取锁失败,失败在次阻塞,等待下次排到自己)  --> 执行自己的业务逻辑 --> 尝试释放锁--> 成功通知等待队列前面共享节点线程从阻塞中唤醒。
    

    子类可以扩展如何获取锁,如何释放锁,但整体流程不变。

    白话

    我们举一个生活中例子来看独占式同步,我们都去餐厅吃过饭,如果去餐厅吃饭就时我们要执行业务,要进入餐厅吃饭的前提时要获取同步状态,如果餐厅只有一个位置,那么就独占式同步,因为同时只能一个进入餐厅吃饭。如果餐厅由多个位置,那么就是共享式同步,因为同步多个人进入餐厅吃饭。当然能不能进入逻辑时餐厅这个AQS子类去定制的。

    我们来看独占式例子:

    客人A进入餐厅吃饭,餐厅此时没有人,他可以顺利进入餐厅吃饭。

    客户B也进入餐厅吃饭,餐厅此时被A占着,尝试进入餐厅(子类实现逻辑判断),进入餐厅失败

    餐厅给B指定一个排队编号,此时B想插队,在次尝试进入餐厅(子类实现逻辑判断),A还没有出来,进入餐厅失败

    客户C也进入餐厅吃饭,餐厅此时被A占着尝试进入餐厅(子类实现逻辑判断),进入餐厅失败,

    餐厅给C指定一个排队编号(加入同步队列),此时C也想插队,在次尝试进入餐厅(子类实现逻辑判断),A刚好出来,餐厅还没有来得及通知B,C 进入餐厅吃饭

    C 吃饭完毕离开餐厅,尝试离开餐厅(子类实现逻辑判断),成功餐厅通知B(唤醒B线程),B尝试进入餐厅(可能还会被插队),成功进入,失败则在次等着(在次阻塞)

    实现

    实现一个AQS独占同步,需要继承AbstractQueuedSynchronizer,并重写tryAcquire,tryRelease,最常见的就是ReentrantLock,ReentrantLock内部定义了一个AbstractQueuedSynchronizer实现AQS内部类,通过同步状态来控制获取独占锁,其中0表示未占用,1表示已占用,当tryAcquire会使用CAS将同步状态设置为1,tryRelease会使用CAS将同步状态设置0.保证同时只能一个线程占用锁。

    相关文章

      网友评论

        本文标题:J.U.C之AQS:源码解析:独占式同步

        本文链接:https://www.haomeiwen.com/subject/khuvnqtx.html