前言
感谢王宝令老师极客时间的 课程,通俗易懂,这里再次推荐
哎,这篇文章敲了一遍没看懂……
背景
前几篇文章我们介绍了线程池,Future 、CompletableFuture 和CompletionService (其中后两者待补充)。仔细观察你会发现这些工具类都是在帮我们站在任务的视角来解决并发问题,而不是让我们纠缠在线程之间的如何协作细节上(比如线程之间如何等待、通知等),对于简单的并行任务,可以通过线程池+Future 的方案来解决,如果任务之间有聚合关系,无论是AND聚合还是OR 聚合,都可以通过CompletableFuture 来解决,而批量的并行任务,则可以通过CompletionService 来解决。
我们一直讲,并发编程可以分为三个层面的问题,分别是分工、协作、和互斥,当你关注于任务的时候,你会发现你的视角已经从并发编程的细节中跳出来了,你应用的更多的是现实世界的思维模式,类比的往往是现实世界里的分工,所以我把线程池、Future 、CompletableFuture 和CompletionService 都列到了分工里面。
下面我们用现实世界里的工作流程图描述了并发编程领域里的简单并行任务、聚合任务和批量执行任务,辅以流程图,相信你一定能将你的思维模式转换到显示世界里来。
img从上到下,依次为简单并行任务、聚合任务和批量并行任务示意图
上面提到的简单并行、聚合、批量执行这三种任务模型,基本上能够覆盖日常工作中的并发场景了,但是还是不够全面,因为还有一种“分治”的任务模型没有覆盖到。“分治”,顾名思义,分而治之,是一种解决复杂问题的思维方法和模式,具体来讲,指的是把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解。理论上来讲,解决每一个问题都对应着一个任务,所以对于问题的分治,实际上就是对于任务的分治。
分治思想在很多领域都有广泛应用,例如算法领域有分治算法,(归并排序,快速排序都属于分治算法,二分法查找也属于一种分治算法);大数据领域的知名框架,MapReduce 背后的思想也是分治。既然分治这种任务模型如此普遍,那Java 显然也需要支持,Java 并发包里提供了一种叫做Fork/Join 的 并行框架,就是用来支持分治这种任务模型的。
分治任务模型
这里你需要深入了解下分治任务模型,分治任务模型可以分为两个阶段 一个阶段是任务分解,也就是将任务迭代的分解为子任务,直至子任务可以直接计算出结果。另一个阶段是结果合并,即逐层合并子任务的执行结果,直至获取最终结果。下图是一个简化版的分治任务模型:
img在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的,具备这种相似性的问题,我们往往都采用递归的算法。
Fork/Join 的使用
Fork/Join 是一个并行计算框架,主要用来支持分治模型的,这个计算框架里的Fork 对应的是分治任务模型里的分解,Join 对应的结果合并。Fork/Join 计算框架主要包括两部分,一部分是分治任务的线程池ForkJoinPool ,另一部分是分治任务ForkJoinTask 。这两部分的关系类似于ThreadPoolExecutor 和Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特的类型ForkJoinTask 。
ForkJoinTask 是一个抽象类,他的方法很多,最核心的是fork() 方法和 join() 方法 ,其中fork() 方法会异步的执行一个子任务,而join() 方法则会阻塞当前线程来等待子任务的执行结果。ForkJoinTask 有两个子类RecursiveAction 和RecursiveTask ,通过名字你就应该知道,他们都是递归的方式来处理分治任务的。这两个子类都定义了抽象的方法compute(), 不过区别是RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是有返回值的。 这两个子类也是抽象类,在使用的时候需要你定义子类去扩展。
ForkJoinPool 工作原理
Fork/Join 并行计算的核心组件是ForkJoinPool ,所以下面我们来简单介绍一下ForkJoinPool 的工作原理。
通过前面文章的学习,你应该已经知道了ThreadPoolExecutor 本质上是一个生产者-消费者模式的实现,内部有一个任务队列,这个队列是消费者生产者通信的媒介,ThreadPoolExecutor 可以有多个工作线程,但是这些工作线程都共享一个工作队列。
ForkJoinPool 本质上也是一个消费者-生产者模型的实现,但是更加智能,你可以参考下面的ForkJoinPool 工作原理来理解,ForkJoinPool 内部只有一个任务队列,而ForkJoinPool 内部有多个任务队列,当我们通过ForkJoinPool 的invoke() 或者submit() 方法提交任务的时候,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务再执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。
如果工作线程对应的任务队列空了,是不是就没活干了呢?不是的,ForkJoinPool 支持一种叫做“任务窃取”的机制, 如果工作线程空闲了,那么他可以窃取” 其他工作任务队列中的任务,例如下图中,线程 T2 对应的任务队列已经空了,他可以窃取”线程 T1 对 应的任务队列中的任务,如此一来所有的工作线程都闲不下来了。
ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和窃取任务”分别是从任务队列不同的端消费, 这样能避免不必要的数据竞争,我们这里仅仅是简化后的原理,ForkJoinPool 的实现远比我们这里介绍的复杂,如果你感兴趣,建议去看它的源码。
img模拟 MapReduce 统计单词数量
学习MapReduce 有一个入门程序,统计一个文件里面每个单词的数量,下面我们来看看如何用Fork/Join 并行计算框架实现。
这里省略实现……………………
总结
Fork/Join 并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小任务去解决,然后再把子任务的结果聚合起来从而得到最终的结果。这个过程处理非常类似于大数据处理中的MapReduce ,所以你可以把Fork/Join 看作单机版的MapReduce 。
Fork/Join 并行计算框架的核心组件是ForkJoinPool 。ForkJoinPool 支持任务窃取机制,能够让所以的线程工作量基本均衡,不会出现有的线程很忙,有的线程很闲,所以性能很好。Java 1.8 提供的 Stream API 里面并行流也是以ForkJoinPool 为基础的,不过需要注意的是,默认情况下所有的并行流计算都共享一个ForkJoinPool ,这个共享的ForkJoinPool 默认的线程数是CPU 的核数,如果所有的并行流计算都是CPU 密集型计算的话,完全没有问题,但是如果存在I/O 密集型的并行流计算,那么很可能会因为一个慢的I/O 计算而拖慢整个系统的性能,所以建议用不同的ForkJoinPool 执行不同类型的计算任务。
网友评论