美文网首页
Java并发编程-条件等待与状态依赖

Java并发编程-条件等待与状态依赖

作者: lj72808up | 来源:发表于2019-10-28 19:13 被阅读0次

一. 依赖状态的类实现

1. 什么叫依赖状态的类

  • 如果类中的一些操作有基于状态的前提. 则这个类是依赖状态的
  • 类库中包含许多依赖状态的同步类, 比如FutureTask, Semaphore, BlockingQueue等.
    例如不能从一个空队列中删除元素, 或者不能获取一个尚未结束的任务的计算结果

2. 多线程下状态依赖 - 等待条件变为真

  • 单线程程序中, 条件分支依赖于某个状态, 如果这个条件不满足, 可以立刻失败, 因为该状态不会发生改变
  • 多线程程序中, 基于状态的条件可能会被其他线程修改, 所以不能立刻进入失败分支, 而是可以等待条件变为真

3. 如何进行高效的条件等待

    为了突出高效的条件等待方法 ,先来介绍一些低效的条件等待方法. 如下方法效率依次增高, 直到最后一种方法效率为最高:
一个统一的场景, 有界队列实现; 有界队列的put和take往往包含一个前提条件,不能从空队列中获取元素, 也不能将元素放入已满的队列之中. 当条件不满足时, 可以阻塞直到对象进入正确的状态, 或抛出异常或返回一个错误状态. 如下是所有有界队列实现的父类, 定义了公用的isFull()isEmpty()

aqs1.png
  • 方法一 : 将条件检测的失败状态传递给调用者(抛异常或返回一个错误值)
    该方法下, 如果条件依赖检查失败, 直接在方法中抛异常通知调用者. 这看起来实现简单, 但使用起来有2个问题

    1. 依赖不满足只是当下不满足, 而不是发生了异常
    2. 调用者每次调用必须做好捕获异常的准备, 且在异常发生时主动重试 aqs11.png
    3. 客户调用者捕获异常后, 可以有两种重试选择 :
      • 自行进入休眠
        catch块中进行Thread.sleep()
      while(true){
          try{
             V item = buffer.get();
             break;  // 未发生异常跳出循环
          }catch (Exception e){
             Thread.sleep(SLEEP_GRANULARITY)  // 自行休眠
          }
      }
      
      • 自旋等待(忙等待)
        catch中调用Thread.yield(), 当调度器给该线程一个时间片时, yield选择让出处理器而不是消耗完整个CPU时间片
      while(true){
          try{
             V item = buffer.get();
             break;  // 未发生异常跳出循环
          }catch (Exception e){
             Thread.yield();  // 自旋等待
          }
      }
      
  • 方法二: 通过轮训和休眠的方式, 勉强的解决状态依赖问题

    1. 该方法也是先进行条件检查, 然后执行方法; 但会在检查失败后自行重试
      首先, 状态条件的检查必须在锁的保护下进行.
      其次, 如果测试失败, 当前执行的线程应该释放锁并休眠一段时间, 让其他线程能够访问内部的buf; 当线程醒来后, 将重新请求锁并再次进行重试. 因而, 线程将反复的在条件检查,休眠,获取锁释放锁的等待过程中切换.

    2. 该方法实际将跑出异常那个方法中, 调用者的重试逻辑放到了方法内部进行; 使方法内部处理重试逻辑, 客户端只负责简单调用. aqs12.png
    3. 睡眠时间间隔选择
      要选择合适的睡眠时间, 需要在响应性和CPU使用率之间权衡. 睡眠时间小, 响应性就高, 但消耗的CPU资源也高

    4. 展望
      这种通过轮训和睡眠实现阻塞的过程需要进行巨大努力. 如果存在某种挂起线程的方法, 能够保证当依赖条件为真时才立即醒来, 则将极大地提高效率, 这正是条件队列实现的功能.

  • 方法三: 使用条件队列

    1. 条件队列:
      条件队列的名字来源于: 使"等待某个条件为真"的线程加入到一个队列中.
      一般队列中每个元素都是数据, 而条件队列中每个元素是一个线程
    2. 一个对象就是一个条件队列
      • 正如每个对象都可以是一个锁一样, 每个对象也可以作为一个条件队列; 对象的wait(), notify()构成了使用内部条件队列的API
      • 对象的内置锁和内置条件队列是相互关联的, 要调用对象X内置条件队列的任意一个方法, 必须先持有对象X的内置锁. 这也是因为 "等待由状态构成的条件" 和 "维护状态的一致性" 两种机制必须紧密地绑定在一起
      • wait():
        1. 会释放锁, 并请求操作系统挂起当前线程. 使其他线程能够获取这个锁并修改对象的状态.
        2. wait()醒来时, 将在返回之前重新获取锁.
        3. wait()通常写在while循环内部. 由于代码可能出现bug, 线程不一定是在条件满足后醒来, 还可能在条件处于任意状态下被错误唤醒, 因此线程醒来后需要重新判断依赖条件是否为true
      • 使用条件队列完成条件状态依赖, 和轮训休眠拥有相同的语义. 如果某个功能使用 "轮训和休眠" 无法完成, 那么使用条件队列也无法完成; aqs13.png

二. 使用条件队列的注意事项

条件队列使得构建高响应性对的状态依赖类变得容易, 但很容易造成误用; 因此使用时必须注意以下几点

2.1 条件谓词

  1. 什么是条件谓词

    • 条件谓词: 是使某个操作变为状态依赖操作的前提. 要想正确使用条件队列, 关键是找出对象在哪个条件谓词上等待
    • 有界队列中, 只有当缓存不为空的时候, take()才能执行, 否则必须等待; 因此, take()的条件谓词是"队列不为空"
      同样, put()只有在队列不满的情况下才能执行, 否则必须等待, 因此, put()的条件谓词是"队列不满"
  2. 怎么写条件谓词
    条件谓词是由类中各个状态变量构成的表达式

  3. 条件谓词如何实现条件等待

    • 条件等待中有一个重要的三元组关系: 加锁; wait()方法; 条件谓词
      • 锁加在状态变量上
      • 状态变量表达式构成了条件谓词
      • 一个条件谓词若判断为false, 则当前线程加入锁对应的条件队列并挂起
    • 因为条件谓词中包含多个状态变量, 而这些状态变量需要由同一个锁来保护, 所以在判断条件谓词为true或false之前必须先持有这个锁
    • 基于以上2点, 状态变量->条件谓词->条件等待队列->锁之间的关系克表示如下 aqs11122.png

2.2 过早唤醒

  1. 什么情况会引起过早唤醒(条件谓词不满足时就被唤醒)
    由于条件队列可以和多个条件谓词配合使用, 因此, 当一个挂起在wait()上的线程被notifyALL()唤醒时, 并不意味着该线程正在等待的条件谓词变成了true
    而且wait()方法还能执行有限时间的挂起, 超市后自动就会唤醒. 因此, 将条件谓词的检查放在while循环中是必要的的

2.3 丢失的信号

  1. wait()和notify()要有先后顺序?
    如果线程A在一个条件队列上执行了notify(), 而线程B随后才在这个条件队列上等待, 那么线程B将不会被线程A唤醒, 只能等待下一个notify()信号到来. 这就是线程A的信号丢失

2.4 通知

  1. 条件等待的必要操作: 通知

    1. 前面介绍的都是条件谓词的检查, 条件谓词不满足时线程加入条件队列进行等待; 还有另一半重要内容: 通知:
      • notify() : 从条件列中选择一个线程唤醒
      • notifyAll() : 把条件队列中所有的线程全部唤醒
    2. 发出通知的线程应该尽快释放锁
      线程调用notify()后, 虽然唤醒了其它线程进入锁的争抢, 但并未释放锁; 所以应该尽快释放锁, 让其它线程能够立马争取锁, 争取成功的线程会从wait()处的代码唤醒
    3. 上面的有界队列中, 如果队列为空, 则take()操作陷入等待, 当队列不为空时, take()操作返回; 因此必须确保每条会导致队列非空的操作都能发出一个通知, 通知take()可以返回了. 由于导致队列非空的操作时put(), 所以每次put()都要发起对条件队列的通知notifyAll()
  2. 使用notify()通知时危险的
    由于线程会因为不同的条件谓词不满足而进入同一个条件队列, 如果使用notify()通知而不是notifyAll()通知就会很危险, 因为单一的通知很容易导致类似于信号丢失的问题:

    比如, 线程A在条件队列中等待条件谓词PA, 线程B在条件队列中等待条件谓词PB, 此时线程C执行一个notify(), JVM选择线程A唤醒, 但线程A发现条件谓词并不满足进而进入挂起等待, 而其实线程C的操作可以让条件谓词PB满足, 但是线程B没能被唤醒, 造成了线程C的通知丢失
    
  3. 单次通知(notify())的严格条件
    通过以上分析, 只有下面2个条件同时满足时, 才可以使用notify()进行通知:

    • 只有1个条件谓词和条件队列相关,
    • 且条件变量上的每次通知最多只能唤醒一个线程

次时才能使用notify()发出通知

  1. 条件通知 - 使用条件优化notifyAll()通知
    有些开发人员认为"完全使用notifyAll()代替notify()"是低效的, 大多数情况下这种低效影响很小, 但有时候可能很大. 比如:
假设有10个线程在条件队列上等待, 当某个线程执行notifyAll()后, 10个线程全部唤醒, 并在锁上进行竞争. 然而, 他们中的大多数唤醒后仍发现条件谓词不满足而又重回到挂起, 这会产生大量的上下文切换和锁竞争

事实上, 条件等待版本的有界队列实现中, put()和take()的通知机制是保守的: 每当放入一个对象或从队列中取走一个对象就发起一次通知. 我们可以对其优化:
* 首先, 仅当队列从空变为非空时, 或者从满变为不满时才释放一个线程

* 并且, 仅当put()或take()影响到这些状态转换时才发出通知 aqs114.png

条件通知(条件判断下的notifyAll())和单次通知(notify)可以提升性能但却很难实现, 因此使用时应当谨慎, 只有在保证程序在notifyAll()下已经正常执行后, 再使用这个优化措施

三. 显式的Condition对象

Condition是一种广义的锁, 当然也是一种广义的条件队列; 有时使用内置锁过于灵活也可以使用广义锁

3.1 一个广义锁可以关联多个条件队列

  1. 内置锁的弊端和内置锁notifyAll()的局限
    内置条件队列的一个缺陷时, 每个内置锁只能和一个内置条件队列关联. 像上面的有界队列, 多个线程可能会在同一个条件队列上等待不同的条件谓词, 这使得无法满足"在调用notifyAll()时唤醒的的线程都是同一类型的需求". 如果要编写带有多个条件谓词的并发对象, 就可以使用显式的Lock和Condition, 而不是内置锁和条件队列

  2. ReentrantLock与Condition对象

    • 一个Condition和一个Lock关联在一起, 但是一个Lock可以和多个Condition关联在一起
    • 每个Condition代表一个条件队列, Conditiopn对象会继承Lock对象的公平性: 对于公平锁, 线程会依照FIFO的顺序从Condition.await()中被释放
    Condition对象的等待和通知方法分别是: await(),signal()和signalAll(). 但是因为Condition扩展了Object, 所以它也有wait()和notify()方法. 使用时一定要调用正确的方法 - await()和signal()
    
  3. Condition实现的有界队列

    • 因为有界队列有2个谓词, "非空"和"非满", 所以可以用2个条件谓词(2个Condition对象): notFull和notEmpty;
      当队列为空时, take()挂起并等待notEmpty, 此时put()向notEmpty发送信号, 解除阻塞在take()中的线程

      aqs21.png
    • 使用多个Condition时, 比分析一个使用单一内部队列加多个条件谓词的类简单的多. 通过把条件谓词分开并放入到2个等待线程集合中, Condition更容易满足单次通知的需求. signal()比signalAll()更高效, 它能极大的减少每次唤醒操作产生的上写文切换和锁请求次数

3.2 RentrantLock和Semaphore的共性

RentrantLockSemaphore有很多共性:

  • 它们都可以作为一个阀门, 每次只允许一定数量的线程通过, 并且当线程到达阀门时可以有不同状态:
    • 可以通过(调用lock()acquire()时成功返回)
    • 可以等待(调用lock()acquire()时阻塞)
    • 可以取消(调用tryLock(timeout)tryAcquire(timeout)时返回false, 表示指定时间内锁时不可用或者无法获得许可的)
  • 两个接口都支持可中断, 不可中断, 限时的获取操作
  • 也都支持线程执行公平或非公平等待

虽然他们不是利用对方实现的, 但仍可以通过锁实现信号量.事实上他们在实现时使用了一个共同的基类AbstractQueuedSynchronizer

四. 如何使用AbstractQueuedSynchronizer构建同步类

1. 获取与释放:

基于AbstractQueuedSynchronizer构建的同步类, 基本操作包括各种形式的获取操作释放操作

  • 获取操作: 是一种依赖状态的类, 通常会阻塞. 使用锁或信号量时, "获取"操作意味着获取的是锁或许可, 并且调用者可能会一直等待直到同步器处于可被获取的状态
  • 释放操作: 释放并不是一个可阻塞的操作, 当执行"释放"时, 所有请求时被阻塞的线程都会执行

2. AQS负责管理同步类中的状态

  • 一个类如果想成为一个依赖状态的类, 就必须拥有一些状态. AQS就负责管理同步类中的状态.
  • AQS管理一个整数状态的信息, 这个状态可以使用getState,setState以及compareAndSet等protected方法进行操作
    例如, ReentrantLock用状态表示所有者线程已经重复获取该锁的次数
        Semaphore用状态表示剩余的许可数量
    
    此外, 同步类还可以自己保存一些额外的状态变量.
    例如, ReentrantLock还保存了所有者线程的信息, 这样就能区分出某个获取动作是重入的还是竞争的
    

3, 利用AQS实现获取操作

  1. 一个获取操作包含两部分:

    • 首先, 同步器判断的那个钱状态是否允许获取操作
      • 如果是, 则允许线程执行
      • 如果不是, 获取操偶作将阻塞或者失败
    例如, 对于锁来说, 如果他没有被某个线程持有, 则能被成功获取
    
    • 其次, 更新同步器的状态
      获取同步器的某个线程可能会对其他线程能否获得同步器造成影响
    例如, 当获取一个锁后, 锁的状态从"未被持有"状变为"已被持有"
        从Semophore获取一个许可后, 把剩余许可数量减1 
    
  2. 如何通过AQS实现获取

    • 如果某个同步类支持独占的获取操作, 则应该实现一些方法:
      • tryAcquire()
      • tryRelease()
      • isHeldExclusively
    • 如果是支持共享获取的同步类, 则应实现:
      • tryAcquireShared()
      • tryReleaseShared()等方法

4. ReentrantLock中的AQS同步器

  • 因为ReentrantLock只支持独占方式的偶去操作, 因此实现了tryAcquiretryRelease()
  • SyncReentrantLock的内部同步器, 实现了tryRelease(); tryAcquire根据公平和非公平性的不同在Sync的子类FairSyncNonfairSync中实现
abstract static class Sync extends AbstractQueuedSynchronizer {
      ...
      protected final boolean tryRelease(int releases) {
            int c = getState() - releases;  // 所有者线程重复获取锁的次数
            // 如果释放锁的线程不是锁的持有者线程, 抛异常 (释放操作的第一步: 是否允许释放锁)
            if (Thread.currentThread() != getExclusiveOwnerThread())  
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);  // 释放锁的第二步: 释放同步器的状态
            return free;
        }  
       ...
}
static final class FairSync extends Sync {
      ...
      protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState();
        if (c == 0) {  // 没有线程在持有锁
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        // 如果持有锁的线程是当前线程
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        // 持有锁的线程不是当前线程
        return false;
      }
     ...
}

相关文章

网友评论

      本文标题:Java并发编程-条件等待与状态依赖

      本文链接:https://www.haomeiwen.com/subject/pptivctx.html