美文网首页
读《Java并发编程》小结

读《Java并发编程》小结

作者: WAHAHA402 | 来源:发表于2021-11-27 16:24 被阅读0次

    戳我的笔记链接地址
    本文是对《Java并发编程》专栏的读后小结,跟大家分享。

    目录

    1、bug的源头-三个属性
    2、Java内存模型
    3、死锁的解决方案
    死锁发生的条件
    死锁的预防
    4、等待-通知机制
    wait的使用范式
    wait和sleep的区别
    5、线程的生命周期
    通用的线程生命周期(五态模型)
    Java中线程的生命周期
    状态转换
    6、创建合理的线程数量
    CPU密集型应用
    I/O密集型应用
    7、Lock与synchronized的不同
    用两个条件变量实现阻塞队列
    异步转同步
    8、用Semaphore实现一个限流器
    9、读写锁 ReadWriteLock
    读写锁升级问题
    9、StampedLock 比读写锁更快的锁
    10、CountDownLatch 和 CyclicBarrier 让多线程步调一致
    11、Java并发容器
    注意事项
    12、原子类
    ABA问题
    原子类组成概览
    13、Java中的线程池
    // todo 线程的运行过程
    ThreadPoolExecutor 线程池参数
    拒绝策略
    使用线程池的注意事项
    14、Future 获取异步执行结果
    如何获取异步任务执行结果
    FutureTask工具类
    14、CompletableFuture 异步编程
    15、CompletionService 批量执行异步任务
    16、Fork/Join 并行计算框架
    模拟MapReduce统计单词数量

    1、bug的源头-三个属性

    可见性、有序性、原子性。
    我们的 CPU、内存、I/O 设备都在不断迭代,不断朝着更快的方向努力。但是,在这个快速发展的过程中,有一个核心矛盾一直存在,就是这三者的速度差异。
    cpu寄存器缓存导致的可见性问题。
    线程切换带来原子性问题。
    编译优化带来有序性问题。
    其中,在 Java 领域一个经典的案例就是利用双重检查创建单例对象:

    public class Singleton {
      static Singleton instance;
      static Singleton getInstance(){
        if (instance == null) {
          synchronized(Singleton.class) {
            if (instance == null)
              instance = new Singleton(); //在这一步如果编译优化
            }
        }
        return instance;
      }
    }
    

    第八行,如果发生编译优化:我们以为的 new 操作应该是:
    分配一块内存 M;
    在内存 M 上初始化 Singleton 对象;
    然后 M 的地址赋值给 instance 变量。
    但是实际上优化后的执行路径却是这样的:
    分配一块内存 M;
    将 M 的地址赋值给 instance 变量;
    最后在内存 M 上初始化 Singleton 对象。
    如果在第2步发生线程a到线程b的切换,线程b直接返回instance,这个时候调用没有初始化过的instance对象,会产生空指针异常。

    解决办法:对instance对象加volatile语义申明。

    2、Java内存模型

    Java 内存模型规范了 JVM 如何提供按需禁用缓存和编译优化的方法。具体来说,这些方法包括 volatile、synchronized 和 final 三个关键字,以及七项 Happens-Before 规则。

    3、死锁的解决方案

    使用细粒度锁可以提高并行度,是性能优化的一个重要手段。但是有时细粒度的锁容易导致死锁。死锁的一个比较专业的定义是:一组互相竞争资源的线程因互相等待,导致“永久”阻塞的现象。
    死锁发生的条件
    互斥,共享资源 X 和 Y 只能被一个线程占用;
    占有且等待,线程 T1 已经取得共享资源 X,在等待共享资源 Y 的时候,不释放共享资源 X;
    不可抢占,其他线程不能强行抢占线程 T1 占有的资源;
    循环等待,线程 T1 等待线程 T2 占有的资源,线程 T2 等待线程 T1 占有的资源,就是循环等待。
    死锁的预防
    反过来分析,也就是说只要我们破坏其中一个,就可以成功避免死锁的发生。互斥就是锁的目的,所以无法预防。
    破坏占有且等待,可以一次性申请所有资源。要么全部获取成功,要么全部获取失败。
    破坏不可抢占,核心是要能够主动释放它占有的资源,这一点 synchronized 是做不到的。java.util.concurrent 这个包下面提供的 Lock 是可以轻松解决这个问题的。提供tryLock(long, TimeUnit) 方法,在一段时间后放弃获取锁。
    破坏循环等待条件,破坏这个条件,需要对资源进行排序,然后按序申请资源。

    4、等待-通知机制

    用 synchronized 实现等待 - 通知机制在 Java 语言里,等待 - 通知机制可以有多种实现方式,比如 Java 语言内置的 synchronized 配合 wait()、notify()、notifyAll() 这三个方法就能轻松实现。

    如何用 synchronized 实现互斥锁,你应该已经很熟悉了。在下面这个图里,左边有一个等待队列,同一时刻,只允许一个线程进入 synchronized 保护的临界区(这个临界区可以看作大夫的诊室),当有一个线程进入临界区后,其他线程就只能进入图中左边的等待队列里等待(相当于患者分诊等待)。这个等待队列和互斥锁是一对一的关系,每个互斥锁都有自己独立的等待队列。
    wait()工作原理图

    notify()工作原理图

    上面我们一直强调 wait()、notify()、notifyAll() 方法操作的等待队列是互斥锁的等待队列,所以如果 synchronized 锁定的是 this,那么对应的一定是 this.wait()、this.notify()、this.notifyAll();如果 synchronized 锁定的是 target,那么对应的一定是 target.wait()、target.notify()、target.notifyAll() 。
    而且 wait()、notify()、notifyAll() 这三个方法能够被调用的前提是已经获取了相应的互斥锁,所以我们会发现 wait()、notify()、notifyAll() 都是在 synchronized{}内部被调用的。如果在 synchronized{}外部调用,或者锁定的 this,而用 target.wait() 调用的话,JVM 会抛出一个运行时异常:java.lang.IllegalMonitorStateException。

    // wait的使用范式
      while(条件不满足) {
        wait();
      }
    

    除非经过深思熟虑,否则尽量使用 notifyAll(),只用notify()可能会导致有线程永远得不到执行。

    wait和sleep的区别
    wait与sleep区别在于: 1. wait会释放所有锁而sleep不会释放锁资源. 2. wait只能在同步方法和同步块中使用,而sleep任何地方都可以. 3. wait无需捕捉异常,而sleep需要. 两者相同点:都会让渡CPU执行时间,等待再次调度! wait()方法与sleep()方法的不同之处在于,wait()方法会释放对象的“锁标志”。当调用某一对象的wait()方法后,会使当前线程暂停执行,并将当前线程放入对象等待池中,直到调用了notify()方法后,将从对象等待池中移出任意一个线程并放入锁标志等待池中,只有锁标志等待池中的线程可以获取锁标志,它们随时准备争夺锁的拥有权。当调用了某个对象的notifyAll()方法,会将对象等待池中的所有线程都移动到该对象的锁标志等待池。 sleep()方法需要指定等待的时间,它可以让当前正在执行的线程在指定的时间内暂停执行,进入阻塞状态,该方法既可以让其他同优先级或者高优先级的线程得到执行的机会,也可以让低优先级的线程得到执行机会。但是sleep()方法不会释放“锁标志”,也就是说如果有synchronized同步块,其他线程仍然不能访问共享数据。

    5、线程的生命周期

    通用的线程生命周期(五态模型)

    初始状态,指的是线程已经被创建,但是还不允许分配 CPU 执行。这个状态属于编程语言特有的,在操作系统层面,真正的线程还没有创建。
    可运行状态,指的是线程可以分配 CPU 执行。在这种状态下,真正的操作系统线程已经被成功创建了,所以可以分配 CPU 执行。
    当有空闲的 CPU 时,操作系统会将其分配给一个处于可运行状态的线程,被分配到 CPU 的线程的状态就转换成了运行状态。
    运行状态的线程如果调用一个阻塞的 API(例如以阻塞方式读文件)或者等待某个事件(例如条件变量),那么线程的状态就会转换到休眠状态,同时释放 CPU 使用权,休眠状态的线程永远没有机会获得 CPU 使用权。当等待的事件出现了,线程就会从休眠状态转换到可运行状态。
    线程执行完或者出现异常就会进入终止状态,终止状态的线程不会切换到其他任何状态,进入终止状态也就意味着线程的生命周期结束了。

    Java中线程的生命周期

    这五种状态在不同编程语言里会有简化合并。
    Java 语言里则把可运行状态和运行状态合并了(变成了运行状态),这两个状态在操作系统调度层面有用,而 JVM 层面不关心这两个状态,因为 JVM 把线程调度交给操作系统处理了。除了简化合并,这五种状态也有可能被细化,比如,Java 语言里就细化了休眠状态(Blocked、Waiting、Timed_Waiting).

    Java 语言中线程共有六种状态,分别是:
    NEW(初始化状态)
    RUNNABLE(可运行 / 运行状态)
    BLOCKED(阻塞状态)
    WAITING(无时限等待)
    TIMED_WAITING(有时限等待)
    TERMINATED(终止状态)

    状态转换

    1. RUNNABLE 与 BLOCKED 的状态转换
      只有一种场景会触发这种转换,就是线程等待 synchronized 的隐式锁。

    2. RUNNABLE 与 WAITING 的状态转换
      第一种场景,获得 synchronized 隐式锁的线程,调用无参数的 Object.wait() 方法。
      第二种场景,调用无参数的 Thread.join() 方法。其中的 join() 是一种线程同步方法,例如有一个线程对象 thread A,当调用 A.join() 的时候,执行这条语句的线程会等待 thread A 执行完,而等待中的这个线程,其状态会从 RUNNABLE 转换到 WAITING。当线程 thread A 执行完,原来等待它的线程又会从 WAITING 状态转换到 RUNNABLE。
      第三种场景,调用 LockSupport.park() 方法。其中的 LockSupport 对象,也许你有点陌生,其实 Java 并发包中的锁,都是基于它实现的。调用 LockSupport.park() 方法,当前线程会阻塞,线程的状态会从 RUNNABLE 转换到 WAITING。调用 LockSupport.unpark(Thread thread) 可唤醒目标线程,目标线程的状态又会从 WAITING 状态转换到 RUNNABLE。

    3. RUNNABLE 与 TIMED_WAITING 的状态转换
      有五种场景会触发这种转换:
      调用带超时参数的 Thread.sleep(long millis) 方法;
      获得 synchronized 隐式锁的线程,调用带超时参数的 Object.wait(long timeout) 方法;
      调用带超时参数的 Thread.join(long millis) 方法;
      调用带超时参数的 LockSupport.parkNanos(Object blocker, long deadline) 方法;
      调用带超时参数的 LockSupport.parkUntil(long deadline) 方法。
      这里你会发现 TIMED_WAITING 和 WAITING 状态的区别,仅仅是触发条件多了超时参数。

    4. 从 NEW 到 RUNNABLE 状态
      从 NEW 状态转换到 RUNNABLE 状态很简单,只要调用线程对象的 start() 方法就可以了

    5. 从 RUNNABLE 到 TERMINATED 状态
      线程执行完 run() 方法后,会自动转换到 TERMINATED 状态,当然如果执行 run() 方法的时候异常抛出,也会导致线程终止。有时候我们需要强制中断 run() 方法的执行,例如 run() 方法访问一个很慢的网络,我们等不下去了,想终止怎么办呢?Java 的 Thread 类里面倒是有个 stop() 方法,不过已经标记为 @Deprecated,所以不建议使用了。正确的姿势其实是调用 interrupt() 方法。

    stop() 方法会真的杀死线程,不给线程喘息的机会,如果线程持有 ReentrantLock 锁,被 stop() 的线程并不会自动调用 ReentrantLock 的 unlock() 去释放锁,那其他线程就再也没机会获得 ReentrantLock 锁。
    interrupt() 方法仅仅是通知线程,线程有机会执行一些后续操作,同时也可以无视这个通知。被 interrupt 的线程,是怎么收到通知的呢?一种是异常,另一种是主动检测。
    当线程 A 处于 WAITING、TIMED_WAITING 状态时,如果其他线程调用线程 A 的 interrupt() 方法,会使线程 A 返回到 RUNNABLE 状态,同时线程 A 的代码会触发 InterruptedException 异常。上面我们提到转换到 WAITING、TIMED_WAITING 状态的触发条件,都是调用了类似 wait()、join()、sleep() 这样的方法,我们看这些方法的签名,发现都会 throws InterruptedException 这个异常。这个异常的触发条件就是:其他线程调用了该线程的 interrupt() 方法。

    6、创建合理的线程数量

    创建合理的线程数量,目的是将硬件的性能发挥到极致。根据不同的应用场景,我们将应用分为两种场景论述,I/O密集型应用和CPU密集型应用。
    CPU密集型应用
    对于 CPU 密集型的计算场景,理论上“线程的数量 =CPU 核数”就是最合适的。不过在工程上,线程的数量一般会设置为“CPU 核数 +1”,这样的话,当线程因为偶尔的内存页失效或其他原因导致阻塞时,这个额外的线程可以顶上,从而保证 CPU 的利用率。
    I/O密集型应用
    对于 I/O 密集型计算场景,最佳的线程数是与程序中 CPU 计算和 I/O 操作的耗时比相关的,我们可以总结出这样一个公式:
    最佳线程数 =1 +(I/O 耗时 / CPU 耗时)
    不过上面这个公式是针对单核 CPU 的,至于多核 CPU,也很简单,只需要等比扩大就可以了,计算公式如下:
    最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)]

    Q: 有些同学对于最佳线程数的设置积累了一些经验值,认为对于 I/O 密集型应用,最佳线程数应该为:2 * CPU 的核数 + 1,你觉得这个经验值合理吗?
    A: 工作中都是按照逻辑核数来的,理论值和经验值只是提供个指导,实际上还是要靠压测!!!

    7、Lock与synchronized的不同

    synchronized存在,Javasdk还造一个Lock的原因主要是为了弥补synchronized,会阻塞线程且不会释放已经占有的资源的问题,lock的三个方法如下:

    // 支持响应中断
    void lockInterruptibly() 
      throws InterruptedException;
    // 支持超时
    boolean tryLock(long time, TimeUnit unit) 
      throws InterruptedException;
    // 支持非阻塞获取锁
    boolean tryLock();
    

    并且,Lock支持多个条件变量。Lock用来实现“互斥”,condition用来实现“同步”。

    用两个条件变量实现阻塞队列

    public class BlockedQueue<T>{
      final Lock lock =
        new ReentrantLock();
      // 条件变量:队列不满  
      final Condition notFull =
        lock.newCondition();
      // 条件变量:队列不空  
      final Condition notEmpty =
        lock.newCondition();
    
      // 入队
      void enq(T x) {
        lock.lock();
        try {
          while (队列已满){
            // 等待队列不满
            notFull.await();
          }  
          // 省略入队操作...
          //入队后,通知可出队
          notEmpty.signal();
        }finally {
          lock.unlock();
        }
      }
      // 出队
      void deq(){
        lock.lock();
        try {
          while (队列已空){
            // 等待队列不空
            notEmpty.await();
          }  
          // 省略出队操作...
          //出队后,通知可入队
          notFull.signal();
        }finally {
          lock.unlock();
        }  
      }
    }
    

    不过,这里你需要注意,Lock 和 Condition 实现的管程,线程等待和通知需要调用 await()、signal()、signalAll(),它们的语义和 wait()、notify()、notifyAll() 是相同的。但是不一样的是,Lock&Condition 实现的管程里只能使用前面的 await()、signal()、signalAll(),而后面的 wait()、notify()、notifyAll() 只有在 synchronized 实现的管程里才能使用。如果一不小心在 Lock&Condition 实现的管程里调用了 wait()、notify()、notifyAll(),那程序可就彻底玩儿完了。

    异步转同步
    远程调用rpc请求时,面临异步转同步的问题,因为tcp层面上rpc请求就是异步的,它不会等待请求返回结果,所以类似于dubbo这种rpc框架也是做了异步转同步的工作,具体实现类似于上面“用两个条件变量实现阻塞队列”的代码。

    8、用Semaphore实现一个限流器

    极客专栏链接 https://time.geekbang.org/column/article/88499
    Semaphore是信号量的意思,可以允许多个线程访问一个临界区。可以用来实现比较常见的需求就是我们工作中遇到的各种池化资源,例如连接池、对象池、线程池等等。

    9、读写锁 ReadWriteLock

    极客专栏链接(质量很高,也很实用) https://time.geekbang.org/column/article/88909
    这个链接里文章实现了一个简易的完备缓存的示例。
    读写锁升级问题
    读锁不能升级为写锁。
    写锁可以降级为读锁。

    如果进行读写锁升级,读锁还没有释放,此时获取写锁,会导致写锁永久等待,最终导致相关线程都被阻塞,永远也没有机会被唤醒。锁的升级是不允许的,这个你一定要注意。

    9、StampedLock 比读写锁更快的锁

    极客专栏链接 使用stampedLock有几个比较重要需要注意的点,所以谨慎使用。
    支持三种模式
    写锁
    悲观读锁(类似与读写锁的读锁)
    乐观读(无锁,检测到有锁的时候需要转成悲观读锁)

    10、CountDownLatch 和 CyclicBarrier 让多线程步调一致

    极客时间专栏链接 https://time.geekbang.org/column/article/89461
    CountDownLatch 和 CyclicBarrier 是 Java 并发包提供的两个非常易用的线程同步工具类,这两个工具类用法的区别在这里还是有必要再强调一下:
    CountDownLatch 主要用来解决一个线程等待多个线程的场景,可以类比旅游团团长要等待所有的游客到齐才能去下一个景点;
    而 CyclicBarrier 是一组线程之间互相等待,更像是几个驴友之间不离不弃。
    除此之外 CountDownLatch 的计数器是不能循环利用的,也就是说一旦计数器减到 0,再有线程调用 await(),该线程会直接通过。但 CyclicBarrier 的计数器是可以循环利用的,而且具备自动重置的功能,一旦计数器减到 0 会自动重置到你设置的初始值。除此之外,CyclicBarrier 还可以设置回调函数,可以说是功能丰富。

    11、Java并发容器

    通过对操作方法加synchronized关键字,可以使得一个容器变成线程安全的容器。这类容器称为同步容器,针对同步容器的性能问题,Java在1.5版本之后出来了并发容器,性能更高。
    同步容器:
    常见的有Vector、Stack 和 Hashtable等,通过对操作方法加synchronized关键字实现。
    并发容器:
    数量较多,主要有以下四大类:Set、Map、Set、Queue
    并发容器关系图:
    比较熟悉的有ConcurrentHashMap、BlockingQueue....

    注意事项
    1、在容器领域一个容易被忽视的“坑”是用迭代器遍历容器。

    // 有问题的写法
    List list = Collections.
      synchronizedList(new ArrayList());
    Iterator i = list.iterator(); 
    while (i.hasNext())
      foo(i.next());
      
    // 正确的写法
    // 因为是对list的操作,如果list变化了,会使得迭代器报错 
    // 所以先把list锁住,保证迭代器运行期间list不会变化 
    // 这也是在迭代器里对当前元素删除会报错的原因
    
    List list = Collections.
      synchronizedList(new ArrayList());
    Iterator i = list.iterator(); 
    while (i.hasNext())
      foo(i.next());
    

    2、另外,使用队列时,需要格外注意队列是否支持有界(所谓有界指的是内部的队列是否有容量限制)。实际工作中,一般都不建议使用无界的队列,因为数据量大了之后很容易导致 OOM。上面我们提到的这些 Queue 中,只有 ArrayBlockingQueue 和 LinkedBlockingQueue 是支持有界的,所以在使用其他无界队列时,一定要充分考虑是否存在导致 OOM 的隐患。

    12、原子类

    极客时间专栏 https://time.geekbang.org/column/article/90515
    原子类高性能的秘密就是硬件支持,基于CPU提供的cas(compare and swap)指令。
    ABA问题
    aba问题,指的是一个共享变量经历了A--》B--》A的一个过程,虽然最终值一样,但是已经被更新过了。使用原子化的更新对象很可能就需要关心 ABA 问题,因为两个 A 虽然相等,但是第二个 A 的属性可能已经发生变化了。
    相关实现有 AtomicReference、AtomicStampedReference 和 AtomicMarkableReference,利用它们可以实现对象引用的原子化更新。AtomicReference 提供的方法和原子化的基本数据类型差不多,这里不再赘述。不过需要注意的是,对象引用的更新需要重点关注 ABA 问题,AtomicStampedReference 和 AtomicMarkableReference 这两个原子类可以解决 ABA 问题。

    原子类组成概览

    13、Java中的线程池

    美团技术文章Java线程池
    https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

    使用线程池的目的是为了避免线程的频繁创建和销毁。线程是一个重量级的对象,应该避免频繁创建和销毁。
    线程池是一种生产者 - 消费者模式。线程池的使用方是生产者,线程池本身是消费者。在下面的示例代码中,我们创建了一个非常简单的线程池 MyThreadPool,你可以通过它来理解线程池的工作原理。

    //简化的线程池,仅用来说明工作原理
    class MyThreadPool{
      //利用阻塞队列实现生产者-消费者模式
      BlockingQueue<Runnable> workQueue;
      //保存内部工作线程
      List<WorkerThread> threads 
        = new ArrayList<>();
      // 构造方法
      MyThreadPool(int poolSize, 
        BlockingQueue<Runnable> workQueue){
        this.workQueue = workQueue;
        // 创建工作线程
        for(int idx=0; idx<poolSize; idx++){
          WorkerThread work = new WorkerThread();
          work.start();
          threads.add(work);
        }
      }
      // 提交任务
      void execute(Runnable command){
        workQueue.put(command);
      }
      // 工作线程负责消费任务,并执行任务
      class WorkerThread extends Thread{
        public void run() {
          //循环取任务并执行
          while(true){ ①
            Runnable task = workQueue.take();
            task.run();
          } 
        }
      }  
    }
    
    /** 下面是使用示例 **/
    // 创建有界阻塞队列
    BlockingQueue<Runnable> workQueue = 
      new LinkedBlockingQueue<>(2);
    // 创建线程池  
    MyThreadPool pool = new MyThreadPool(
      10, workQueue);
    // 提交任务  
    pool.execute(()->{
        System.out.println("hello");
    });
    

    在 MyThreadPool 的内部,我们维护了一个阻塞队列 workQueue 和一组工作线程,工作线程的个数由构造函数中的 poolSize 来指定。用户通过调用 execute() 方法来提交 Runnable 任务,execute() 方法的内部实现仅仅是将任务加入到 workQueue 中。MyThreadPool 内部维护的工作线程会消费 workQueue 中的任务并执行任务,相关的代码就是代码①处的 while 循环。线程池主要的工作原理就这些,是不是还挺简单的?

    // todo 线程的运行过程
    什么情况下增加新线程、添加到任务队列等等

    ThreadPoolExecutor 线程池参数

    ThreadPoolExecutor(
      int corePoolSize, // 表示线程池保有的最小线程数。
      int maximumPoolSize, // 表示线程池创建的最大线程数。当项目很忙时,就需要加人,最多加到 maximumPoolSize 个人。
      long keepAliveTime, // 线程可以空闲的存活时间
      TimeUnit unit,
      BlockingQueue<Runnable> workQueue, //工作队列,用来保存生产的资源,也是线程要消费的资源
      ThreadFactory threadFactory, // 通过这个参数自定义如何创建线程,如给线程指定一个名字
      RejectedExecutionHandler handler) // 拒绝策略
    拒绝策略
    ThreadPoolExecutor 已经提供了以下 4 种策略。
    CallerRunsPolicy:提交任务的线程自己去执行该任务。
    AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
    DiscardPolicy:直接丢弃任务,没有任何异常抛出。
    DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
    

    使用线程池的注意事项
    1、尽量使用有界队列
    Java 并发包里提供了一个线程池的静态工厂类 Executors,利用 Executors 你可以快速创建线程池。不过目前大厂的编码规范中基本上都不建议使用 Executors 了。
    不建议使用 Executors 的最重要的原因是:Executors 提供的很多方法默认使用的都是无界的 LinkedBlockingQueue,高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理,这是致命问题。所以强烈建议使用有界队列。

    2、默认的拒绝策略要慎用
    使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。

    14、Future 获取异步执行结果
    极客时间专栏 https://time.geekbang.org/column/article/91292
    如何获取异步任务执行结果
    Java 通过 ThreadPoolExecutor 提供的 3 个 submit() 方法和 1 个 FutureTask 工具类来支持获得任务执行结果的需求。下面我们先来介绍这 3 个 submit() 方法,这 3 个方法的方法签名如下。

    // 提交Runnable任务
    Future<?> 
      submit(Runnable task);
    // 提交Callable任务
    <T> Future<T> 
      submit(Callable<T> task);
    // 提交Runnable任务及结果引用  
    <T> Future<T> 
      submit(Runnable task, T result);
    

    你会发现它们的返回值都是 Future 接口,Future 接口有 5 个方法,我都列在下面了,它们分别是取消任务的方法 cancel()、判断任务是否已取消的方法 isCancelled()、判断任务是否已结束的方法 isDone()以及2 个获得任务执行结果的 get() 和 get(timeout, unit),其中最后一个 get(timeout, unit) 支持超时机制。通过 Future 接口的这 5 个方法你会发现,我们提交的任务不但能够获取任务执行结果,还可以取消任务。不过需要注意的是:这两个 get() 方法都是阻塞式的,如果被调用的时候,任务还没有执行完,那么调用 get() 方法的线程会阻塞,直到任务执行完才会被唤醒。

    // 取消任务
    boolean cancel(
      boolean mayInterruptIfRunning);
    // 判断任务是否已取消  
    boolean isCancelled();
    // 判断任务是否已结束
    boolean isDone();
    // 获得任务执行结果
    get();
    // 获得任务执行结果,支持超时
    get(long timeout, TimeUnit unit);
    FutureTask工具类
    前面我们提到的 Future 是一个接口,而 FutureTask 是一个实实在在的工具类,这个工具类有两个构造函数,它们的参数和前面介绍的 submit() 方法类似,所以这里我就不再赘述了。
    FutureTask(Callable<V> callable);
    FutureTask(Runnable runnable, V result);
    那如何使用 FutureTask 呢?其实很简单,FutureTask 实现了 Runnable 和 Future 接口,由于实现了 Runnable 接口,所以可以将 FutureTask 对象作为任务提交给 ThreadPoolExecutor 去执行,也可以直接被 Thread 执行;又因为实现了 Future 接口,所以也能用来获得任务的执行结果。下面的示例代码是将 FutureTask 对象提交给 ThreadPoolExecutor 去执行。
    // 创建FutureTask
    FutureTask<Integer> futureTask
      = new FutureTask<>(()-> 1+2);
    // 创建线程池
    ExecutorService es = 
      Executors.newCachedThreadPool();
    // 提交FutureTask 
    es.submit(futureTask);
    // 获取计算结果
    Integer result = futureTask.get();
    FutureTask 对象直接被 Thread 执行的示例代码如下所示。相信你已经发现了,利用 FutureTask 对象可以很容易获取子线程的执行结果。
    // 创建FutureTask
    FutureTask<Integer> futureTask
      = new FutureTask<>(()-> 1+2);
    // 创建并启动线程
    Thread T1 = new Thread(futureTask);
    T1.start();
    // 获取计算结果
    Integer result = futureTask.get();
    // 以上两种方式还可以组合起来使用。
    

    下面的示例代码就是用这一章提到的 Future 特性来实现的。首先,我们创建了两个 FutureTask——ft1 和 ft2,ft1 完成洗水壶、烧开水、泡茶的任务,ft2 完成洗茶壶、洗茶杯、拿茶叶的任务;这里需要注意的是 ft1 这个任务在执行泡茶任务前,需要等待 ft2 把茶叶拿来,所以 ft1 内部需要引用 ft2,并在执行泡茶之前,调用 ft2 的 get() 方法实现等待。

    // 创建任务T2的FutureTask
    FutureTask<String> ft2
      = new FutureTask<>(new T2Task());
    // 创建任务T1的FutureTask
    FutureTask<String> ft1
      = new FutureTask<>(new T1Task(ft2));
      
    // 线程T1执行任务ft1
    Thread T1 = new Thread(ft1);
    T1.start();
    // 线程T2执行任务ft2
    Thread T2 = new Thread(ft2);
    T2.start();
    // 等待线程T1执行结果
    System.out.println(ft1.get());
    
    // T1Task需要执行的任务:
    // 洗水壶、烧开水、泡茶
    class T1Task implements Callable<String>{
      FutureTask<String> ft2;
      // T1任务需要T2任务的FutureTask
      T1Task(FutureTask<String> ft2){
        this.ft2 = ft2;
      }
      @Override
      String call() throws Exception {
        System.out.println("T1:洗水壶...");
        TimeUnit.SECONDS.sleep(1);
        
        System.out.println("T1:烧开水...");
        TimeUnit.SECONDS.sleep(15);
        // 获取T2线程的茶叶  
        String tf = ft2.get();
        System.out.println("T1:拿到茶叶:"+tf);
    
        System.out.println("T1:泡茶...");
        return "上茶:" + tf;
      }
    }
    // T2Task需要执行的任务:
    // 洗茶壶、洗茶杯、拿茶叶
    class T2Task implements Callable<String> {
      @Override
      String call() throws Exception {
        System.out.println("T2:洗茶壶...");
        TimeUnit.SECONDS.sleep(1);
    
        System.out.println("T2:洗茶杯...");
        TimeUnit.SECONDS.sleep(2);
    
        System.out.println("T2:拿茶叶...");
        TimeUnit.SECONDS.sleep(1);
        return "龙井";
      }
    }
    
    // 一次执行结果:
    T1:洗水壶...
    T2:洗茶壶...
    T1:烧开水...
    T2:洗茶杯...
    T2:拿茶叶...
    T1:拿到茶叶:龙井
    T1:泡茶...
    上茶:龙井
    

    14、CompletableFuture 异步编程

    极客时间专栏文章链接 https://time.geekbang.org/column/article/91569
    CompletableFuture 实现了 CompletionStage 接口。任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等,这些都在CompletionStage接口中得到了体现。很好用就是了。具体参考上述文章链接。

    注意异常处理,默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置 ForkJoinPool 线程池的线程数)。如果所有 CompletableFuture 共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰。

    15、CompletionService 批量执行异步任务

    当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽看上去有些啰嗦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

    16、Fork/Join 并行计算框架

    极客时间专栏文章链接 https://time.geekbang.org/column/article/92524

    Fork/Join 并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的 MapReduce,所以你可以把 Fork/Join 看作单机版的 MapReduce。

    Fork/Join 并行计算框架的核心组件是 ForkJoinPool。ForkJoinPool 支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。

    模拟MapReduce统计单词数量
    学习 MapReduce 有一个入门程序,统计一个文件里面每个单词的数量,下面我们来看看如何用 Fork/Join 并行计算框架来实现。我们可以先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果,你可以对照前面的简版分治任务模型图来理解这个过程。

    上述极客时间链接里存在具体的实现代码,可以仔细看看。

    相关文章

      网友评论

          本文标题:读《Java并发编程》小结

          本文链接:https://www.haomeiwen.com/subject/emqqxrtx.html