美文网首页juc
[第二篇]深入学习线程池之通过ThreadPoolExecuto

[第二篇]深入学习线程池之通过ThreadPoolExecuto

作者: 秋慕云 | 来源:发表于2020-04-01 18:11 被阅读0次

    一、创建线程池的方式

    在实际的开发过程中,可以通过Executors类和ThreadPoolExecutor类来进行线程池的创建。但是,在《阿里巴巴java开发手册》中,不建议使用Executors类来创建线程池,原因之一是通过Executors类创建的四大线程池,可能会发生OOM问题,之二是通过Executors大家可以先做一下了解,本文后面会阐述产生OOM的原因。

    二、基于ThreadPoolExecutor类进行线程池的创建

    线程池的真正实现类是ThreadPoolExecutor,其构造方法有如下4种:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    1. 线程池的参数

    corePoolSize(必需):核心线程数。默认情况下,核心线程会一直存活,但是当将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。

    maximumPoolSize(必需):线程池所能容纳的最大线程数。当活跃线程数达到该数值后,后续的新任务将会阻塞。

    keepAliveTime(必需):线程闲置超时时长。如果超过该时长,非核心线程就会被回收。如果将allowCoreThreadTimeout设置为true时,核心线程也会超时回收。

    unit(必需):指定keepAliveTime参数的时间单位。常用的有:TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)、TimeUnit.MINUTES(分)。

    workQueue(必需):任务队列。通过线程池的execute()方法提交的Runnable对象将存储在该参数中。

    threadFactory(可选):线程工厂。用于指定为线程池创建新线程的方式。

    handler(可选):拒绝策略。当达到最大线程数时需要执行的饱和策略。

    2. 任务队列(workQueue)

    任务队列是基于阻塞队列实现的,即采用生产者消费者模式,在Java中需要实现BlockingQueue接口。但Java已经为我们提供了7种阻塞队列的实现:

    ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列(数组结构可配合指针实现一个环形队列)。

    LinkedBlockingQueue: 一个由链表结构组成的有界阻塞队列,在未指明容量时,容量默认为Integer.MAX_VALUE。

    PriorityBlockingQueue: 一个支持优先级排序的无界阻塞队列,对元素没有要求,可以实现Comparable接口也可以提供Comparator来对队列中的元素进行比较。跟时间没有任何关系,仅仅是按照优先级取任务。

    DelayQueue:类似于PriorityBlockingQueue,是二叉堆实现的无界优先级阻塞队列。要求元素都实现Delayed接口,通过执行时延从队列中提取任务,时间没到任务取不出来。

    SynchronousQueue: 一个不存储元素的阻塞队列,消费者线程调用take()方法的时候就会发生阻塞,直到有一个生产者线程生产了一个元素,消费者线程就可以拿到这个元素并返回;生产者线程调用put()方法的时候也会发生阻塞,直到有一个消费者线程消费了一个元素,生产者才会返回。

    LinkedBlockingDeque: 使用双向队列实现的有界双端阻塞队列。双端意味着可以像普通队列一样FIFO(先进先出),也可以像栈一样FILO(先进后出)。

    LinkedTransferQueue: 它是ConcurrentLinkedQueue、LinkedBlockingQueue和SynchronousQueue的结合体,但是把它用在ThreadPoolExecutor中,和LinkedBlockingQueue行为一致,但是是无界的阻塞队列。

    注意有界队列和无界队列的区别:如果使用有界队列,当队列饱和时并超过最大线程数时就会执行拒绝策略;而如果使用无界队列,因为任务队列永远都可以添加任务,所以设置maximumPoolSize没有任何意义。

    3. 线程工厂(threadFactory)

    线程工厂指定创建线程的方式,需要实现ThreadFactory接口,并实现newThread(Runnable r)方法。该参数可以不用指定,Executors框架已经为我们实现了一个默认的线程工厂,如下图:

    /**
     * The default thread factory.
     */
    private static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
    

    在实际的开发中可以是用Google提供的guava去创建ThreadFactory,相关信息如下:

    maven依赖:
    <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>23.0</version>
    </dependency>
    
    使用demo:
    ThreadFactory nameFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();
    

    4. 拒绝策略(handler)

    当线程池的线程数达到最大线程数时,需要执行拒绝策略。拒绝策略需要实现RejectedExecutionHandler接口,并实现rejectedExecution(Runnable r, ThreadPoolExecutor executor)方法。不过Executors框架已经为我们实现了4种拒绝策略:

    AbortPolicy(默认):丢弃任务并抛出RejectedExecutionException异常。

    CallerRunsPolicy:由调用线程处理该任务。

    DiscardPolicy:丢弃任务,但是不抛出异常。可以配合这种模式进行自定义的处理方式。

    DiscardOldestPolicy:丢弃队列最早的未处理任务,然后重新尝试执行任务。

    三、线程池的工作原理

    1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意:执行这一步骤需要获取全局锁)。

    2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。

    3、如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意:执行这一步骤需要获取全局锁)。

    4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用
    RejectedExecutionHandler.rejectedExecution()方法。

    ThreadPoolExecutor采取上述步骤的总体设计思路,一是为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈),二是创建和回收线程比较消耗资源。
    在ThreadPoolExecutor当前运行的线程数大于等于corePoolSize时,几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。

    线程池工作原理图

    相关文章

      网友评论

        本文标题:[第二篇]深入学习线程池之通过ThreadPoolExecuto

        本文链接:https://www.haomeiwen.com/subject/zehalctx.html