美文网首页Java架构技术进阶
并发编程-深入浅出AQS

并发编程-深入浅出AQS

作者: can_4999 | 来源:发表于2019-11-20 22:07 被阅读0次

    AQS是并发编程中非常重要的概念,它是juc包下的许多并发工具类,如CountdownLatch,CyclicBarrier,Semaphore 和锁, 如ReentrantLock, ReaderWriterLock的实现基础,提供了一个基于int状态码和队列来实现的并发框架。本文将对AQS框架的几个重要组成进行简要介绍,读完本文你将get到以下几个点:

    1. AQS进行并发控制的机制是什么
    2. AQS独占和共享模式是如何实现的
    3. 同步队列和条件等待队列的区别,和数据出入队原则

    一,AQS基本概念

    AQS(AbstractQueuedSynchronizer)是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量来表示状态,通过内置的FIFO(first in,first out)队列来完成资源获取线程的排队工作。

    队列可分为两种,一种是同步队列,是程序执行入口出处的等待队列;而另一种则是条件等待队列,队列中的元素是在程序执行时在某个条件上发生等待。

    并发编程-深入浅出AQS

    1.1 独占or共享模式

    AQS支持两种获取同步状态的模式既独占式和共享式。顾名思义,独占式模式同一时刻只允许一个线程获取同步状态,而共享模式则允许多个线程同时获取。

    并发编程-深入浅出AQS

    1.2 同步队列

    当一个线程尝试获取同步状态失败时,同步器会将这个线程以及等待状态等信息构造成一个节点加入到等待队列中,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试重复获取同步队列。

    并发编程-深入浅出AQS

    1.3 条件队列

    AQS内部类ConditionObject来实现的条件队列,当一个线程获取到同步状态,但是却通过Condition调用了await相关的方法时,会将该线程封装成Node节点并加入到条件队列中,它的结构和同步队列相同。

    二,独占or共享模式

    AQS框架中,通过维护一个int类型的状态,来进行并发控制,线程通常通过修改此状态信息来表明当前线程持有此同步状态。AQS则是通过保存修改状态线程的引用来实现独占和共享模式的。

    <pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">/**

    • 获取同步状态
      /
      public final void acquire(int arg) {
      //尝试获取同步状态, 如果尝试获取到同步状态失败,则加入到同步队列中
      if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
      }
      /
      *
    • 尝试获取同步状态【子类中实现】,因为aqs基于模板模式,仅提供基于状态和同步队列的实
    • 现思路,具体的实现由子类决定
      */
      protected final boolean tryAcquire(int acquires) {
      final Thread current = Thread.currentThread();
      int c = getState();
      if (c == 0) {
      // 如果当前状态值为0,并且等待队列中没有元素,执行修改状态值操作
      if (!hasQueuedPredecessors() &&
      compareAndSetState(0, acquires)) {
      // 修改状态值成功,记录当前持有同步状态的线程信息
      setExclusiveOwnerThread(current);
      return true;
      }
      // 如果当前线程已经持有同步状态,继续修改同步状态【重入锁实现原理】
      } else if (current == getExclusiveOwnerThread()) {
      int nextc = c + acquires;
      if (nextc < 0)
      throw new Error("Maximum lock count exceeded");
      setState(nextc);
      return true;
      }
      return false;
      }

    /**

    • 根据传入的模式以及当前线程信息创建一个队列的节点并加入到同步队列尾部
      /
      private Node addWaiter(Node mode) {
      Node node = new Node(Thread.currentThread(), mode);
      // Try the fast path of enq; backup to full enq on failure
      Node pred = tail;
      if (pred != null) {
      node.prev = pred;
      if (compareAndSetTail(pred, node)) {
      pred.next = node;
      return node;
      }
      }
      enq(node);
      return node;
      }
      /
      *
    • 同步队列中节点,尝试获取同步状态
      */
      final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
      boolean interrupted = false;
      // 自旋(死循环)
      for (;;) {
      // 只有当前节点的前驱节点是头节点时才会尝试执行获取同步状态操作
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
      setHead(node);
      p.next = null; // help GC
      failed = false;
      return interrupted;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      interrupted = true;
      }
      } finally {
      if (failed)
      cancelAcquire(node);
      }
      }</pre>

    独占式是如何控制得?

    当修改状态信息成功后,如果执行的是独占式操作,AQS的具体实现类中会保存当前线程的信息来声明同步状态已被当前线程占用,此时其他线程再尝试获取同步状态会返回false。

    三,同步队列

    3.1 队列中保存那些信息?

    同步队列节点中主要保存着线程的信息以及模式(共享or独占)。

    3.2 何时执行入队操作?

    <pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">/**

    • 获取同步状态
      */
      public final void acquire(int arg) {
      //尝试获取同步状态, 如果尝试获取到同步状态失败,则加入到同步队列中
      if (!tryAcquire(arg) &&
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      selfInterrupt();
      }</pre>

    复用上文中的代码,不难看出再获取同步状态失败后,会执行入队操作。

    并发编程-深入浅出AQS

    3.3 何时执行出队操作?

    当线程获取同步状态失败时,会被封装成Node节点加入到等待队列中,此时所有节点都回进入自旋过程,首先判断自己prev是否时头节点,如果是则尝试获取同步状态。
    被阻塞线程的唤醒主要以靠前驱节点的出队或阻塞线程被中断来实现。

    <pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">/**

    • 同步队列中节点,尝试获取同步状态
    • 1. 当一个线程获取到同步状态时,会将当前线程构造程Node并设置为头节点
    • 2. 并将原始的head节点设置为null,以便于垃圾回收
      */
      final boolean acquireQueued(final Node node, int arg) {
      boolean failed = true;
      try {
      boolean interrupted = false;
      for (;;) {
      final Node p = node.predecessor();
      if (p == head && tryAcquire(arg)) {
      setHead(node);
      p.next = null; // help GC
      failed = false;
      return interrupted;
      }
      if (shouldParkAfterFailedAcquire(p, node) &&
      parkAndCheckInterrupt())
      interrupted = true;
      }
      } finally {
      if (failed)
      cancelAcquire(node);
      }
      }</pre>

    四,条件等待队列

    条件变量(ConidtionObject)是AQS中的一个内部类,用来实现同步队列机制。同步队列复用了等待队列中Node节点,所以同步队列到等待队列中不需要进行额外的转换。

    4.1 什么时候执行入队操作?

    当线程获取到同步状态,但是在临界区中调用了await()方法,此时该线程会被加入到对应的条件队列汇总。
    ps: 临界区,加锁和释放锁之间的代码区域

    <pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">/**

    • ConditionObject中的await方法,调用后使得当前执行线程加入条件等待队列
      /
      public final void await() throws InterruptedException {
      if (Thread.interrupted())
      throw new InterruptedException();
      Node node = addConditionWaiter();
      // -----省略代码------
      }
      /
      *
    • 添加等待线程
      */
      private Node addConditionWaiter() {
      Node t = lastWaiter;
      // -----省略代码------
      // 将当前线程构造程条件队列节点,并加入到队列中
      Node node = new Node(Thread.currentThread(), Node.CONDITION);
      if (t == null)
      firstWaiter = node;
      else
      t.nextWaiter = node;
      lastWaiter = node;
      return node;
      }</pre>

    4.2 什么时候执行出队操作?

    当对应的Conditioni调用signial/signalAll()方法时回选择从条件队列中出队列,同步队列是通过自旋的方式获取同步状态,而条件队列中的节点则通过通知的方式出队。条件队列中的节点被唤醒后会加入到入口等待队列中。

    <pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">/**

    • 唤醒当前条件等到队列中的所有等待线程
      /
      public final void signalAll() {
      if (!isHeldExclusively())
      throw new IllegalMonitorStateException();
      Node first = firstWaiter;
      if (first != null)
      doSignalAll(first);
      }
      /
      *
    • 遍历队列,将元素从条件队列 加入到 同步队列
      */
      private void doSignalAll(Node first) {
      lastWaiter = firstWaiter = null;
      do {
      Node next = first.nextWaiter;
      first.nextWaiter = null;
      transferForSignal(first);
      first = next;
      } while (first != null);
      }
      final boolean transferForSignal(Node node) {
      // -----省略代码------
      // 执行入队操作,将node添加到同步队列中
      Node p = enq(node);
      int ws = p.waitStatus;
      if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
      LockSupport.unpark(node.thread);
      return true;
      }</pre>

    五,总结

    1. 使用Node实现的FIFO队列,可以用于构建锁或者其他同步装置的基础框架
    2. 利用一个int类型的属性表示状态
    3. 使用模板方法模式,子类可以通过继承它来管理状态实现各种并发工具
    4. 可以同时实现独占和共享模式

    本文对AQS的基本原理进行的简要的描述,对于子类的公平性和非公平行实现,中断,队列中节点的等待状态,cas等操作没有进行探讨,感兴趣的小伙伴可以进行源码阅读或者查阅相关资料。

    六,Q&A

    Question1: 在java中通常使用synchronized来实现方法同步,AQS中通过CAS保证了修改同步状态的一致性问题,那么对比synchronized,cas有什么优势不同与优势呢?你还知道其他无锁并发的策略吗?

    相关文章

      网友评论

        本文标题:并发编程-深入浅出AQS

        本文链接:https://www.haomeiwen.com/subject/payaictx.html