美文网首页
并发系列四:基于两种案例来认识ReentrantLock源码加锁

并发系列四:基于两种案例来认识ReentrantLock源码加锁

作者: avengerEug | 来源:发表于2021-04-21 14:24 被阅读0次

    前言

    • 上篇文章咱们证明了synchronized关键字的特性:无锁、偏向锁、轻量锁、重(chong)偏向、重(chong)轻量、重量锁。可以说synchronized是jvm层面实现同步的方式。在jdk中,存在一个叫java.util.concurrent的包,简称JUC,它是一个jdk层面的并发包,里面存在了大量与并发编程相关的api,其中最代表意义的就是atomic和lock两种类别,前者是基于乐观锁CAS(Compare And Swap)的实现,后者是基于AQS(Abstract Queued Synchronizer)实现。本文将详细讲解下AQS原理以及根据两个案例来解读ReentrantLock源码。
    • 两个案例:

      1.线程A单独加锁
      2.线程A正在持有锁的过程中,线程t1来加锁

    一、AQS原理

    • AQS简称Abstract Queued Synchronizer,它的核心是基于一个双向链表组成的队列(CLH队列) + volatile关键字修饰的int类型变量实现的。(关于volatile关键字可以参考其他博主的一些总结: 传送门),大致核心可以以如下图来呈现:

      在这里插入图片描述
      简单总结就是:内部使用双向链表维护了一个队列,其中Node数据结构为此队列的基石,内部维护了prev(指向上一个节点)、next(指向下一个节点)、waitStatus(当前node的状态)、thread(当前维护的线程)四个重要的属性。其中waitStatus分别有如下取值:
      Node中waitStatus具体取值 含义
      CANCELLED(1) 中断或取消,此状态下的节点会从队列中移除
      SIGNAL(-1) 此状态下的节点一定是在队列排队中
      CONDITION(-2) 条件阻塞,比如说内部因Condition而阻塞的节点
      PROPAGATE(-3) 表示下一个acquireShared应该无条件传播
      0 默认状态

      除此之外,队列中还维护了三个属性,head(指向队列中的头节点)、state(锁的状态)、tail(指向队列中的尾节点)。其中,state的取值有两种情况,将以如下表展示出来:

      AQS中state具体取值 含义
      0 表示当前锁没有被线程持有
      1 表示当前锁正在被线程持有
      大于1 表示当前锁被线程重入了(重入锁),这里要注意:ReentrantLock重入了几次,就要释放几次锁

    二、案例1:线程A单独加锁

    • 代码如下:

      public class SimpleThreadLock {
      
          static ReentrantLock lock = new ReentrantLock(true);
      
          public static void main(String[] args) throws InterruptedException {
              Thread a = new Thread(() -> {
                  try {
                      lock.lock();
                      System.out.println("Get lock");
                  } catch (Exception e) {
                      e.printStackTrace();
                  } finally {
                      lock.unlock();
                  }
              }, "线程a");
      
              a.start();
              a.join();
              System.out.println("end");
          }
      }
      

      代码也比较简单,就是在主线程中创建了一个线程,并且内部去使用ReentrantLock加锁,获取到锁后就打印出Get lock这句话,当t1线程执行完后再继续执行主线程的逻辑。这里就不一步步演示断点了,直接上源码。

    • 这里先说明下ReentrantLock重载的两个构造方法

      // 默认非公平锁
      public ReentrantLock() {
          sync = new NonfairSync();
      }
      
      // 若传入true则是公平锁
      public ReentrantLock(boolean fair) {
          sync = fair ? new FairSync() : new NonfairSync();
      }
      

      因为咱们传入了true进去,所以此时,它是一把公平锁。

    • lock.lock()方法,因为咱们指定了使用公平锁,所以最终会进入ReentrantLock内部维护的FairSync类的lock方法

      // FairSync类下的lock方法
      final void lock() {
          acquire(1);
      }
      

      于是,我们需要找到acquire方法,此方法为AQS(父类AbstractQueuedSynchronizer)的方法,所以最终会进入如下这么一段代码:

      public final void acquire(int arg) {
          if (!tryAcquire(arg) &&
              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt();
      }
      

      这段代码,看似很精简,但是它做的事真的太多了。浓缩的才是精华呀!好了,咱们不偏题,继续按照咱们的主题:线程A单独加锁。不过要继续往下看,还是要加深下acquire方法的含义,我们必须要tryAcquire方法返回false,才能继续走if条件中后面的逻辑,以及if条件内部的逻辑。于是,我们直接看tryAcquire方法源码:

    • tryAcquire方法

      protected boolean tryAcquire(int arg) {
          throw new UnsupportedOperationException();
      }
      

      tryAcquire方法是一个protected方法,内部直接抛出了一个异常,还记得咱们是从哪个类掉用到父类AbstractQueuedSynchronizer的acquire方法的?没错,就是FairSync类。那么咱们就直接定位到FairSync类的tryAcquire方法呗。

      protected final boolean tryAcquire(int acquires) {
          // 拿到当前线程,也就是线程A
          final Thread current = Thread.currentThread();
      
          // 拿到当前aqs的state变量,我们没有修改过它,
          // 默认为0
          int c = getState();
          if (c == 0) {
              // 进入此逻辑,此逻辑跟acquire方法有点类似
              // 必须要hasQueuedPredecessors()方法返回false
              // 才能继续往下执行,于是我们把hasQueuedPredecessors的源码也贴出来
              if (!hasQueuedPredecessors() &&
                  compareAndSetState(0, acquires)) {
                  setExclusiveOwnerThread(current);
                  return true;
              }
          }
          else if (current == getExclusiveOwnerThread()) {
              int nextc = c + acquires;
              if (nextc < 0)
                  throw new Error("Maximum lock count exceeded");
              setState(nextc);
              return true;
          }
          return false;
      }
      
    • hasQueuedPredecessors方法源码

      public final boolean hasQueuedPredecessors() {
          // 拿到aqs中的tail
          Node t = tail; 
          // 拿到aqs中的head
          Node h = head;
          Node s;
      
          return h != t &&
              ((s = h.next) == null || s.thread != Thread.currentThread());
      }
      

      此方法涵盖的情景比较多,但是就当前情景而言,它很容易理解,在当前情形中,我们压根没操作过tail和head那么h 肯定等于 t,所以此方法返回false,返回false后,我们回到FairSync类的tryAcquire方法,

      protected final boolean tryAcquire(int acquires) {
          // .... 上半部分代码省略
          if (c == 0) {
              // 在当前情景下,hasQueuedPredecessors返回的是false
              // 也就是说会继续走if后面的逻辑,
              // if后面的逻辑就是执行CAS操作,
              // 将state属性从0设置成1
              // 由于此时只有一个线程在执行,
              // 这个cas操作一定是成功的
              // cas成功后就会执行setExclusiveOwnerThread代码,这段代码很有用
              // 它是一个赋值的操作,也就是记录
              // 当前拥有锁的线程
              if (!hasQueuedPredecessors() &&
                  compareAndSetState(0, acquires)) {
                  setExclusiveOwnerThread(current);
                  return true;
              }
          }
          // .... 下半部分else if逻辑也省略了
          return false;
      }
      

      通过上述代码中的注释,我们可以发现,线程A加锁成功后会返回true,至此,tryAcquire的返回值为true。还记的我们是从哪个方法进来的吗?是的,是从父类AbstractQueuedSynchronizer的acquire方法进来的,上面总结到了,只有当tryAcquire返回false,才会继续往下执行。至此,线程A单独加锁的案例就结束了。通过这么一个单线程加锁的案例,如果你认为AQS很简单的话,那就大错特错了,单线程加锁的案例中,我们仅使用到了AQS中的state变量,CLH队列却始终没有涉及到,而且从加锁到加锁结束的整个过程,我们连一个Node类型的数据结构都没有看到过。那Node类型的数据结构什么时候会被用到呢?我们来看下一个案例线程A正在持有锁的过程中,线程t1来加锁

    三、案例2:线程A正在持有锁的过程中,线程t1来加锁

    • 同样的,咱们改造下代码:

      public class TwoThreadLock {
      
          static ReentrantLock lock = new ReentrantLock(true);
      
          public static void main(String[] args) throws InterruptedException {
              new Thread(() -> {
                  try {
                      lock.lock();
                      System.out.println("Thread a get lock");
                      TimeUnit.SECONDS.sleep(60);
                  } catch (Exception e) {
                      e.printStackTrace();
                  } finally {
                      lock.unlock();
                  }
              }, "线程a").start();
      
              Thread t1 = new Thread(() -> {
                  try {
                      lock.lock();
                      System.out.println("Thread t1 get lock");
                  } catch (Exception e) {
                      e.printStackTrace();
                  } finally {
                      lock.unlock();
                  }
              }, "线程t1");
      
              t1.start();
              t1.join();
      
              System.out.println("end");
          }
      }
      

      上段代码,毫无疑问,线程t1在调用lock.lock()方法时,就阻塞到那里了,要等线程a睡60s后才会继续执行,那么这里面到底做了哪些事呢?我们来一起研究下。

    • 同案例1,使用的是公平锁,最终肯定会调用到tryAcquire方法去,咱们这次就一次性的把tryAcquire方法给讲清楚

      protected final boolean tryAcquire(int acquires) {
          // 拿到当前线程,也就是线程t1
          final Thread current = Thread.currentThread();
      
          // 拿到当前aqs的state变量,此时的c是多少呢?
          // 没错,是1,因为锁已经被线程A占有了,此时的
          // state为1。于是它会走else if逻辑
          int c = getState();
          if (c == 0) {
              // 进入此逻辑,此逻辑跟acquire方法有点类似
              // 必须要hasQueuedPredecessors()方法返回false
              // 才能继续往下执行,于是我们把hasQueuedPredecessors的源码也贴出来
              if (!hasQueuedPredecessors() &&
                  compareAndSetState(0, acquires)) {
                  setExclusiveOwnerThread(current);
                  return true;
              }
          }
          // 走了else if逻辑,它也发现当前持有锁的线程不是自己呀,于是直接return false
          // 这里顺带解释下这个else if的逻辑,这个else if
          // 就是判断当前调用lock方法的线程是不是和当前持有
          // 锁的线程一样,如果是一样的,则将state + 1并赋值给nextc属性
          // 这就表示了ReentrantLock支持重入性
          // 那么什么时候会出现nextc属性小于0的情况呢?
          // nextc是一个int类型,当超过了它的存储返回后
          // 会出现小于0的情况 ===> 也就是说ReentrantLock
          // 的重入次数最大为支持int类型最大值
          else if (current == getExclusiveOwnerThread()) {
              int nextc = c + acquires;
              if (nextc < 0)
                  throw new Error("Maximum lock count exceeded");
              setState(nextc);
              return true;
          }
          return false;
      }
      

      通过上述代码块中的注释可知,线程t1的加锁流程并没有这么顺利,在tryAcquire方法中返回了false,那这代表了什么呢?是的,它代表着线程t1可以继续走acquire后面的逻辑了,咱们继续把acquire方法贴出来:

      public final void acquire(int arg) {
          // 在案例2的情况下,tryAcquire方法返回了false
          // 于是会执行后面的acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
          // 当acquireQueued(addWaiter(Node.EXCLUSIVE), arg)返回了true才会执行内部的selfInterrupt()方法
          if (!tryAcquire(arg) &&
              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt();
      }
      

      于是,咱们先了解下addWaiter(Node.EXCLUSIVE)方法,它的源码如下:

      private Node addWaiter(Node mode) {
          // 此时的mode是由上述代码块传入的,
          // 它的值为Node.EXCLUSIVE ===> 这是一个空节点,
          // 值为null,
          // 创建了一个node节点, 内部维护了当前线程(线程t1),并且它的next节点为null(有Node的构造方法可知)
          Node node = new Node(Thread.currentThread(), mode);
          // 拿到aqs队列中的tail属性,
          // 此时肯定为null啊(aqs队列都没初始化,哪来的队尾节点)
          Node pred = tail;
          if (pred != null) {
              node.prev = pred;
              if (compareAndSetTail(pred, node)) {
                  pred.next = node;
                  return node;
              }
          }
          
          // 此时pred为null,即不会走上面的if逻辑,于是执行enq方法,记住:此时传入enq方法时的形参为新new出来的Node
          // 内部维护的是当前线程(线程t1)
          enq(node);
          return node;
      }
      

      上面代码块的注释也说了,最终会执行到enq方法,enq方干啥的呢?猜一下?是的,它就是初始aqs队列的。我们来看一下它的源码:

      /**
       形参node内部维护的线程为t2, 并且它的next属性指向为null
       */
      private Node enq(final Node node) {
          // 此处写了一个死循环,也就是常说的自旋锁
          for (;;) {
              // 自旋的过程中
              // 第一次自旋:
              //  拿到队尾元素, 此时队列都没有,肯定为null
              //  发现队列中的tail指向的是null,于是初始化tail节点,并让aqs中的head指向了tail,
              //  至此,aqs简易版本的队列就出来啦,
              //  head和tail指向同一个node,并且此node内部
              //  维护的thread、prev、next、waitStatus全是默认值
              // 由于是if else逻辑,所以初始化tail属性后,就会进行第二次自旋
              // 第二次自旋:
              //  再次拿到tail, 由于第一次自旋把tail给初始化了,所以此时拿到的tail不为null, 于是走了else逻辑
              //  在else中,主要操作的是形参node, 还记得形参node是什么吗? ==> 维护当前线程(线程t1)的node节点,
              //  此时会将node的上一个节点指向t节点
              //  同时进行cas操作,将node节点变成tail
              //  当cas成功后,再设置t的next指向node
              //  最终返回这个t.
              //  此时此刻这个t是什么样的数据结构呢?
              //  此时的这个t就是队列中的head节点了,
              //  并且它的next为node(维护线程t1)
              //  所以此时此刻队列中现在有两个元素了
              Node t = tail;
              if (t == null) { // Must initialize
                  if (compareAndSetHead(new Node()))
                      tail = head;
              } else {
                  node.prev = t;
                  if (compareAndSetTail(t, node)) {
                      t.next = node;
                      return t;
                  }
              }
          }
      }
      

      代码中的注释描述了enq的过程,我专门画了一个图来描述aqs队列产生的过程,帮助理解:


      在这里插入图片描述

      enq初始化aqs队列的过程后,就执行到了addWaiter方法的出口了

      private Node addWaiter(Node mode) {
          // ....上述代码省略
          // enq初始化队列后,会将node进行返回
          // 这个node就是维护线程t1的node,它已经是
          // 队列中的队列了
          enq(node);
          return node;
      }
      

      addWaiter方法执行完了之后,将继续执行acquire方法

      public final void acquire(int arg) {
          if (!tryAcquire(arg) &&
              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt();
      }
      

      此时应该接着执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)了,由于addWaiter方法已经执行完成,返回的是拥有当前线程的node,同时它也是当前队列中的队尾。我们来查看下acquireQueued的源码:

      /**
       node形参为维护当前线程(t1)的节点,
       同时arg为1
       */
      final boolean acquireQueued(final Node node, int arg) {
          boolean failed = true;
          try {
              boolean interrupted = false;
              // 此处又自旋了
              for (;;) {
                  // 获取到当前节点的上一个节点,在
                  // 当前案例下,它是head节点
                  final Node p = node.predecessor();
                  // 第一次自旋:
                  //   做判断,发现上一个节点是head节点
                  //   于是继续执行加锁方法tryAcquire
                  //   因为在当前案例下,线程a睡眠了60s
                  //   肯定还是加锁失败的,加锁失败后,
                  //   则走下面的逻辑,这里就是为了当前
                  //   节点继续上锁、因为有可能前面的
                  //   节点已经释放锁了,或者说被park
                  //   的线程被unpark了,要继续自旋,
                  //   尝试获取锁
                  if (p == head && tryAcquire(arg)) {
                      setHead(node);
                      p.next = null; // help GC
                      failed = false;
                      return interrupted;
                  }
                  
                  // 判断当前这个节点是否需要park
                  // 什么是park?就是使用unsafe类来阻塞指定的线程,
                  // 在shouldParkAfterFailedAcquire方法中
                  // 传入的是当前节点和上一个节点,
                  // 大致逻辑为:
                  //   1. 判断当前节点的上一个节点(即p)的waitStatus是不是SIGNAL(-1)状态,如果是则返回true
                  //     SIGNAL代表什么呢?上面的表格中有说到
                  //     SIGNAL代表这个Node是处于排队状态
                  //     因此可以得出一个结论:如果上一个节点也处于排队状态
                  //     那么我就返回true,进而执行parkAndCheckInterrupt方法,parkAndCheckInterrupt方法就是让park当前线程,让当前线程进入阻塞状态,自旋再此暂停
                  //   2. 如果p节点的waitStatus为负数,即不是中断或者取消状态
                  //      那么它会将p的waitStatus置为-1.并返回false
                  //      进而进入第二次自旋,当进入第二次自旋时,若上面还未获取锁成功,那么当前线程就会被park
                  if (shouldParkAfterFailedAcquire(p, node) &&
                      parkAndCheckInterrupt())
                      interrupted = true;
              }
          } finally {
              if (failed)
                  cancelAcquire(node);
          }
      }
      

      所以,当线程t2在执行到此方法时,发现head即线程a对应的node的waitStatus为0,于是会自旋一次将head的waitStatus置为-1,然后再继续自旋,此时自己尝试加锁又失败了,此时就会进入park状态。所以就在acquireQueued方法处阻塞了,等待线程a释放锁后唤醒线程t1。至此案例2的加锁过程也结束了

    四、总结

    • 本次只是基于两个简单的案例来认识ReentrantLock加锁流程的源码,其中还有很多其他的case没有涉及到。这两种案例算是认识ReentrantLock加锁源码的入门吧。下篇博客将介绍下基于这两种案例的解锁过程。
    • ReentrantLock加锁流程涉及到每个方法的详细步骤可查看在github中的总结:传送门
    • I am a slow walker, but I never walk backwards.

    相关文章

      网友评论

          本文标题:并发系列四:基于两种案例来认识ReentrantLock源码加锁

          本文链接:https://www.haomeiwen.com/subject/kxqzlltx.html