美文网首页
JAVA线程池

JAVA线程池

作者: quanCN | 来源:发表于2021-05-28 20:15 被阅读0次

简介

线程池,从字面含义来看,是指管理一组同构工作线程的资源池。线程池与工作队列(任务队列)密切相关的,如图


  • 工作队列(Work Queue)
    在工作队列中保存了所有等待执行的任务
  • 工作者线程(Worker Thread)
    工作者线程,从工作队列中获取一个任务,执行任务,然后返回线程池等待下一个任务
好处
  • 降低资源消耗
    通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  • 提高响应速度
    任务到达时,无需等待线程创建即可立即执行。
  • 提高线程的可管理性
    线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  • 减少资源抢占
    可以防止过多线程相互竞争资源而使程序耗尽内存或失败

总体设计

Java中的线程池核心实现类是ThreadPoolExecutor,ThreadPoolExecutor的UML类图如下


生命周期

为了解决Executor服务的生命周期问题,ExecutorServiceExecutor进行了扩展,添加了一些用于生命周期管理的方法,如下

public interface ExecutorService extends Executor {
    //关闭服务,会先完成以及提交的任务而不再接收新的任务
    void shutdown();
    //暴力关闭服务,尝试取消所有运行中的任务,并且不再启动队列中尚未开始的任务
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
        
    //提交指定任务去执行
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    
    //执行给定的任务,返回所有个任务的结果
    <T> List<Future<T>> invokeAll(Collection<?extends Callable<T>> tasks) throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException;
    
    //执行给定的任务,返回其中一个任务的结果
    <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

ExecutorService的生命周期有5种状态:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED下图根据jdk8的源码,所画的生命周期流程图:

设置线程池的大小

线程池的理想大小取决于被提交任务的类型以及部署系统的特性。例如对于计算密集型任务线程池大小设置为N_{cpu}+1通常能实现更优的利用率,对于I/O密集型操作线程池的规模应该更大。也可以通过设置不同大小的的线程池运行程序观察CPU利用率的水平,从而找到更优的线程池大小。
当然CPU周期并不是唯一影响线程池大小的资源,还包括内存、文件句柄、套接字句柄和数据库连接等。

默认线程池

JAVA类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors中的静态工厂方法之一来创建一个线程池:

  • newFixedThreadPool
    创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这是线程池的规模将不再变化(如果某个线程由于未预期的Exception而结束,那么线程池会补充一个新的线程)
  • newCachedThreadPool
    创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制
  • newSingleThreadExecutor
    单一线程池,它创建单个工作者来执行任务,如果线程异常结束,会创建另一个线程来替代。同时能够确保任务在队列中的顺序来串行执行(FIFO、LIFO、优先级)
  • newScheduledThreadPool
    定时线程池,它会创建一个固定长度的线程池,而且以延迟或定时的方式来执行任务,类似于Timer

配置ThreadPoolExecutor

ThreadPoolExecutor是一个灵活的、稳定的线程池,允许进行各种定制。如果默认的执行策略不能满足需求,那么可以通过ThreadPoolExecutor的构造函数来实例化一个对象,并且根据自己 的需求来定制,构造函数如下(7大参数):

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                              TimeUnit unit, BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory, RejectedExecutionHandler handler){...}
  • corePoolSize 基本大小
    表示线程池的目标大小,即在没有任务执行时线程池的大小,并且只有工作队列满了的情况才会创建超过这个数量的线程

  • maximumPoolSize 最大大小
    表示同时活动的线程数量的上限

  • keepAliveTime 存活时间
    如果某个空闲的线程池超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止

  • unit 时间单位
    参数keepAliveTime的时间单位,如秒、分钟、小时等

  • workQueue
    一个阻塞队列,用来存储等待执行的任务,一般有3种:

    • 无界队列
      LinkedBlockingQueue无界队列,newFixedThreadPool和newSingleThreadExecutor在默认情况下就是使用无界队列
    • 有界队列
      ArrayBlockingQueue有界队列,是一种更稳妥的资源管理策略,有助于减少内存使用量,降低CPU的使用率,同时还可以减少上下文切换,但是付出的代价是可能会限制吞吐量
    • 同步移交
      SynchronousQueue,SynchronousQueue不是一个真正的队列,而是一直在线程之间移交的机制。
      例如,要将一个元素放到其中,必须有另一个线程正在等待接受这个新的元素,如果没有线程正常等待,并且线程池的当前大小小于最大值,那么将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝,newCachedThreadPool就是使用这种队列

    当然也可以使用其他队列,例如PriorityBlockingQueue等
    :只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么就有可能导致线程“饥饿”死锁问题,此时应该使用newScheduledThreadPool

  • threadFactory
    线程工厂,主要用来创建线程。默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂,可以定制线程池的配置信息

  • handler
    当有界工作队列被填满后,饱和策略开始发挥作用,JDK提供了4种策略:

    • ThreadPoolExecutor.AbortPolicy
      中止(Abort),为默认饱和策略,该策略会抛弃任务并抛出RejectedExecutionException异常
    • ThreadPoolExecutor.DiscardPolicy
      抛弃(Discard),抛弃任务,但是不抛出异常
    • ThreadPoolExecutor.DiscardOldestPolicy
      抛弃最旧的(Discard-Oldest),抛弃队列中队头的任务,然后重新尝试提交新的任务
    • ThreadPoolExecutor.CallerRunsPolicy
      调用者运行(Caller-Runs),在调用execute的线程中执行该任务

源码解读

execute()方法是Executor中定义的方法,是线程池的核心方法,ThreadPoolExecutor的实现如下

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

可以看出大致分为三步

  1. 如果当前线程数小于corePoolSize,将创建一个新线程,并将command做为第一个任务。调用addWorker()方法原子性地检查运行状态和工作者线程数量,返回false来防止错误警报,该错误警报在不应该添加线程的情况下会增加线程。
  2. 如果一个任务成功入队,仍需要再次检查是否应该添加一个线程(因为可能距离上次检查有的线程已经死亡)或者线程池已经关闭。因此,重新检查状态后,工作者线程执行任务或者拒绝任务并且触发饱和策略
  3. 如果不能将任务入队,则尝试添加一个新线程。如果失败了,则代表已经停止或饱和了,所以拒绝这个任务,并且触发饱和策略

相关文章

网友评论

      本文标题:JAVA线程池

      本文链接:https://www.haomeiwen.com/subject/ptqesltx.html