一、并发与并行
并发:一个CPU一个核同一时间段执行多个任务。
并行:多个CPU多个核同一时刻执行多个任务。
二、任务性质类型
CPU密集型(CPU-bound)
CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。在多重程序系统中,大部份时间用来做计算、逻辑判断等CPU动作的程序称之CPU bound。CPU bound的程序一般而言CPU占用率相当高。这可能是因为任务本身不太需要访问I/O设备,也可能是因为程序是多线程实现因此屏蔽掉了等待I/O的时间。
线程数一般设置为:线程数 = CPU核数+1 (现代CPU支持超线程)
IO密集型(I/O bound)
IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。I/O bound的程序一般在达到性能极限时,CPU占用率仍然较低。这可能是因为任务本身需要大量I/O操作,而pipeline做得不是很好,没有充分利用处理器能力。
线程数一般设置为:线程数 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
CPU密集型 vs IO密集型
我们可以把任务分为计算密集型和IO密集型。计算密集型任务的特点是要进行大量的计算,消耗CPU资源。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少。
问题:如何利用多核CPU,计算一个数组很大的整数和?
答案:分治法,使用递归,把任务分成多个小任务,由多线程执行。
难点:分治法,使用递归,使用多个线程处理问题,如果分出来的线程任务过多也是个难题。使用线程池限定线程数的话,第一批线程任务由线程执行,因为递归,第一批线程任务需要第二批线程任务的执行结果,第二批线程任务需要第三批线程的执行结果...
解决:Fork/Join 框架。每个工作线程对应一个工作队列。
其它:ForkJoin框架处理的任务基本都能使用递归处理,但递归算法的缺陷是:一是只用单线程处理,二是递归次数过多时会导致堆栈溢出;ForkJoin解决了这两个问题,使用多线程并发处理,充分利用计算资源来提高效率,同时避免堆栈溢出发生。
三、原理
Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
image.png
Fork/Jion特性
- ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。
- ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等。
- ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。
四、使用
ForkJoinTask:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行 fork() 和 join() 操作的机制,通常情况下我们不需要直接继承 ForkJoinTask 类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:
1、RecursiveAction:用于没有返回结果的任务。
2、RecursiveTask :用于有返回结果的任务。
CountedCompleter: 在任务完成执行后会触发执行一个自定义的钩子函数。
ForkJoinPool :ForkJoinTask 需要通过 ForkJoinPool 来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
ForkJoinPool fjp = new ForkJoinPool(NCPU); //使用的线程数
ForkJoinTask<Long> task = fjp.submit(new MyTask()); //提交任务
class MyTask extends RecursiveTask<Long> {
protected Long compute();
}
网友评论