JUC中ReentrantLock和AbstractQueued

作者: 匠丶 | 来源:发表于2018-10-16 22:57 被阅读7次

    本文涵盖的知识点包括:

    Lock和synchronized的对比
    AbstractQueuedSynchronizer的实现原理
    ReentrantLock的实现原理
    LockSupport的使用

    Lock和synchronized的对比

    在Lock接口出现之前,Java应用程序只能依靠synchronized关键字来实现同步锁的功能,在JDK1.5以后,增加了JUC(java.util.concurrent)的并发包且提供了Lock接口用来实现锁的功能。Lock和synchronized的对比:
    Ø 从层次上,一个是类、一个是关键字
    Ø 从使用上,lock具备更大的灵活性,可以显式地控制锁的释放和获取; 而synchronized的锁的释放是被动的,当出现异常或者同步代码块执行完以后,才会释放锁
    Ø lock可以判断锁的状态,而synchronized无法做到;lock可以实现公平锁、非公平锁, 而synchronized只有非公平锁。

    AbstractQueuedSynchronizer

    Lock之所以能实现线程安全的锁,主要的核心是AQS(AbstractQueuedSynchronizer),AbstractQueuedSynchronizer提供了一个FIFO队列,可以看做是一个用来实现锁以及其他需要同步功能的框架。这里简称该类为AQS。AQS的使用依靠继承来完成,子类通过继承自AQS并实现所需的方法来管理同步状态。例如常见的ReentrantLock,CountDownLatch。

    从使用上来说,AQS的功能可以分为两种:EXCLUSIVE独占模式和SHARED共享模式。
    独占模式下,每次只能有一个线程持有锁,比如ReentrantLock就是以独占方式实现的互斥锁。
    共享模式下,允许多个线程同时获取锁,并发访问共享资源,比如ReentrantReadWriteLock中的读锁。

    AQS依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
    Node的主要属性如下

    static final class Node {
      int waitStatus; //表示节点的状态,包含cancelled(取消);condition 表示节点在等待condition
    也就是在condition队列中
      Node prev; //前继节点
      Node next; //后继节点
      Node nextWaiter; //存储在condition队列中的后继节点
      Thread thread; //当前线程
    }
    

    AQS类底层的数据结构是使用双向链表,是队列的一种实现。包括一个head节点和一个tail节点,分别表示头结点和尾节点,其中头结点不存储Thread,仅保存next结点的引用。



    AQS提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Nodeupdate),它需要传递
    当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。



    AQS遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。

    AQS中,除了本身的链表结构以外,还有一个很关键的功能,就是CAS,这个是保证在多线程并发的情况下保证线程安全的前提下去把线程加入到AQS中的方法,可以简单理解为乐观锁。

    private final boolean compareAndSetHead(Node update) {
      return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }
    

    这个方法里面,首先用到了unsafe类(Unsafe类是在sun.misc包下,不属于Java标准。但是很多Java的基础类库,包括一些被广泛使用的高性能开发库都是基于Unsafe类开发的,比如Netty、Hadoop、Kafka等;Unsafe可认为是Java中留下的后门,提供了一些低层次操作,如直接内存访问、线程调度等)。

    ReentrantLock的实现原理分析

    之所以叫重入锁是因为同一个线程如果已经获得了锁,那么后续该线程调用lock方法时不需要再次获取锁,也就是不会阻塞;重入锁提供了两种实现,一种是非公平的重入锁,另一种是公平的重入锁。怎么理解公平和非公平呢?如果在绝对时间上,先对锁进行获取的请求一定先被满足获得锁,那么这个锁就是公平锁,反之,就是不公平的。简单来说公平锁就是等待时间最长的线程最优先获取锁。
    非公平锁的实现流程时序图


    1、调用ReentrantLock.lock,这个是获取锁的入口,调用了sync.lock。 sync是一个实现了AQS的抽象类,这个类的主要作用是用来实现同步控制的,并且sync有两个实现,一个是NonfairSync(非公平锁)、另一个是FailSync(公平锁)。
    2、先分析非公平锁的情况NonfairSync.lock
    final void lock() {
      if (compareAndSetState(0, 1)) //这是跟公平锁的主要区别,一上来就试探锁是否空闲,如果可以插队,
    则设置获得锁的线程为当前线程
    //exclusiveOwnerThread属性是AQS从父类AbstractOwnableSynchronizer中继承的属性,用来保存当前占用
    同步状态的线程
        setExclusiveOwnerThread(Thread.currentThread());
      else
        acquire(1); //尝试去获取锁
    }
    

    compareAndSetState通过cas算法去改变state的值,而这个states是AQS中存在一个变量,对于ReentrantLock来说,如果state=0表示无锁状态、如果state>0表示有锁状态。
    由于ReentrantLock是可重入锁,所以持有锁的线程可以多次加锁,经过判断加锁线程就是当前持有锁的线程时(即exclusiveOwnerThread==Thread.currentThread()),每次加锁都会将state的值+1,state等于几,就代表当前持有锁的线程加了几次锁;解锁时每解一次锁就会将state减1,state减到0后,锁就被释放掉,这时其它线程可以加锁。
    3、AbstractQueuedSynchronizer.acquire
    如果CAS操作未能成功,说明state已经不为0,此时继续acquire(1)操作,acquire是AQS中的方法 当多个线程同时进入这个方法时,首先通过cas去修改state的状态,如果修改成功表示竞争锁成功,竞争失败的,tryAcquire会返回false。

    public final void acquire(int arg) {
      if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
    }
    

    4、NonfairSync.tryAcquire
    nofairTryAcquire这里可以看非公平锁的含义,即获取锁并不会严格根据争用锁的先后顺序决定。

    final boolean nonfairTryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState(); //获取当前的状态,前面讲过,默认情况下是0表示无锁状态
      if (c == 0) {
        if (compareAndSetState(0, acquires)) { //通过cas来改变state状态的值,如果更新成功,表
    示获取锁成功, 这个操作外部方法lock()就做过一次,这里再做只是为了再尝试一次,尽量以最简单的方式获取锁。
          setExclusiveOwnerThread(current);
          return true;
       }
     }
      else if (current == getExclusiveOwnerThread()) {//如果当前线程等于获取锁的线程,表示重入,
    直接累加重入次数
        int nextc = c + acquires;
        if (nextc < 0) // overflow 如果这个状态值越界,抛出异常;如果没有越界,则设置后返回true
          throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
     }
    如果状态不为0,且当前线程不是owner,则返回false。
      return false; //获取锁失败,返回false
    }
    

    5、addWaiter
    当前锁如果已经被其他线程锁持有,那么当前线程来去请求锁的时候,会进入这个方法,这个方法主要是把当前线程封装成node,添加到AQS的链表中。
    6、acquireQueued
    addWaiter返回了插入的节点,作为acquireQueued方法的入参,这个方法主要用于争抢锁。

    final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
        boolean interrupted = false;
        for (;;) {
          final Node p = node.predecessor();// 获取prev节点,若为null即刻抛出
    NullPointException
          if (p == head && tryAcquire(arg)) {// 如果前驱为head才有资格进行锁的抢夺
            setHead(node); // 获取锁成功后就不需要再进行同步操作了,获取锁成功的线程作为新的
    head节点
    //凡是head节点,head.thread与head.prev永远为null, 但是head.next不为null
            p.next = null; // help GC
            failed = false; //获取锁成功
            return interrupted;
         }
    //如果获取锁失败,则根据节点的waitStatus决定是否需要挂起线程
          if (shouldParkAfterFailedAcquire(p, node) &&
            parkAndCheckInterrupt())// 若前面为true,则执行挂起,待下次唤醒的时候检测中断的标
    志
            interrupted = true;
       }
     } finally {
        if (failed) // 如果抛出异常则取消锁的获取,进行出队(sync queue)操作
          cancelAcquire(node);
     }
    }
    

    原来的head节点释放锁以后,会从队列中移除,原来head节点的next节点会成为head节点.


    公平锁和非公平锁的区别

    锁的公平性是相对于获取锁的顺序而言的,如果是一个公平锁,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。 在上面分析的例子来说,只要CAS设置同步状态成功,则表示当前线程获取了锁,而公平锁则不一样,差异点有两个:
    FairSync.tryAcquire:非公平锁在获取锁的时候,会先通过CAS进行抢占,而公平锁则不会

    final void** lock() {
      acquire(1);
    }
    

    FairSync.tryAcquire:这个方法与nonfairTryAcquire(int acquires)比较,不同的地方在于判断条件多了hasQueuedPredecessors()方法,也就是加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

    protected final boolean* tryAcquire(int acquires) {
      final Thread current = Thread.currentThread*();
      int c = getState();
      if (c == 0) {
        if (!hasQueuedPredecessors() &&
          compareAndSetState(0, acquires)) {
          setExclusiveOwnerThread(current);
          return true;
        }
      }
      else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
          throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
      }
      return false;
    }
    

    LockSupport

    LockSupport类是JDK1.6引入的一个类,提供了基本的线程同步原语。LockSupport实际上是调用了Unsafe类里的函数,归结到Unsafe里,只有两个函数:

    public native void unpark(Thread jthread); 
    public native void park(boolean isAbsolute, long time); 
    

    LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程,而且park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。因为park() 和 unpark()有许可的存在。

    1、 pack时:如果许可存在,那么将这个许可使用掉,并且立即返回。如果许可不存在,那么挂起当前线程。
    2、 unpack时:设置线程许可为可用。如果线程当前已经被pack挂起,那么这个线程将会被唤醒。如果线程当前没有被挂起,那么下次调用pack不会挂起线程。
    注意,unpark函数可以先于park调用。比如线程B调用unpark函数,给线程A发了一个“许可”,那么当线程A调用park时,它发现已经有“许可”了,那么它会马上再继续运行。

    在使用LockSupport之前,我们对线程做同步,只能使用wait和notify,但是wait和notify其实不是很灵活,并且耦合性很高,调用notify必须要确保某个线程处于wait状态,而park/unpark模型真正解耦了线程之间的同步,先后顺序没有没有直接关联,同时线程之间不再需要一个Object或者其它变量来存储状态,不再需要关心对方的状态。

    相关文章

      网友评论

        本文标题:JUC中ReentrantLock和AbstractQueued

        本文链接:https://www.haomeiwen.com/subject/kyckzftx.html