Java线程池

作者: 齐晋 | 来源:发表于2017-11-27 13:56 被阅读10次

    线程池的作用

    暂且不表

    线程池

    java提供的线程池类是ThreadPoolExecutor
    下图是类ThreadPoolExecutor的继承关系

    ThreadPoolExcecutor继承关系

    使用线程池,我们一般关心以下几个场景:

    • 为了控制线程数量,需要固定线程池大小
    • 希望压力小时线程少点,压力大时多创建点线程。压力再次减小时,多余的线程过一段时间自动销毁,以节省资源
    • 当请求过多时,多余的请求放入什么样的等待队列中。
    • 当请求太多,连队列都放不下的时候,应采取什么样的抛弃策略。

    鉴于以上场景,ThreadPoolExecutor有4个构造函数,能够满足所有的需求:

    public class ThreadPoolExecutor extends AbstractExecutorService {
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                BlockingQueue<Runnable> workQueue);
     
        public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
                BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
    

    我们需要先看看构造函数的参数都是什么意思:

    int corePoolSize: 线程池中alive线程的最少数量。

    默认情况下,创建线程池后,线程池中是没有任何线程的。线程的创建延迟到了请求到达的时候。也就是说,一开始线程池中线程数量是0,来一个请求就创建一个线程。当线程数量达到corePoolSize时,线程池中线程的数量最少为corePoolSize

    int maximumPoolSize: 线程池中最大线程数量。

    在maximumPoolSize>corePoolSize的情况下,线程池中线程数量可超过corePoolSize,以应对过大的压力。当压力降下来的时候,一部分线程过一段时间会自动销毁,直至数量减少到corePoolSize。如果maximumPoolSize=corePoolSize,那么线程池的大小就是固定的了。
    Q: 线程数什么时候才会超过corePoolSize呢?
    A: 多余的请求会先进入等待队列,当等待队列满了的时候,会创建新的线程来处理请求(前提是maximumPoolSize>corePoolSize),线程数最大为maximumPoolSize。当还有更多请求时,就要采取抛弃策略了。
    Q: 那岂不是新请求会被先执行?

    long keepAliveTimeTimeUnit unit: 线程存活时间

    maximumPoolSize中提到线程数量可以超过corePoolSize。这些额外的线程过多场时间销毁呢?就是由keepAliveTime和unit决定的。keepAliveTime是个数字,unit表示时间。TimeUnit的可选值有:

    • TimeUnit.DAYS; //天
    • TimeUnit.HOURS; //小时
    • TimeUnit.MINUTES; //分钟
    • TimeUnit.SECONDS; //秒
    • TimeUnit.MILLISECONDS; //毫秒
    • TimeUnit.MICROSECONDS; //微妙
    • TimeUnit.NANOSECONDS; //纳秒

    如keepAliveTime=1,unit为TimeUnit.MINUTES,表示多余的线程过1分钟后销毁。

    BlockingQueue<Runnable> workQueue:等待队列,存放等待的请求。当线程数达到corePoolSize时,再来的请求会放入等待队列

    • ArrayBlockingQueue: 基于数组的先进先出队列,此队列创建时必须指定大小;
    • LinkedBlockingQueue: 基于链表的先进先出队列,可以指定大小。如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
    • SynchronousQueue: 这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

    RejectedExecutionHandler handler: 线程池中线程不够用,队列也放不下了,采取什么样的策略处理新请求。

    • ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
    • ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
    • ThreadPoolExecutor.DiscardOldestPolicy:丢弃线程队列里最近的一个任务,执行新提交的任务
    • ThreadPoolExecutor.CallerRunsPolicy:用调用者的线程来运行任务

    根据实际需求,设置好以上这些参数,就能创建出一个可用的线程池。

    除了上面的两个构造函数,ThreadPoolExecutor还提供了支持ThreadFactory创建线程的构造函数。如下:

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
            
         public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
             BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
    }
    

    ThreadFactory的作用......

    有了ThreadPoolExecutor,只需设置一些参数就可以拥有一个线程池,是不是很简单!是!但是还可以更简单。

    程序员一直都有懒的天性。因为懒,所以才创造出了各种工具、各种语言。秉承着懒懒更健康的原则,对于常见线程池,JDK提供了创建工具Executors

    Executors

    Executors是一个创建线程池的工具类。它提供了最常见的线程池的创建方法。不需要我们再苦思冥想设置参数了,只需选择合适的函数,一个最通用的线程池就创建了。

    Executors主要提供了以下四种工具:

    • newFixedThreadPool(int nThreads): 创建线程数量固定的线程池。
    • newSingleThreadExecutor(): 创建只有一个线程的线程池。
    • newCachedThreadPool(): 创建不限制线程数量的线程池。
    • newScheduledThreadPool(int corePoolSize): 创建一个定时调度的线程池。

    源码如下:

    public class Executors {
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
        
        public static ExecutorService newSingleThreadExecutor() {
         return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
        
        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
        
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize){
        new ScheduledThreadPoolExecutor(corePoolSize);
        }
    }
    

    通过源码,我们可以知道:

    • newFixedThreadPool(int nThreads)创建的线程池其实就是corePoolSize=maximumPoolSize,使用无界队列的线程池
    • newCachedThreadPool()创建的线程池其实是corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,线程闲置60s后自动销毁,同样使用无界队列的线程池。

    抛弃策略是啥?
    有人可能发现工具类中没有提供抛弃策略的参数。其实是使用了默认的抛弃策略:

    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();

    Executors中还有其他创建线程池的方法,可自行查阅,选取适合自己需求的使用。

    线程池的常用操作

    Future<?> submit(Runnable task);

    向线程池中提交一个Runnable类,返回一个Future对象。当调用get()方法时,会阻塞。线程结束时,返回null。

    <T> Future<T> submit(Runnable task, T result);

    向线程池中提交一个Runnable类,返回一个Future对象。当调用get()方法时,会阻塞。线程结束时,返回传入的result。

    <T> Future<T> submit(Callable<T> task);

    向线程池中提交一个Callable类,返回一个Future对象。当调用get()方法时,会阻塞。线程结束时,返回执行结果。

    void shutdown();

    关闭线程。已经submit的会继续执行直至结束,不会再接收新的任务

    List<Runnable> shutdownNow();

    如果shutdown()时,有线程在一直执行,不结束,总不能一直等吧。shutdownNow()会结束所有的线程,返回结果是所有等待任务。

    Executor & ExecutorService & Executors

    public interface Executor {
        void execute(Runnable command);
    }
    
    public interface ExecutorService extends Executor {
        void shutdown();
        <T> Future<T> submit(Callable<T> task);
        <T> Future<T> submit(Runnable task, T result);
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
    }
    
    public interface ExecutorService extends Executor {
        void shutdown();
        <T> Future<T> submit(Callable<T> task);
        <T> Future<T> submit(Runnable task, T result);
        <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
    }
    
    • Executor 和 ExecutorService 这两个接口主要的区别是:ExecutorService 接口继承 Executor 接口,是 Executor 的子接口
    • Executor 和 ExecutorService 第二个区别是:Executor 接口定义了 execute()方法用来接收一个Runnable接口的对象,而 ExecutorService 接口中的 submit()方法可以接受Runnable和Callable接口的对象。
    • Executor 和 ExecutorService 接口第三个区别是 Executor 中的 execute() 方法不返回任何结果,而 ExecutorService 中的 submit()方法可以通过一个 Future 对象返回运算结果。
    • Executor 和 ExecutorService 接口第四个区别是除了允许客户端提交一个任务,ExecutorService 还提供用来控制线程池的方法。比如:调用 shutDown() 方法终止线程池。可以通过 《Java Concurrency in Practice》 一书了解更多关于关闭线程池和如何处理 pending 的任务的知识。
    • Executors 类提供工厂方法用来创建不同类型的线程池。比如: newSingleThreadExecutor() 创建一个只有一个线程的线程池,newFixedThreadPool(int numOfThreads)来创建固定线程数的线程池,newCachedThreadPool()可以根据需要创建新的线程,但如果已有线程是空闲的会重用已有线程。

    应用实例

    理论再多,不如看实际应用中的源码:
    metrics-core定时report源码:

    public abstract class ScheduledReporter{
        private final ScheduledExecutorService executor;
        protected ScheduledReporter(MetricRegistry registry,
                                    String name,
                                    MetricFilter filter,
                                    TimeUnit rateUnit,
                                    TimeUnit durationUnit) {
            this(registry, name, filter, rateUnit, durationUnit,
                    Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(name + '-' + FACTORY_ID.incrementAndGet())));
        }
    public void start(long period, TimeUnit unit) {
            executor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        report();
                    } catch (RuntimeException ex) {
                        LOG.error("RuntimeException thrown from {}#report. Exception was suppressed.", ScheduledReporter.this.getClass().getSimpleName(), ex);
                    }
                }
            }, period, period, unit);
        }
        
        public void stop() {
            executor.shutdown(); // Disable new tasks from being submitted
            try {
                // Wait a while for existing tasks to terminate
                if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                    executor.shutdownNow(); // Cancel currently executing tasks
                    // Wait a while for tasks to respond to being cancelled
                    if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
                        System.err.println(getClass().getSimpleName() + ": ScheduledExecutorService did not terminate");
                    }
                }
            } catch (InterruptedException ie) {
                // (Re-)Cancel if current thread also interrupted
                executor.shutdownNow();
                // Preserve interrupt status
                Thread.currentThread().interrupt();
            }
        }
    }
    

    参考

    相关文章

      网友评论

        本文标题:Java线程池

        本文链接:https://www.haomeiwen.com/subject/mkxdbxtx.html