美文网首页
正确使用线程池

正确使用线程池

作者: lynnnnyl | 来源:发表于2022-10-17 12:18 被阅读0次

线程池的设计采用生产者-消费者模式,如果把线程池类比为一个项目组,则线程是项目组的成员。线程池中的一些参数说明:

  1. corePoolSize:表示线程池保有的最小线程数。有些项目很闲,但是也不能把人都撤了,至少要留 corePoolSize 个人坚守阵地。
  2. maximumPoolSize:表示线程池可以创建的最大线程数。当项目很忙时,就需要加人,但是也不能无限制地加,最多就加到 maximumPoolSize 个人。当项目闲下来时,就要撤人了,最多能撤到 corePoolSize 个人。
  3. keepAliveTime&unit:上面提到项目根据忙闲来增减人员,那在编程世界里,如何定义忙和闲呢?很简单,一个线程如果在一段时间内,都没有执行任务,说明很闲,keepAliveTime 和 unit 就是用来定义这个“一段时间”的参数。也就是说,如果一个线程空闲了keepAliveTime & unit这么久,而且线程池的线程数大于 corePoolSize ,那么这个空闲的线程就要被回收了。
  4. workQueue:工作队列
  5. threadFactory:通过这个参数你可以自定义如何创建线程,例如你可以给线程指定一个有意义的名字。
  6. handler:通过这个参数定义任务的拒绝策略。如果线程池中所有线程都在忙碌,且工作队列也满了(前提是工作队列是有界队列),那么此时提交任务,线程池就会拒绝接收。至于拒绝的策略,你可以通过 handler 这个参数来指定。ThreadPoolExecutor 已经提供了以下 4 种策略。
    CallerRunsPolicy:提交任务的线程自己去执行该任务。
    AbortPolicy:默认的拒绝策略,会 throws RejectedExecutionException。
    DiscardPolicy:直接丢弃任务,没有任何异常抛出。
    DiscardOldestPolicy:丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列。
    Java 在 1.6 版本还增加了 allowCoreThreadTimeOut(boolean value) 方法,它可以让所有线程都支持超时,这意味着如果项目很闲,就会将项目组的成员都撤走。

使用线程池注意事项

  1. 使用有界队列。高负载情境下,无界队列很容易导致 OOM,而 OOM 会导致所有请求都无法处理。
  2. 慎重使用默认拒绝策略。使用有界队列,当任务过多时,线程池会触发执行拒绝策略,线程池默认的拒绝策略会 throw RejectedExecutionException 这是个运行时异常,对于运行时异常编译器并不强制 catch 它,所以开发人员很容易忽略。因此默认拒绝策略要慎重使用。如果线程池处理的任务非常重要,建议自定义自己的拒绝策略;并且在实际工作中,自定义的拒绝策略往往和降级策略配合使用。
  3. 异常处理。任务在执行过程中会出现运行时异常(RuntimeException),会导致执行任务的线程终止,但是却不会有任何通知。虽然线程池提供了很多用于异常处理的方法,但是最稳妥和简单的方案还是捕获所有异常并按需处理,你可以参考下面的示例代码。
try {
 //业务逻辑
} catch (RuntimeException x) {
 //按需处理
} catch (Throwable x) {
 //按需处理
} 

线程池在 Java 并发编程领域非常重要,很多大厂的编码规范都要求必须通过线程池来管理线程。线程池和普通的池化资源有很大不同,线程池实际上是生产者 - 消费者模式的一种实现,理解生产者 - 消费者模式是理解线程池的关键所在。

批量执行异步任务---CompletionService

CompletionService是Java SDK并发包中的一个工具类,将线程池和阻塞队列的功能融合在了一起。CompletionService 的实现类 CompletionService内部维护了一个阻塞队列completionQueue,如果未指定,则默认为无界的LinkedBlockingQueue。利用CompletionService的submit()方法提交一个异步任务,返回一个Future对象放入阻塞队列中。通过调用cs.take().get(),可以拿到最快返回的任务执行结果。因为,任务进入阻塞队列的顺序按照任务完成顺序,所以CompletionService可以得到有序的异步任务执行结果,避免无谓的等待,同时还可以实现诸如Forking Cluster这样的需求。ExecutorCompletionService需要自己创建一个线程池,可以让多个ExecutorCompletionService的线程池隔离,这种隔离性能够避免几个特别耗时的任务拖垮整个应用的风险。

线程池、Future、CompletableFuture、CompletionService

这些工具类都是基于任务的角度解决问题,而不关注线程之间如何协作的细节(如线程之间如何实现等待、通知等)。对于简单的并行任务,可以通过“线程池+Future”的方案解决;如果任务之间有聚合关系(AND或者OR聚合),都可以通过CompletableFuture来解决;批量的并行任务,可以通过CompletionService解决。
日常工作中的并发场景包括上述的简单并行任务、聚合任务和批量并行任务三种,还有一种“分治”的任务模型。分治,把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解。Java并发包里的Fork/Join的并行计算框架,就是用来支持这种分治的任务模型。

分治任务模型

分治任务模型可分为两个阶段:

  1. 任务分解:将任务迭代分解为子任务,直至子任务计算出结果
  2. 结果合并:逐层合并子任务的执行结果,直至获得最终结果

Fork/Join的使用

Fork--任务分解;Join--结果合并
Fork/Join计算框架主要包含两部分:分治任务的线程池ForkJoinPool,分治任务ForkJoinTask。类似于ThreadPoolExecutor和Runnable的关系,都可以为理解为提交任务到线程池,不过分治任务有自己的独特类型ForkJoinTask。
ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,其中 fork() 方法会异步地执行一个子任务,而 join() 方法则会阻塞当前线程来等待子任务的执行结果。ForkJoinTask 有两个子类——RecursiveAction 和 RecursiveTask,通过名字你就应该能知道,它们都是用递归的方式来处理分治任务的。这两个子类都定义了抽象方法 compute(),不过区别是 RecursiveAction 定义的 compute() 没有返回值,而 RecursiveTask 定义的 compute() 方法是有返回值的。这两个子类也是抽象类,在使用的时候,需要你定义子类去扩展。
与ThreadPoolExecutor相比,ForkJoinPool是一个更加智能的生产者-消费者实现。
ThreadPoolExecutor内部只有一个任务队列,而ForkJoinPool内部有多个任务队列,当通过ForkJoinPool的invoke()和submit()提交任务时,ForkJoinPool会根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中创建出子任务,那么子任务会提交到工作线程对应的任务队列中。

任务窃取机制
ForkJoinPool支持一种“任务窃取”机制,如果当前工作线程空闲了,会窃取其它工作线程对应的任务队列里的任务。此外,ForkJoinPool中采用的是双端队列,当前工作线程正常获取任务和空闲线程窃取任务分别从队列的两端消费,可以减少不必要的数据竞争。

Fork/Join总结

Fork/Join并行计算框架解决的主要是分治问题,将大任务拆解为子任务,再将子任务的运行结果聚合起来,从而得到最终结果。这个过程与大数据中的MapReduce类似,可以看作单机版的MapReduce。ForkJoin并行框架的核心组件是ForkJoinPool,ForkJoinPool支持任务窃取机制,所以可以让所有线程的工作量均衡,不会出现有的线程很忙,有的线程很闲的状况。Java8中的并行流也是以Fork/Join为基础的。需要注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的 ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。

相关文章

网友评论

      本文标题:正确使用线程池

      本文链接:https://www.haomeiwen.com/subject/wapfzrtx.html