Java 线程池剖析

作者: data4 | 来源:发表于2017-02-09 22:58 被阅读219次

    概念

    线程 & 线程池

    • 什么是线程?

    提到线程,不得不提进程。进程是操作系统的最小资源单位,一个进程能使用多少虚拟内存,能打开多少文件标识符。线程是操作系统的最小调度单元,每个进程默认有一个主线程,它用来执行真正的任务。

    • 在什么情况下,需要使用线程?

    多个任务并发执行,提高 CPU 利用率,响应速度,执行效率。例如:主线程负责接收请求,工作线程负责处理请求。

    • 在什么情况下使用线程池?

    当你需要使用多个线程来执行相同类型任务时,你不希望自己来管理线程的创建和销毁,维护任务队列,分配任务,管理定时任务。ok,交给线程池吧,你只需要向线程池提交任务就好了。

    Java 线程池体系

    Java 库提供了一套完整的线程池解决方案,支持多种定制化配置,满足你的需求。java.util.concurrent 库与线程池相关的接口,类的继承关系如下所示:


    Executors

    接口

    Executor

    支持提交任务

    voidexecute(Runnablecommand) #提交 Runnable 任务
    

    ExecutorService

    继承 Executor,支持提交异步任务和关闭线程池。

    <T>Future<T>submit(Runnabletask, Tresult)
    Future<?>submit(Runnabletask)
    void shutdown()
    

    ScheduledExecutorService

    继承 ExecutorService, 增加支持定时任务和延迟任务

    ScheduledFuture<?> schedule(
      Runnable command, 
      long delay, 
      TimeUnit unit)
    
    <V> ScheduledFuture<V> schedule(
      Callable<V> callable, 
      long delay, 
      TimeUnit unit)
    
    ScheduledFuture<?> scheduleAtFixedRate(
      Runnable command, 
      long initialDelay, 
      long period, 
      TimeUnit unit)
    
    ScheduledFuture<?> scheduleWithFixedDelay(
      Runnable command, 
      long initialDelay, 
      long delay, 
      TimeUnit unit)
    

    实现类

    ThreadPoolExecutor

    ThreadPoolExecutor(int corePoolSize, 
      int maximumPoolSize, 
      long keepAliveTime, 
      TimeUnit unit, 
      BlockingQueue<Runnable> workQueue, 
      ThreadFactory threadFactory, 
      RejectedExecutionHandler handler)
    

    实现 ExecutorService 接口功能外,提供了以下几个配置项:

    • 线程池大小

    corePoolSize 线程池保留线程个数,maximumPoolSize 线程池最大线程个数。
    默认情况下,当提交任务时,才会创建线程。当提交任务时,如果当前 running 线程数小于 corePoolSize 的话,则创建一个线程;如果当前 running 线程数大于 corePoolSize 小于 maximumPoolSize 的话,当且仅当任务队列满的情况下,才会创建一个线程;

    • 线程工厂(threadFactory, 用来创建线程)

    默认情况下,使用 Executors.defaultThreadFactory 来创建线程,它们同属于一个线程组,有相同的优先级,non-daemon 状态。

    • 线程空闲多久销毁(keepAliveTime)

    当线程个数超过 corePoolSize 之后,如果有线程空闲时间超过 keepAliveTime 的话,线程池就会释放该线程。

    • 任务队列(workQueue)

    用来保存未提交和已提交的任务。任务队列的使用情况与线程池内部线程个数有关,如果线程个数小于 corePoolSize 的话,则不会缓存任务,而是创建线程执行任务;如果线程个数大于 corePoolSize 的话,则先缓存任务;如果队列满了,如果线程个数小于 maximumPoolSize 的话,则创建新线程;否则拒绝该任务。
    目前提供 3 种队列:(1)Direct handoffs (2)Unbounded queues 默认(3) Bounded queues。详细参考:ThreadPoolExector

    • 当任务队列满了之后的处理策略

    ThreadPoolExecutor.AbortPolicy 抛出异常 RejectedExecutionException(默认)
    ThreadPoolExecutor.DiscardPolicy 直接丢掉
    ThreadPoolExecutor.DiscardOldestPolicy 丢掉队列中最老的任务

    ScheduledThreadPoolExecutor

    继承 ThreadPoolExecutor,并实现 ScheduledExecutorService 接口行为。

    工具接口和类

    ThreadFactory

    接口,定义创建线程

    Thread newThread(Runnable r)
    

    Executors

    用来创建 ExecutorService, ScheduledExecutorService, ThreadFactory, Callable。

    public static <T> Callable<T> 
      callable(
        Runnable task, 
        T result)
    
    public static ThreadFactory 
      defaultThreadFactory()
    
    public static ExecutorService 
      newCachedThreadPool(
        ThreadFactory threadFactory)
    
    public static ExecutorService 
      newFixedThreadPool(
        int nThreads, 
        ThreadFactory threadFactory)
    

    最佳实践

    • 使用 Executors 工具类创建 ThreadPoolExecutor
    Executors.newFixedThreadPool(5);
    Executors.newScheduledThreadPool(3);
    
    • 自定义 ThreadFactory,定义 Thread 的名称和设置 Thread 的 daemon 状态。
    new ThreadFactory() {
        public Thread newThread(Runnable r) {
            Thread t = Executors
              .defaultThreadFactory()
              .newThread(r);
            t.setName("myname");
            t.setDaemon(true);
            return t;
        }
    }
    
    • 可以设置有固定大小的队列

    遗憾的是,Executors 并没有提供支持定制化队列的接口,所以只能使用 ThreadPoolExecutor。

    BlockingQueue<Runnable> queue = 
      new ArrayBlockingQueue<>(100);
    executorService = new ThreadPoolExecutor(
      n, 
      n, 
      0L, 
      TimeUnit.MILLISECONDS, 
      queue);
    
    • 记住优雅的关闭 ThreadPoolExecutor

    shutdown 不接受新的任务,保证已经接受的任务提交执行,但是并不保证任务执行结束
    awaitTermination 保证已经执行的任务完整执行结束。
    shutdownNow 关闭正在执行的任务,但不保证正在执行的任务正常关闭。未执行的任务直接返回。

    cxnResetExecutor.shutdown();
    try {
      if (cxnResetExecutor.awaitTermination(
        5, TimeUnit.SECONDS)) {
          cxnResetExecutor.shutdownNow();
      }
    } catch (Exception ex) {
      logger.error("Interrupted while waiting for connection reset executor " +
        "to shut down");
    }
    
    • 监控 ThreadPoolExecutor 状态
    public long getCompletedTaskCount() #获取已经完成的 task 总个数
    public int getActiveCount() #获取正在活的线程个数
    public int getLargestPoolSize() #获取线程池曾经最大的线程数
    public BlockingQueue<Runnable> getQueue()   #获取任务队列中任务个数
    

    参考

    Executors
    Executor
    ThreadPoolExecutor
    ExecutorService-10个要诀和技巧
    flume 源码
    Java线程池分析

    欢迎大家访问本人网站 程序员工具箱 致力于贡献便捷的工具,帮助程序员写出更优秀的代码,同时也欢迎大家一起来贡献。

    相关文章

      网友评论

        本文标题:Java 线程池剖析

        本文链接:https://www.haomeiwen.com/subject/uxgbittx.html