线程池有两个线程数的设置,一个为核心线程数,一个为最大线程数。在创建完线程池之后,默认情况下,线程池中并没有任何线程,等到有任务来才创建线程去执行任务。
但有一种情况排除在外,就是调用 prestartAllCoreThreads() 或者 prestartCoreThread() 方法的话,可以提前创建等于核心线程数的线程数量,这种方式被称为预热,在抢购系统中就经常被用到。
当创建的线程数等于 corePoolSize 时,提交的任务会被加入到设置的阻塞队列中。当队列满了,会创建线程执行任务,直到线程池中的数量等于 maximumPoolSize。
当线程数量已经等于 maximumPoolSize 时, 新提交的任务无法加入到等待队列,也无法创建非核心线程直接执行,我们又没有为线程池设置拒绝策略,这时线程池就会抛出 RejectedExecutionException 异常,即线程池拒绝接受这个任务。
当线程池中创建的线程数量超过设置的 corePoolSize,在某些线程处理完任务后,如果等待 keepAliveTime 时间后仍然没有新的任务分配给它,那么这个线程将会被回收。线程池回收线程时,会对所谓的“核心线程”和“非核心线程”一视同仁,直到线程池中线程的数量等于设置的 corePoolSize 参数,回收过程才会停止。
即使是 corePoolSize 线程,在一些非核心业务的线程池中,如果长时间地占用线程数量,也可能会影响到核心业务的线程池,这个时候就需要把没有分配任务的线程回收掉。
我们可以通过 allowCoreThreadTimeOut 设置项要求线程池:将包括“核心线程”在内的,没有任务分配的所有线程,在等待 keepAliveTime 时间后全部回收掉。
CPU 密集型任务:
这种任务消耗的主要是 CPU 资源,可以将线程数设置为 N(CPU 核心数)+1,比 CPU 核心数多出来的一个线程是为了防止线程偶发的缺页中断,或者其它原因导致的任务暂停而带来的影响。一旦任务暂停,CPU 就会处于空闲状态,而在这种情况下多出来的一个线程就可以充分利用 CPU 的空闲时间。
I/O 密集型任务:
这种任务应用起来,系统会用大部分的时间来处理 I/O 交互,而线程在处理 I/O 的时间段内不会占用 CPU 来处理,这时就可以将 CPU 交出给其它线程使用。因此在 I/O 密集型任务的应用中,我们可以多配置一些线程,具体的计算方法是 2N。
对于简单的并行任务,可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;
任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等。
CompletionStage 接口可以清晰地描述任务之间的这种时序关系,例如前面提到的 f3 = f1.thenCombine(f2, ()->{}) 描述的就是一种汇聚关系。
- 描述串行关系
CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口。(thenApply 就是promise中的then)
CompletableFuture<String> f0 =
CompletableFuture.supplyAsync(
() -> "Hello World") //①
.thenApply(s -> s + " QQ") //②
.thenApply(String::toUpperCase);//③
System.out.println(f0.join());
// 输出结果
HELLO WORLD QQ
2.描述 AND 关系
CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。(promise.all)
- 描述 OR 关系
CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。
如何使用 applyToEither() 方法来描述一个 OR 汇聚关系。(类似于promise.race)
CompletableFuture<String> f1 =
CompletableFuture.supplyAsync(()->{
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});
CompletableFuture<String> f2 =
CompletableFuture.supplyAsync(()->{
int t = getRandom(5, 10);
sleep(t, TimeUnit.SECONDS);
return String.valueOf(t);
});
CompletableFuture<String> f3 =
f1.applyToEither(f2,s -> s);
System.out.println(f3.join());
- 异常处理
虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的区别在于 whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。
CompletableFuture<Integer>
f0 = CompletableFuture
.supplyAsync(()->7/0))
.thenApply(r->r*10)
.exceptionally(e->0);
System.out.println(f0.join());
问题代码示例
1.查数据库属于io操作,用定制线程池
2.查出来的结果做为下一步处理的条件,若结果为空呢,没有对应处理
3.缺少异常处理机制
// 采购订单
PurchersOrder po;
CompletableFuture<Boolean> cf =
CompletableFuture.supplyAsync(()->{
// 在数据库中查询规则
return findRuleByJdbc();
}).thenApply(r -> {
// 规则校验
return check(po, r);
});
Boolean isOk = cf.join();
内部原理
CompletableFuture中任务的执行同样依靠ForkJoinPool
ForkJoinPool接受的任务是ForkJoinTask 类型,而我们向CompletableFuture提交的任务是Runnable/Supplier/Consumer/Function。因此,肯定需要一个适配机制,把这四种类型的任务转换成ForkJoinTask,然后提交给ForkJoinPool
ForkJoinPool数据结构,不同于ThreadPoolExector,除一个全局的任务队列之外,每个线程还有一个自己的局部队列。
分治算法
对于分治算法来说,分解出来的一个个任务并不是独立的,而是相互依赖,一个任务的完成要依赖另一个前置任务的完成。这种依赖关系是通过ForkJoinTask中的join()来体现的。
线程在执行当前ForkJoinTask的时候,产生了left、right 两个子Task。所谓fork,是指把这两个子Task放入队列里面,join()则是要等待2个子Task完成。而子Task在执行过程中,会再次产生两个子Task。如此层层嵌套,类似于递归调用,直到最底层的Task计算完成,再一级级返回。
使用 ThreadPoolExecutor 不能高效地执行该算法,因为父任务必须等待其子任务完成,而线程池执行器中的线程不能向队列中添加另一个任务并等待任务完成,一旦其线程处于等待状态,它就不能用来执行任何子任务了。ForkJoinPool 允许它的线程创建新的任务,然后挂起当前任务。当任务被挂起时,其线程可以执行其他待处理任务。(不会让线程白等待,而是可以转去做新任务,有点像协程)
举一个简单的例子。假设有个 double 数组,我们要统计数组中小于 0.5 的值的数量。依次扫描数组是很简单的(而且可能更快,你将在本节后面看到相关分析),但现在,将数组划分成子数组,然后并行扫描它们(模拟更复杂的快速排序和其他分治算法),更有意义。以下是用 ForkJoinPool 实现这种行为的代码框架:
private class ForkJoinTask extends RecursiveTask<Integer> {
private int first;
private int last;
public ForkJoinTask(int first, int last) {
this.first = first;
this.last = last;
}
protected Integer compute() {
int subCount;
if (last - first < 10) {
subCount = 0;
for (int i = first; i <= last; i++) {
if (d[i] < 0.5)
subCount++;
}
}
else {
int mid = (first + last) >>> 1;
ForkJoinTask left = new ForkJoinTask(first, mid);
left.fork();
ForkJoinTask right = new ForkJoinTask(mid + 1, last);
right.fork();
subCount = left.join();
subCount += right.join();
}
return subCount;
}
}
这里的 fork() 方法和 join() 方法是关键。如果没有这些方法(ThreadPoolExecutor 执行的任务中就没有这些方法),很难实现这种递归。这些方法使用了一系列针对每个线程的内部队列来管理任务,并将线程从执行一个任务切换到执行另一个任务。这些细节对开发人员来说是透明的,如果你对算法感兴趣,那么会发现这些代码读起来很有趣。这里关注的重点是性能,ForkJoinPool 和 ThreadPoolExecutor 之间存在哪些权衡?
首先,fork/join 范式所实现的挂起使得所有任务只需要几个线程来执行。使用该示例代码统计一个有 200 万个元素的数组中的 double 值,会创建 400 多万个任务,但这些任务只需要几个线程(甚至是一个线程,如果这对运行测试的机器有意义的话)就能轻松执行。而使用 ThreadPoolExecutor 运行类似的算法将需要 400 多万个线程,因为每个线程都必须等待其子任务完成,这些子任务只有在池中有额外线程可用时才能完成。
像这样的简单算法并不是特别适用于在现实世界中使用的 ForkJoinPool。这个线程池非常适合以下情况:
- 算法的合并部分会执行一些有趣的工作(而不是像本例中那样简单地将两个数字相加);
- 算法的叶子计算所执行的工作足以抵消创建任务的开销。
跨线程信息传递
使用ThreadLocal作为业务上下文传递的经典技术手段在中间件、技术与业务框架中广泛大量使用。而对于生产应用,几乎一定会使用线程池等异步执行组件,以高效支撑线上大流量。但使用ThreadLocal及其set/remove的上下文传递模式,在使用线程池等异步执行组件时,存在多方面的问题:
-
从业务使用者角度来看
-
繁琐
- 业务逻辑要知道:有哪些上下文;各个上下文是如何获取的。
- 并需要业务逻辑去一个一个地捕捉与传递。
-
依赖
- 需要直接依赖不同ThreadLocal上下文各自的获取的逻辑或类。
- 像RPC的上下文(如Dubbo的RpcContext)、全链路跟踪的上下文(如SkyWalking的ContextManager)、不同业务模块中的业务流程上下文,等等。
-
静态(易漏)
- 因为要 事先 知道有哪些上下文,如果系统出现了一个新的上下文,业务逻辑就要修改添加上新上下文传递的几行代码。也就是说因 系统的 上下文新增,业务的 逻辑就跟进要修改。
- 而对于业务来说,不关心系统的上下文,即往往就可能遗漏,会是线上故障了。
- 随着应用的分布式微服务化并使用各种中间件,越来越多的功能与组件会涉及不同的上下文,逻辑流程也越来越长;上下文问题实际上是个大的易错的架构问题,需要统一的对业务透明的解决方案。
-
定制性
- 因为需要业务逻辑来完成捕捉与传递,业务要关注『上下文的传递方式』:直接传引用?还是拷贝传值?拷贝是深拷贝还是浅拷贝?在不同的上下文会需要不同的做法。
- 『上下文的传递方式』往往是 上下文的提供者(或说是业务逻辑的框架部分)才能决策处理好的;而 上下文的使用者(或说是业务逻辑的应用部分)往往不(期望)知道上下文的传递方式。这也可以理解成是 依赖,即业务逻辑 依赖/关注/实现了 系统/架构的『上下文的传递方式』。
-
从整体流程实现角度来看
关注的是 上下文传递流程的规范化。上下文传递到了子线程要做好 清理(或更准确地说是要 恢复 成之前的上下文),需要业务逻辑去处理好。如果业务逻辑对清理的处理不正确,比如:
-
如果清理操作漏了:
- 下一次执行可能是上次的,即『上下文的 污染/串号』,会导致业务逻辑错误。
- 『上下文的 泄漏』,会导致内存泄漏问题。
-
如果清理操作做多了,会出现上下文 丢失。
上面的问题,在业务开发中引发的Bug真是屡见不鲜 !本质原因是:ThreadLocal的set/remove的上下文传递模式 在使用线程池等异步执行组件的情况下不再是有效的。常见的典型例子: -
当线程池满了且线程池的RejectedExecutionHandler使用的是CallerRunsPolicy时,提交到线程池的任务会在提交线程中直接执行,ThreadLocal.remove操作清理提交线程的上下文导致上下文丢失。
-
类似的,使用ForkJoinPool(包含并行执行Stream与CompletableFuture,底层使用ForkJoinPool)的场景,展开的ForkJoinTask会在任务提交线程中直接执行。同样导致上下文丢失。
期望:上下文生命周期的操作从业务逻辑中分离出来。业务逻辑不涉及生命周期,就不会有业务代码如疏忽清理而引发的问题了。整个上下文的传递流程或说生命周期可以规范化成:捕捉、回放和恢复这3个操作,即CRR(capture/replay/restore)模式。
网友评论