美文网首页JavaJava架构技术进阶
连肝4天,这瞬间戳中面试官小心心的AQS大餐,给大家安排上!

连肝4天,这瞬间戳中面试官小心心的AQS大餐,给大家安排上!

作者: 傻姑不傻 | 来源:发表于2020-08-22 22:30 被阅读0次

    来源:https://segmentfault.com/a/1190000023709787

    前言

    AQS,英文全称AbstractQueuedSynchronizer,直接翻译为抽象的队列同步器。是JDK1.5出现的一个用于解决并发问题的工具类,由大名鼎鼎的Doug Lea打造,与synchornized关键字不同的是,AQS是通过代码解决并发问题。

    回顾并发问题

    并发问题是指在多线程运行环境下,共享资源安全的问题。
    现在的银行账户,通过银行卡和手机银行都可以操作账户, 如果我们同时拿着银行卡和存折去银行搞事情,会怎么样呢?

    package demo.pattren.aqs;
    
    public class Money {
        /**
         * 假设现在账户有1000块钱
         */
        private int money = 1000;
        /**
         * 取钱
         */
        public void drawMoney(){
            this.money--;
        }
        public static void main(String[] args) throws InterruptedException {
            Money money = new Money();
            for(int i=0; i<1000; i++){
                new Thread(() -> {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    money.drawMoney();
                },i + "").start();
            }
            Thread.sleep(2000);
            System.out.println("当前账户余额:" + money.money);
        }
    }
    

    这样想着是不是马上可以去银行搞一波事情? 哈哈,你想太多了,如果能这样搞,银行早破产了。我们主要是来分析一下出现这个问题的原因,JVM内存是JMM结构的,每个线程操作的数据是从主内存中复制的一个和备份,而多个线程就会存在多个备份,当线程中的备份数据被修改时,会将值刷新到主内存,比如多个线程同时获取到了账户的余额为500元,A线程存钱100,线程A将600刷新到主内存,\color{red}{主内存并不会主动通知其他线程此时值已经被修改},所以主内存的值此时与其他线程的值是不同的,如果其他线程再操作账户余额,是在500的基础上进行的,这显然不是我们想要的结果。

    解决并发问题

    JDK提供了多种解决多线程安全的方式。

    volatile关键字

    volatile是JDK提供的关键字,用来修饰变量,volatile修饰的变量能够保证多个线程下的可见性,如上个案例,A修改了账户的余额,然后将最新的值刷新到主内存,此时主内存会将最新的值同步到其他线程。


    volatile解决了多线程下数据读取一致的问题,\color{red}{即保证可见性,但是其并不能保证写操作的原子性},

    当多个线程同时写操作的时候,即多个线程同时去将线程中最新的值刷新到主内存,将会出现问题。


    通过volatile关键字修饰money变量,发下并不能解决线程安全问题。

    原子操作类

    原子操作类是JDK提供的一系列保证原子操作的工具类,原子类可以保证多线程环境下对其值的操作是安全的。

    package demo.pattren.aqs;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class AtomicMoney {
        /**
         * 假设现在账户有1000块钱
         */
        private AtomicInteger money = new AtomicInteger(1000);
        /**
         * 取钱
         */
        public void drawMoney(){
            //AtomicInteger的自减操作
            this.money.getAndDecrement();
        }
        public static void main(String[] args) throws InterruptedException {
            AtomicMoney money = new AtomicMoney();
            for(int i=0; i<1000; i++){
                new Thread(() -> {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    money.drawMoney();
                },i + "").start();
            }
            Thread.sleep(2000);
            System.out.println("当前账户余额:" + money.money);
        }
    }
    

    多次测试结果都是0,与预期一致。原子操作类是使用CAS(Compare and swap 比较并替换)的机制来保证操作的原子性,相对于锁,他的并发性更高。

    synchronized关键字

    synchronized关键字是jvm层面来保证线程安全的,通过在代码块前后添加monitorenter与monitorexit命令来保证线程的安全,而且在JDK1.6对synchronized关键字做了较大的优化,性能有了较大的提升。可以确定的是,通过synchronized肯定可以保证线程安全,所以使用synchronized也是很好的选择,当然synchronized锁的升级不可逆特征,导致在高并发下性能是不能很好的保证。

    Lock锁

    终于迎来了本篇文章的主角,前面的内容,其实与文章的主题AQS并没有直接的关联,就简单带过。前面很多都是JVM层面来保证线程安全的,而AQS则是完全通过代码层面来处理线程安全的。
    (PS:小节标题明明是Lock锁,怎么写AQS了,骗我读书少)


    博主怕挨打,正在全力解释中~~~~~。先上类图压场!


    如上图,左边是抽象队列同步器,而右边则是使用队列同步器实现的功能——锁、信号量、发令枪等。
    可以先不看源码,咱们自己思考,要以纯代码的方式实现应当考虑哪些问题?

    1. 线程互斥:可以使用state状态进行判断,state=0,则可以获取到锁,state>0,则不能获取。
    2. 排队等候:不能获取锁的线程应当存储起来,当锁释放后可以继续获取锁执行。
    3. 线程唤醒:当锁释放后,处于等待状态的线程应当被唤醒。
    4. 锁重入 : 如何解决同一个进入多个加锁的方法(不解决的话分分钟死锁给你看)。

    对于1、2两点,难度应带不大,而3、4两点如何去设计呢?我们通过伪代码预演操作流程。

    在业务端,是这样操作的。

      加锁
      {需要被锁住的代码}
      释放锁
    

    加锁与释放锁的逻辑

        if(state == 0)
          获取到锁
          set(state == 1)
        else
          继续等待
          while(true){
               if(state == 0)
                 再次尝试获取锁
          }
    

    这样设计之后,整个操作流程再次变成了串行操作。


    这和我们去食堂排队打饭是一样的,食堂不可能为每个学生都开放一个窗口,所以多个学生就会争抢有限的窗口,如果没有一定的控制,那么食堂每到吃饭的时候都是乱套的,一群学生围着窗口同时去打饭,想想都是多么的恐怖。而由此出现了排队的机制,一个窗口同一时间打饭的人只能有一个,当前一个人离开窗口后,后面排队的学生才能去打饭。


    源码解读

    下面我们深入JDK源码,领略大师级的代码设计。
    业务调用代码:

    package demo.aqs;
    
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class LockMoney {
        Lock lock = new ReentrantLock();
        /**
         * 假设现在账户有1000块钱
         */
        private int money = 1000;
        //private int money = 1000;
        /**
         * 取钱
         */
        public void drawMoney(){
            lock.lock();
            this.money--;
            lock.unlock();
        }
        public static void main(String[] args) throws InterruptedException {
            LockMoney money = new LockMoney();
            for(int i=0; i<1000; i++){
                new Thread(() -> {
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    money.drawMoney();
                },i + "").start();
            }
            Thread.sleep(2000);
            System.out.println("当前账户余额:" + money.money);
        }
    }
    

    追踪Lock方法:
    直接看源码基本一会儿就晕车,我尝试绘制出lock方法的调用链路。然后结合源码解释。


    大家跟着箭头走一遍源码,多多少少能够体会到AQS的实现机制。

    NonfairSync.lock

    final void lock() {
        //CAS尝试将state从0更新为1,更新成功则执行if下面的代码。
        if (compareAndSetState(0, 1))
            //获取锁成功,执行线程执行
            setExclusiveOwnerThread(Thread.currentThread());
        else
            //获取锁失败,线程入队列
            acquire(1);
    }
    

    看到这段代码,是不是瞬间明白前面提到的1、2两点问题。首先compareAndSetState方法是使用Unsafe直接操作内存并且使用乐观锁的方式,能够保证有且仅有一个线程能够操作成功,是多线程安全的。即设置将state设置为1成功的线程能够抢占到锁(线程互斥),而没有设置成功的线程将进行入队操作(排队等候),这样感觉瞬间明朗了许多,那我们接着往下看。

    AbstractQueuedSynchronizor.acquire

     public final void acquire(int arg) {
        //tryAcquire失败并且acquireQueued成功,则调用selfInterrupt
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            //当线程获取锁失败并且线程阻塞失败会中断线程
            selfInterrupt();
    }
    

    AbstractQueuedSynchronizor的tryAcquire方法,其最终调用到了Sync的nonfairTryAcquire

     final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        //获取当前锁的状态值
        int c = getState();
        // state = 0,表示当前锁为空闲状态,其实这一段代码和前面lock的方法是一样的
        if (c == 0) {
            if (compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        //不等于0 则判断当前线程是否为持有锁的线程,如果是则执行代码,这里解决了重入锁问题
        else if (current == getExclusiveOwnerThread()) {
            //当前状态值 + 1(可以看前面的传参)
            int nextc = c + acquires;
            // 囧, 这里是超出了int的最大值才会出现这样的情况
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            //更新state的值
            setState(nextc);
            return true;
        }
        return false;
    }
    

    通过阅读源码,可以发现,tryAcquire方法在当前线程获取锁成功或者是重入锁的情况下返回true,否则返回false。而同时这个方法解决了上面提到的第4点锁重入的问题。ok,感觉越来越接近真相了,接着看addWaiter方法。
    理解addWaiter方法的代码,先看方法中用的得Node对象。 Node对象是对Thread对象的封装,使其具有线程的功能,同时他还有prev、next等属性。那么很明了,Node是一个链表结构的对象

       //前一个结点
       volatile Node prev;
       //下一个结点
       volatile Node next;
    
    

    同时AbstractQueuedSynchronizor中包含head、tail属性

     //Node链表的头结点
     private transient volatile Node head;
     //Node链表的尾结点
     private transient volatile Node tail;
    
    private Node addWaiter(Node mode) {
        //将当前线程包装为Node对象
        Node node = new Node(Thread.currentThread(), mode);
        //获取尾节点,当这段代码第一次运行的时候,并没有尾结点
        //所以肯定值为null,那么会执行下面的enq方法
        Node pred = tail;
        //当再次运行代码的时候,尾结点不再为null(enq方法初始化了尾结点,可以先往下看enq方法源码)
        if (pred != null) {
            //当前结点的前置结点指向之前的尾结点
            node.prev = pred;
            //CAS尝试将尾结点从pred设置为node
            if (compareAndSetTail(pred, node)) {
                //设置成功则将pred的next结点执行node
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    

    上面的解释听着有点绕脑袋。


    不着急,我们先看enq方法

    private Node enq(final Node node) {
        //死循环
        for (;;) {
            //获取尾结点
            Node t = tail;
            //尾结点为空,则初始化尾结点和头结点为同一个新创建的Node对象
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                //将当前结点设为为尾结点,并将前一个尾结点的next指向当前结点
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    //退出循环
                    return t;
                }
            }
        }
    }
    

    enq具体做了什么事情呢:

    1. 第一次循环,初始化头结点与尾结点 new Node()
    2. 第二次循环,将当前线程封装的Node对象设置为尾结点,并将前一个尾结点的next指向此Node

    这里需要一些时间 + 空间的想象力,但如果对链表结构比较熟悉的话,这里理解也是不太困难的。
    我们动态的想一想执行过程:

    1. 第一个线程进入lock方法,此时是肯定可以获取到锁,直接执行,不会进入到addWaiter方法
    2. 第二个线程进入lock方法,我们假设第一个线程还没有释放锁,此时进入执行enq方法,enq进行链表的初始化。
    1. 第三个线程以及更多的线程进入lock方法,此时不再执行enq方法,而是在初始化之后的链表进行链接。

    acquireQueued

    final boolean acquireQueued(final Node node, int arg) {
      //局部变量
      boolean failed = true;
      try {
          //局部变量
          boolean interrupted = false;
          //死循环
          for (;;) {
              //获取前置结点
              final Node p = node.predecessor();
              //前置结点为head并且尝试获取锁成功,则不阻塞
              if (p == head && tryAcquire(arg)) {
                  setHead(node);
                  p.next = null; // help GC
                  failed = false;
                  return interrupted;
              }
              //阻塞操作 , 判断是否应该阻塞 并且 阻塞是否成功
              if (
                    //是否在抢占锁失败后阻塞
                  shouldParkAfterFailedAcquire(p, node) &&
                  //Unsafe操作使线程阻塞
                  parkAndCheckInterrupt())
                  interrupted = true;
          }
      } finally {
          if (failed)
              cancelAcquire(node);
      }
    }
    

    shouldParkAfterFailedAcquire分析

    //Node pred 前置结点, Node node 当前结点
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        //获取前置结点的等待状态
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * 唤醒信号,即前结点正常,就设置waitStatus为SIGNAL,表示前置结点可以唤醒当前结点,那          * 么当前结点才会安心的被阻塞(如果前置结点不正常,可能就会导致自己不能被唤醒,那肯定不          * 能安心睡觉的)
             */
            return true;
        if (ws > 0) {
            /*
             * 找到前置结点中waitStatus <= 0 的Node结点并设置为当前结点的前置结点
             * 此状态表示结点不是处于正常状态,那么将他从链表中删除,直到找到状态正常的结点
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 当waitStatus = 0 或者 PROPAGATE(-3) 时,CAS设置值为SIGNAL(-1)
             * 此状态表示线程正常,但没有设置唤醒,一般为tail的前一个结点,那么需要将其设置为可唤醒          * 状态(SIGNAL)
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    

    图解如下。


    至此,我们了解了AQS对需要等待的线程存储的过程。
    而AQS的解锁以及公平锁、非公平锁,共享锁、独享锁等后续跟上。

    参考资料:

    https://www.cnblogs.com/water...
    https://www.jianshu.com/p/d61...

    相关文章

      网友评论

        本文标题:连肝4天,这瞬间戳中面试官小心心的AQS大餐,给大家安排上!

        本文链接:https://www.haomeiwen.com/subject/lxybjktx.html