美文网首页
ReentrantLock理解AQS同步队列的细节和设计模式

ReentrantLock理解AQS同步队列的细节和设计模式

作者: 竖起大拇指 | 来源:发表于2020-05-19 14:51 被阅读0次

Lock接口

public interface Lock {
  //加锁
  void lock();
  //可中断锁与Lock不同之处在于可响应中断操作,即在获取锁的过程中可以中断
  void lockInterruptibly() throws InterruptedException;
  //立即返回的获取锁,返回true则表示成功,false表示失败
  boolean tryLock();
//根据传入时间立即返回的获取锁,返回true表示成功,false表示失败
  boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  //释放锁
  void unlock();

  Condition newCondition();
}

Lock的实现

实现Lock接口的类有很多,以下为几个常见的锁实现

  • ReentrantLock:表示重入锁,它是唯一一个实现了Lock接口的类。重入锁指的是线程在获得锁之后,再次获取该锁不需要阻塞,而是直接关联一次计数器增加重入次数
  • ReentrantReadWriteLock:重入读写锁,它实现了ReadWriteLock接口,在这个类中维护了两个锁,一个是ReadLock,一个是WriteLock,他们都分别实现了Lock接口。读写锁是一种适合读多写少的场景下解决线程安全问题的工具,基本原则是:读和读不互斥、读和写互斥、写和写互斥。 也就是说涉及到影响数据变化的操作都会存在互斥。
  • StampedLock: stampedLock是JDK8引入的新的锁机制,可以简单认为是读写锁的一个改进版本,读写锁虽然通过分离读和写的功能使得读和读之间可以完全并发,但是读和写是有冲突的,如果大量的读线程存在,可能会引起写线程的饥饿。stampedLock是一种乐观的读策略,使得读线程完全不会阻塞写线程。和ReadWriteLock相比,写入的加锁是完全一样的,不同的是读取。注意到首先我们通过tryOptimisticRead()获取一个乐观读锁,并返回版本号。接着进行读取,读取完成后,我们通过validate()去验证版本号,如果在读取过程中没有写入,版本号不变,验证成功,我们就可以放心地继续后续操作。如果在读取过程中有写入,版本号会发生变化,验证将失败。在失败的时候,我们再通过获取悲观读锁再次读取。由于写入的概率不高,程序在绝大部分情况下可以通过乐观读锁获取数据,极少数情况下使用悲观读锁获取数据
    另外是不可重入锁,不能在一个线程中反复获取同一个锁。
我们一般使用ReentrantLock:
Lock lock=new ReentrantLock()
lock.lock();
try{
//代码....
}finally{
lock.unlock();
}

ReentrantLock默认是非公平锁,我们先来了解下

公平锁和非公平锁特点

公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入FIFO队列,队列中的第一个线程才能获得锁。公平锁的优点是等待锁的线程不会夯死。缺点是吞吐效率相对非公平锁要低,等待队列中除了第一个线程以外的所有线程都会阻塞,CPU唤醒阻塞线程的开销比非公平锁大。
非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但是如果此时锁刚好可用,那么该线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取到锁的场景。 非公平锁的优点就是可以减少唤起线程的开销(因为可能有的线程可以直接获取到锁,CPU也就不用唤醒它),所以整体的吞吐效率高。缺点是处于等待队列中的线程可能会夯死(如果每次有线程来,它恰巧每次都获取到锁,此时还在排队等待锁的线程就悲剧了),或者等很久才会获得锁。

ReentrantLock与AQS
public class ReentrantLock implements Lock, java.io.Serializable {
 private final Sync sync;

 /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
abstract static class Sync extends AbstractQueuedSynchronizer {
........
}

 /**
     * Sync object for non-fair locks
     */
static final class NonfairSync extends Sync {
      .....
}

 /**
     * Sync object for fair locks
     */
    static final class FairSync extends Sync {
      ....
    }


}
  • ReentrantLock实现了Lock接口,内部类有Sync,NonfairSync,FairSync(他们三个继承了AQS),创建ReentrantLock时可以指定是非公平锁还是公平锁
  • Sync:是抽象类,实现了tryRelease方法,tryAccquire方法由它的子类NonfairSync,FairSync自己实现。
  • AQS:是一个抽象类,但是代码中却没有一个抽象方法,其中获取锁(tryAcquire方法)和释放锁(tryRelease方法)并没有提供默认实现,需要子类重写这个两个方法实现具体逻辑
  • Node:AQS的内部类,本质上是一个双向链表,用来管理获取锁的线程。
Node和AQS工作原理

AQS是提供基础设施,如构建同步队列,控制同步状态等,那么它是如何构建同步队列的呢?它的工作原理是怎么样的呢?

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
  //指向同步队列对头
 private transient volatile Node head;
//指向同步队列的队尾
private transient volatile Node tail;

//同步状态,0代表锁未被占用,1代表锁已经占用
 /**
     * The synchronization state.
     */
    private volatile int state;
  .......
}

再看看Node这个内部类:它是对每一个访问同步代码块的线程的封装,关于等待状态,我们暂时只需要关注SINAL和初始化状态即可

static final class Node {
      static final Node SHARED = new Node();
      static final Node EXCLUSIVE = null;
      static final int CANCELLED =  1;
      static final int SIGNAL    = -1;
      static final int CONDITION = -2;
      static final int PROPAGATE = -3;
      volatile int waitStatus;
      volatile Node prev; //前驱节点
      volatile Node next; //后继节点
      volatile Thread thread;//当前线程
      Node nextWaiter; //存储在condition队列中的后继节点
      //是否为共享锁
      final boolean isShared() { 
          return nextWaiter == SHARED;
      }

      final Node predecessor() throws NullPointerException {
          Node p = prev;
          if (p == null)
              throw new NullPointerException();
          else
              return p;
      }

      Node() {    // Used to establish initial head or SHARED marker
      }
      //将线程构造成一个Node,添加到等待队列
    Node(Node nextWaiter) { // Used by addWaiter
          this.nextWaiter = nextWaiter;
          U.putObject(this, THREAD, Thread.currentThread());
      }
      //这个方法会在Condition队列使用,
      Node(int waitStatus) { // Used by Condition
         U.putInt(this, WAITSTATUS, waitStatus);
          U.putObject(this, THREAD, Thread.currentThread());
      }
  }
  • AQS本质上就是由Node构成的双向链表,内部有node head和node tail。
  • AQS内部通过state来控制同步状态,当state=0时,则说明没有任何线程占有共享资源,当state=1时,则说明有线程目前正在使用共享变量,其他线程必须加入同步队列进行等待;
  • AQS内部通过内部类Node构成FIFO的同步队列来完成线程获取锁的排队工作,同时利用内部类ConditionObject构建等待队列,当Condtion调用wait()方法后,线程将会加入等待队列中,而当Condtion调用signal()方法后,线程将从等待队列转移同步队列中进行锁竞争。注意这里涉及到两种队列,一种同步队列,当线程请求锁而等待,后将加入同步队列,另一种则是等待队列,通过Condition调用await()方法释放锁后,加入等待队列。
  • 当我们调用ReentrantLock.lock()方法时,实际操作的是基于Node结构的同步队列,此时AQS中的state变量则是代表同步状态,加锁后,如果此时state的值为0,则说明当前线程可以获取到锁,同时将state设置为1,则表示获取成功。如果调用ReentrantLock.lock()方法时sate已为1,也就是当前锁已被其他线程持有,那么当前执行线程将被封装为Node节点加入同步队列等待.


    image.png

获取锁

 public ReentrantLock() {
        sync = new NonfairSync();
    }

public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

//加锁操作
 public void lock() {
        sync.lock();
    }

这里说明ReentrantLock默认就是构造一个非公平锁,调用lock方法时候:

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        // Android-removed: @ReservedStackAccess from OpenJDK 9, not available on Android.
        // @ReservedStackAccess
        final void lock() {
      执行CAS操作,本质就是CAS更新state,判断state是否为0,
    如果为0则把0更新为1,并返回true否则返回false
            if (compareAndSetState(0, 1))

        //成功则将独占锁线程设置为当前线程  
       
          setExclusiveOwnerThread(Thread.currentThread());
            else
            //否则再次请求同步状态
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

也就是说,通过CAS机制保证并发的情况下只有一个线程可以成功将state设置为1,获取到锁;此时,其他线程在执行compareAndSetState时,因为state此时不是0,所以会失败并返回false,执行acquire(1);

加入同步队列:
   public final void acquire(int arg) {
    //再次尝试获取同步状态 
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
 /**
         * Performs non-fair tryLock.  tryAcquire is implemented in
         * subclasses, but both need nonfair try for trylock method.
         */
        // Android-removed: @ReservedStackAccess from OpenJDK 9, not available on Android.
        // @ReservedStackAccess
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState(); //获取volatile类型的state
            if (c == 0) {
            //这里就是非公平锁的诀窍:state为0表示当前没有线程获取锁,
          //此时不管条件队列的顺序,只要有线程获取锁失败则会执行CAS操作再次尝试获取一次.
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
          //这是表示获取锁的线程是Owner,也就是重入锁线程加锁的情况,
       //这时status会再次+1,因为是同一个线程所以不会有线程安全问题
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

假设有三个线程:线程1已经获得了锁,线程2正在同步队列中排队,此时线程3执行lock方法尝试获取锁的时候,线程1正好释放了锁,将state更新为0,那么线程3就可能在线程2还没有被唤醒之前去获取到这个锁。

如果此时还没获取到锁,那么接下来就会把该线程封装成node去同步队列里排队,acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

 /**
     * Creates and enqueues node for current thread and given mode.
     *
     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
     * @return the new node
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(mode);
      //死循环
        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
                U.putObject(node, Node.PREV, oldTail);
            //队尾添加新结点
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
                initializeSyncQueue();
            }
        }
    }

  private final void initializeSyncQueue() {
        Node h;
        if (U.compareAndSwapObject(this, HEAD, null, (h = new Node())))
            tail = h;
    }
image.png

这个方法使用一个死循环进行CAS操作,可以解决多线程并发问题。假设线程1,2,3,4同时执行addWaiter()方法入队,而此时头节点不为null,那么他们会同时执行addWaiter中的compareAndSetTail方法将队尾指向它,添加到队尾。但是这个时候CAS操作保证只有一个可以成功,假设此时线程1成功添加到队尾,那么线程2,3,4执行CAS都会失败,方法内部死循环保证所有线程直到成功添加到队尾为止.

这里会涉及到两个变化

  • 新的线程封装成Node节点追加到同步队列中,设置prev节点以及修改当前节点的前置节点的next节点指向自己
  • 通过CAS讲tail重新指向新的尾部节点
自旋

添加到同步队列后,结点就会进入一个自旋过程,即每个结点都在观察时机待条件满足获取同步状态,自旋过程是在acquireQueued方法中执行的.

  final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            //死循环  自旋
            for (;;) {
            //获取前置节点
                final Node p = node.predecessor();
            当且仅当p为头结点才尝试获取同步状态,此时符合FIFO的原则
                if (p == head && tryAcquire(arg)) {
                //将Node设置为头结点
                    setHead(node);
                //清空原头结点的引用便于GC
                    p.next = null; // help GC
                    return interrupted;
                }
              //如果前驱结点不是head,判断是否挂起线程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (Throwable t) {
          //最终都没能获取到同步状态  结束该线程的请求
            cancelAcquire(node);
            throw t;
        }
    }
    //设置头结点
   private void setHead(Node node) {
        head = node;
        清空结点数据 以便于GC
        node.thread = null;
        node.prev = null;
    }

image.png

这个过程也是涉及到两个变化

  • 修改head节点指向下一个获得锁的节点
  • 新的获得锁的节点,将prev的指针指向null
    这里有一个小的变化,就是设置head节点不需要用CAS,原因是设置head节点是由获得锁的线程来完成的,而同步锁只能由一个线程获得,所以不需要CAS保证,只需要把head节点设置为原首节点的后继节点,并且断开原head节点的next引用即可

当然如果前驱结点不是head而它又没有获取到锁,那么执行如下:

 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //获取当前结点的等待状态
        int ws = pred.waitStatus;
    //如果是等待唤醒 状态 则返回true
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
    //如果ws>0 说明是结束状态
//遍历前驱结点直到找到没有结束状态的节点
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
//如果前节点被取消,则将取消节点移除队列操作
            //找到当前节点的前节点,如果前节点为取消节点则一直往前寻找一个节点。pred.waitStatus > 0标示已取消
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
      //如果ws小于0又不是SIGNAL状态
//则将其设置为SIGNAL状态 代表该节点的线程正在等待唤醒
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }


  private final boolean parkAndCheckInterrupt() {
      //将当前线程挂起 线程会阻塞住
        LockSupport.park(this);
    //获取线程中断状态,interrupted是判断当前中断状态 
    //并非中断线程 因此可能返回true也可能返回false
        return Thread.interrupted();
    }

这段代码有个设计比较好的点:
通常我们在设计队列时,我们需要考虑如何最大化的减少后续排队节点对于CPU的消耗,而在AQS中,只要当前节点的前驱结点不是头节点,再把当前节点加入到队列后就会执行LockSupport.park(this);将当前线程挂起,这样可以最大程度减少CPU消耗.

AQS通过最简单的CAS和LockSupport的park,设计出了高效的队列模型和机制:
1.AQS结构其实是在第二个线程获取锁的时候在初始化,就是lazy-init的思想,最大限度减少不必要的代码执行的开销
2.为了最大程度上提升效率,尽量避免线程间的通讯,采用了双向链表的Node结构去存储线程
3.为了最大程度上避免CPU上下文切换执行的消耗,在设计排队线程时,只有头结点的下一个的线程一直在重复执行后去锁,队列后面的线程会通过LockSupport进行休眠。

非公平锁的释放
//ReentrantLock类的unlock方法
public void unlock() {
        sync.release(1);
    }

//AQS的release方法
  public final boolean release(int arg) {
//尝试释放锁
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
          //唤醒后继结点的线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

//ReentrantLock类中的内部类Sync实现的tryRelease(int releases)方法
 protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
//判断状态是否为0,如果是则说明已释放同步状态
            if (c == 0) {
                free = true;
            //设置Owner为null
                setExclusiveOwnerThread(null);
            }
      //设置更新同步状态
            setState(c);
            return free;
        }

unParkSucessor(h)的作用唤醒后续的节点:

 private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        if (s != null)
            LockSupport.unpark(s.thread); //唤醒
    }

ReentrantLock中的公平锁

与非公平锁不同的是,在获取锁的时候,公平锁的获取顺序是完全遵循FIFO规则,也就是说先请求的线程一定会先获取锁,后来的线程肯定需要排队.
下面比较一下公平锁和非公平锁lock方法:

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        // Android-removed: @ReservedStackAccess from OpenJDK 9, not available on Android.
        // @ReservedStackAccess
    //非公平锁:一上来二话不说直接CAS设置state抢锁
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

公平锁的lock方法:

 static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        // Android-removed: @ReservedStackAccess from OpenJDK 9, not available on Android.
        // @ReservedStackAccess
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
          //先判断同步队列是否存在节点
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

 public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

唯一的差别就是hasQueuedPredecessors()判断同步队列是否存在结点,这就是非公平锁与公平锁最大的区别,即公平锁在线程请求到来时先会判断同步队列是否存在结点,如果存在先执行同步队列中的结点线程,当前线程将封装成node加入同步队列等待。而非公平锁呢,当线程请求到来时,不管同步队列是否存在线程结点,直接尝试获取同步状态,获取成功直接访问共享资源,但请注意在绝大多数情况下,非公平锁才是我们理想的选择,毕竟从效率上来说非公平锁总是胜于公平锁。

总结:

以上便是ReentrantLock的内部实现原理,这里我们简单进行小结,重入锁ReentrantLock,是一个基于AQS并发框架的并发控制类,其内部实现了3个类,分别是Sync、NoFairSync以及FairSync类,其中Sync继承自AQS,实现了释放锁的模板方法tryRelease(int),而NoFairSync和FairSync都继承自Sync,实现各种获取锁的方法tryAcquire(int)

AQS在设计时将性能优化到了极致,具体体现在同步队列的park和unpark,初始化AQS时的懒加载,以及线程之间通过Node这样的数据结构从而避免线程间通讯造成的额外开销,这种由释放锁的线程主动唤醒后续线程的方式也是我们再实际过程中可以借鉴的.

相关文章

网友评论

      本文标题:ReentrantLock理解AQS同步队列的细节和设计模式

      本文链接:https://www.haomeiwen.com/subject/hmwvohtx.html