线程池

作者: 9283856ddec1 | 来源:发表于2020-02-17 16:03 被阅读0次

当我们需要频繁的创建多个线程进行耗时操作时,每次通过new Thread实现并不是一种好的方式,每次新建和销毁对象性能较差,线程缺乏统一管理,可能无限制新建线程,相互之间竞争,可能占用过多系统资源导致死锁,并且缺乏定时执行、定期执行、线程中断等功能。在开发过程中,合理地使用线程池能够带来如下好处。

  1. 降低资源消耗;
    通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  2. 提高响应速度;
    当任务到达时,任务可以不需要等到线程创建就能立即执行。

  3. 提高线程的可管理性;
    线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源, 还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。提供定时执行,定期执行,单线程,并发数控制等功能。

线程池原理

当向线程池提交一个任务之后,线程池 的主要处理流程如图所示。


线程池的主要处理流程.png

执行流程说明:
1)线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作 线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。

2)线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这 个工作队列里。如果工作队列满了,则进入下个流程。

3)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程 来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

线程池使用

Executor框架的使用示意图如图所示:


Executor框架的使用示意图.png
  • 主线程创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象,使用方法:Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule)。
  • 可以把Runnable对象直接交给ExecutorService执行ExecutorService.execute(Runnable command);或者也可以把Runnable对象或Callable对象提交执行Executor- Service.submit(Runnable task)或ExecutorService.submit(Callable<T>task)。
  • 主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行
    FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

线程池的创建

通常我们都不会直接通过new的形式来创建线程池,由于创建参数过程相对复杂一些,JDK给我们提供了Executors工厂类来简化这个过程。
详细创建见下文:ThreadPoolExecutor

向线程池提交任务

可以使用两个方法向线程池提交任务,分别为execute()和submit()方法。
execute()
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。如下所示:

threadsPool.execute(new Runnable() { 
    @Override 
    public void run() { 
        // TODO Auto-generated method stub 
    } 
});

submit()
submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个 future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方 法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线 程一段时间后立即返回,这时候有可能任务没有执行完。

Future<Object> future = executor.submit(harReturnValuetask);
try { 
    Object s = future.get(); 
} catch (InterruptedException e) { 
    // 处理中断异常 
} catch (ExecutionException e) { 
    // 处理无法执行任务异常 
} finally { 
    // 关闭线程池 
    executor.shutdown(); 
}

关闭线程池

可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线 程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务 可能永远无法终止。
shutdownNow
首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

shutdown
只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线 程。

只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务 都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪 一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭 线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

合理配置线程池

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

  • 任务的性质
    1)CPU密集型任务
    CPU密集型任务应配置尽可能小的 线程,如配置Ncpu+1个线程的线程池。

    2)IO密集型任务
    IO密集型任务线程并不是一直在执行任务,则应配 置尽可能多的线程,如2*Ncpu。

    3)混合型任务。
    混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务 和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量 将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。

  • 任务的优先级:高、中和低
    优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高 的任务先执行。

  • 任务的执行时间:长、中和短
    执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让 执行时间短的任务先执行。

  • 任务的依赖性:是否依赖其他系统资源,如数据库连接
    依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越 长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。

线程池的监控

如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根 据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池的 时候可以使用以下属性。

属性 说明
taskCount 线程池需要执行的任务数量
completedTaskCount 线程池在运行过程中已完成的任务数量,小于或等于taskCount
largestPoolSize 线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是 否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。
getPoolSize 线程池的线程数量
getActiveCount 获取活动的线程数

通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute和terminated方法,也可以在任务执行前、执行后和线程池关闭前执 行一些代码来进行监控。

Executor框架

Executor框架简介

Executor框架的两级调度模型

在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线 程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程 也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。两级调度模型如下所示:

任务的两级调度模型.png
上层,把应用分解为若干个任务,然后使用用户级的调度器(Executor)将这些任务映射到固定数量的线程;
下层,操作系统内核将这些线程映射到硬件处理器上。
Executor框架的结构

Executor框架包含的主要的类与接口如图所示:


Executor框架的类与接口.png

Executor框架主要由3大部分组成如下:
(1)任务
被执行任务需要实现的接口:Runnable接口或Callable接口。

(2)任务的执行
任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。

(3)异步计算的结果
接口Future和实现Future接口的FutureTask类。

Runnable接口和Callable接口

Runnable接口和Callable接口的实现类,都可以被Executor执行。它们之间的区别是Runnable不会返回结果,而Callable可以返回结果。可以使用工厂类Executors来把Runnable包装成一个Callable。

  • 把一个Runnable包装成一个Callable的API:
public static Callable<Object> callable(Runnable task)
  • 把一个Runnable和一个待返回的结果包装成一个Callable的API:
public static <T> Callable<T> callable(Runnable task, T result)

ExecutorService

ExecutorService定义了线程池需要实现的接口,其生命生命周期包括3种状态:运行、关闭、终止。创建后便进入运行状态,当调用shutdown()方法时,便进入关闭状态,此时意味着ExecutorService不再接受新的任务,但它还在执行已经提交了的任务。当所有已经提交了的任务执行完后,就变成终止状态。

ThreadPoolExecutor

ThreadPoolExecutor构造函数如下:

public ThreadPoolExecutor(int corePoolSize, int maxinumPoolSize, 
                                             long keepAliveTime, TimeUnit unit, 
                                             BlockingQueue<Runnable> workQueue, 
                                             ThreadFactory threadFactory, 
                                             RejectedExecutionHandler handler)

参数详细说明如下所示:

参数名 作用
corePoolSize 线程池所保存的核心线程数,线程池启动后默认是空的,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的线程能够执行任务也会创建线程,等到需要执行的任务大于线程核心数后就不再创建。prestartAllCoreThreads方法可以在线程池启动后立即启动所有核心线程以等待任务
maxinumPoolSize 线程池允许创建的最大线程数。如果队列满了,并且已创建的线程小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没有效果。
keepAliveTime 当前线程池线程总数大于核心线程数时,终止多于的空闲线程的时间。如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
unit keepAliveTime参数的时间单元,可选值有天、小时、分钟、毫秒、微秒和纳秒。
workQueue 任务队列,如果当前线程池达到核心线程数corePoolSize,且当前所有线程都处于活动状态时,则将新加入的任务放到此队列中
threadFactory 线程工厂,可以通过线程工厂给每个创建出来的线程设 置更有意义的名字。用户可以定制线程的创建过程,通常不需要设置。
handler 拒绝策略,当线程池与workQueue队列都满了的情况下,说明线程池处于饱和状 态,那么必须采取一种策略处理提交的新任务。
  • workQueue有下列几个常用实现:
    (1)ArrayBlockingQueue:基于数组结构的有界队列,此队列按FIFO原则对任务进行排序,如果队列满了还有任务进来,则调用决绝策略。
    (2)LinkedBlockingQueue:基于链表结构的无界队列,此队列按照FIFO原则对任务进行排序。因为是无界的,根本不会满,所以采用此队列后线程池将忽略最大线程数和拒绝策略;
    (3)SynchronousQueue:不存元素的阻塞队列,直接将任务提交给线程而不是将它加入到队列。每个插入的操作必须等到另一个调用移除的操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。如果新任务到来后线程池没有任务可用线程处理,则调用拒绝策略。
    (4)PriorityBlockingQueue:具有优先级的队列的无限阻塞队列,可以自定义优先级,默认是按自然排序。

  • 拒绝策略有如下几种:
    (1)AbortPolicy:拒绝任务,抛出RejectedExecutionException异常,线程池的默认策略。
    (2)CallerRunsPolicy:拒绝新任务进入,如果该线程池还没有被关闭,那么将这个新任务执行在调用线程中。
    (3)DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。这样的结果是最后加入的任务反而可能被执行到,先前加入的都被抛弃了。
    (4)DiscardPolicy:加不进的任务都被抛弃,同时没有异常抛出。

FixedThreadPool详解

FixedThreadPool被称为可重用固定线程数的线程池。下面是FixedThreadPool的源代码实现:

public static ExecutorService newFixedThreadPool(int nThreads){
        ruturn new ThreadPoolExecutor(nThreads, nThreads, 
                                                            0L, TimeUnit.MILLISECONDS
                                                            new LinkedBlockingQueue<Runnable>());
}

设置它的corePoolSize和maxinumPoolSize值都是nThreads,并且设置keepAliveTime参数为0毫秒,最后设置无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。该线程池中就含有了固定个数的线程,并且能够容纳无限个任务。
FixedThreadPool的execute()的运行示意图如下所示:


FixedThreadPool的execute()的运行示意图.png

使用无界队列LinkedBlockingQueue作为工作队列会对线程池带来如下影响:
1> 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中 的线程数不会超过corePoolSize。
2> 拒绝策略不会生效。

适用场景
FixedThreadPool适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场 景,它适用于负载比较重的服务器。

SingleThreadExecutor详解

SingleThreadExecutor是使用单个worker线程的Executor。下面是SingleThreadExecutor的源代码实现:

public static ExecutorService newSingleThreadExecutor() { 
    return new FinalizableDelegatedExecutorService (
                     new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, 
                                                      new LinkedBlockingQueue<Runnable>())
); }

SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。其他参数与 FixedThreadPool相同。运行示意图如下:


SingleThreadExecutor的execute()的运行示意图.png

适用场景
SingleThreadExecutor适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多 个线程是活动的应用场景。

CachedThreadPool详解

CachedThreadPool是一个会根据需要创建新线程的线程池。下面是创建CachedThread- Pool的源代码:

public static ExecutorService newCachedThreadPool() { 
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 
                                                   60L, TimeUnit.SECONDS, new 
                                                   SynchronousQueue<Runnable>()); 
}

CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为 Integer.MAX_VALUE,即maximumPool是无界的。这里把keepAliveTime设置为60L,意味着 CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。

CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但 CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于 maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下, CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

运行示意图如下:


CachedThreadPool的execute()的运行示意图.png

1)首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程 正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方 法执行完成;否则执行下面的步骤2。
2)当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤1将失 败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成。
3)在步骤2中新创建的线程将任务执行完后,会执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执行步骤1),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于 空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。

适用场景
SingleThreadExecutor适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多 个线程是活动的应用场景。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运 行任务,或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但 ScheduledThreadPoolExecutor功能更强大、更灵活。Timer对应的是单个后台线程,而 ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。


ScheduledThreadPoolExecutor的任务传递示意图.png

ScheduledThreadPoolExecutor的执行主要分为两大部分。
1)当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWith- FixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了 RunnableScheduledFutur接口的ScheduledFutureTask。

2)线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。

ScheduledThreadPoolExecutor的任务执行步骤.png

下面是对这4个步骤的说明:
1)线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask的time大于等于当前时间。
2)线程1执行这个ScheduledFutureTask。
3)线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。
4)线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。

ScheduledThreadPoolExecutor

适用场景
ScheduledThreadPoolExecutor适用于需要多个后台线程执行周期任务,同时为了满足资源 管理的需求而需要限制后台线程的数量的应用场景。

SingleThreadScheduledExecutor

适用场景
SingleThreadScheduledExecutor适用于需要单个后台线程执行周期任务,同时需要保证顺 序地执行各个任务的应用场景。

FutureTask

FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给 Executor执行,也可以由调用线程直接执行FutureTask.run()。

根据FutureTask.run()方法被执行 的时机,FutureTask可以处于下面3种状态,如下图所示:


FutureTask的状态迁移示意图.png

1)未启动
当创建一 个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。

2)已启动
FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。

3)已完成
FutureTask.run()方法执行完后正常结束,或被取消,或 执行时抛出异常而异常结束,FutureTask处于已完成状态。

FutureTask调用get方法和cancel方法的执行示意图:


FutureTask的get和cancel的执行示意图.png
FutureTask实现

FutureTask的实现基于AbstractQueuedSynchronizer(以下简称为AQS),AQS是一个同步框架,它提供通 用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。

基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类 Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。AQS被作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只 需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。具 体来说,Sync实现了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通过这 两个方法来检查和更新同步状态。

FutureTask的设计示意图如下图所示:


FutureTask的设计示意图.png
  • get()/get(long timeout,TimeUnit unit)方法调用
    这个操作对应acquire,阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。其具体流程为:
    1> 调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法首先会回调在子类Sync中实 现的tryAcquireShared()方法来判断acquire操作是否可以成功。acquire操作可以成功的条件为: state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null。
    2> 如果成功则get()方法立即返回。如果失败则到线程等待队列中去等待其他线程执行 release操作。
    3> 当其他线程执行release操作(比如FutureTask.run()或FutureTask.cancel(…))唤醒当前线 程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤 醒它的后继线程。
    4> 最后返回计算的结果或抛出异常。

  • run() 或 cancel()方法调用
    这个操作对应release,改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask.run()的执行过程如下:
    1> 执行在构造函数中指定的任务(Callable.call())。
    2> 以原子方式来更新同步状态(调用AQS.compareAndSetState(int expect,int update),设置 state为执行完成状态RAN。如果这个原子操作成功,就设置代表计算结果的变量result的值为 Callable.call()的返回值,然后调用AQS.releaseShared(int arg)。
    3> AQS.releaseShared(int arg)首先会回调在子类Sync中实现的tryReleaseShared(arg)来执 行release操作(设置运行任务的线程runner为null,然会返回true);AQS.releaseShared(int arg), 然后唤醒线程等待队列中的第一个线程。
    4> 调用FutureTask.done()。

参考资料:

[1] Java并发编程的艺术,方腾飞,魏鹏,程晓明
[2] Android开发进阶--从小工到专家,何红辉

相关文章

  • java线程池

    线程VS线程池 普通线程使用 创建线程池 执行任务 执行完毕,释放线程对象 线程池 创建线程池 拿线程池线程去执行...

  • java----线程池

    什么是线程池 为什么要使用线程池 线程池的处理逻辑 如何使用线程池 如何合理配置线程池的大小 结语 什么是线程池 ...

  • Java线程池的使用

    线程类型: 固定线程 cached线程 定时线程 固定线程池使用 cache线程池使用 定时调度线程池使用

  • Spring Boot之ThreadPoolTaskExecut

    初始化线程池 corePoolSize 线程池维护线程的最少数量keepAliveSeconds 线程池维护线程...

  • 线程池

    1.线程池简介 1.1 线程池的概念 线程池就是首先创建一些线程,它们的集合称为线程池。使用线程池可以很好地提高性...

  • 多线程juc线程池

    java_basic juc线程池 创建线程池 handler是线程池拒绝策略 排队策略 线程池状态 RUNNIN...

  • ThreadPoolExecutor线程池原理以及源码分析

    线程池流程: 线程池核心类:ThreadPoolExecutor:普通的线程池ScheduledThreadPoo...

  • 线程池

    线程池 [TOC] 线程池概述 什么是线程池 为什么使用线程池 线程池的优势第一:降低资源消耗。通过重复利用已创建...

  • java 线程池使用和详解

    线程池的使用 构造方法 corePoolSize:线程池维护线程的最少数量 maximumPoolSize:线程池...

  • 线程池

    JDK线程池 为什么要用线程池 线程池为什么这么设计 线程池原理 核心线程是否能被回收 如何回收空闲线程 Tomc...

网友评论

      本文标题:线程池

      本文链接:https://www.haomeiwen.com/subject/qqocfhtx.html