Java并发 之 线程池系列 (2) 使用ThreadPoolE

作者: 西召 | 来源:发表于2019-04-01 21:44 被阅读0次
    threadpoolexecutor-example.jpg

    Executors的“罪与罚”

    在上一篇文章Java并发 之 线程池系列 (1) 让多线程不再坑爹的线程池中,我们介绍了使用JDK concurrent包下的工厂和工具类Executors来创建线程池常用的几种方法:

    //创建固定线程数量的线程池
    ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);
    
    //创建一个线程池,该线程池会根据需要创建新的线程,但如果之前创建的线程可以使用,会重用之前创建的线程
    ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
    
    //创建一个只有一个线程的线程池
    ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
    

    诚然,这种创建线程池的方法非常简单和方便。但仔细阅读源码,却把我吓了一条: 这是要老子的命啊!

    我们前面讲过,如果有新的请求过来,在线程池中会创建新的线程处理这些任务,一直创建到线程池的最大容量(Max Size)为止;超出线程池的最大容量的Tasks,会被放入阻塞队列(Blocking
    Queue)进行等待,知道有线程资源释放出来为止;要知道的是,阻塞队列也是有最大容量的,多余队列最大容量的请求不光没有获得执行的机会,连排队的资格都没有!

    那这些连排队的资格都没有的Tasks怎么处理呢?不要急,后面在介绍ThreadPoolExecutor的拒绝处理策略(Handler Policies for Rejected Task)的时候会详细介绍。

    说到这里你也许有写疑惑了,上面这些东西,我通常使用Executors的时候没有指定过啊。是的,因为Executors很“聪明”地帮我们做了这些事情。

    Executors的源码

    我们看下ExecutorsnewFixedThreadPoolnewSingleThreadExecutor方法的源码:

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(
            nThreads, nThreads,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());
    }
    
    public static ExecutorService newSingleThreadExecutor() {
         return new FinalizableDelegatedExecutorService
             (new ThreadPoolExecutor(
             1, 1,
             0L, TimeUnit.MILLISECONDS,
             new LinkedBlockingQueue<Runnable>()));
     }
    

    其实它们底层还是通过ThreadPoolExecutor来创建ExecutorService的,这里对妻子的参数先不作介绍,下面会详细讲,这里只说一下new LinkedBlockingQueue<Runnable>()这个参数。

    LinkedBlockingQueue就是当任务数大于线程池的线程数的时候的阻塞队列,这里使用的是无参构造,我们再看一下构造函数:

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    

    我们看到阻塞队列的默认大小竟然是Integer.MAX_VALUE

    如果不做控制,拼命地往阻塞队列里放Task,分分钟“Out of Memory”啊!

    还有更绝的,newCachedThreadPool方法:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(
        0, Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
    }
    

    最大线程数默认也是Integer.MAX_VALUE,也就是说,如果之前的任务没有执行完就有新的任务进来了,就会继续创建新的线程,指导创建到Integer.MAX_VALUE为止。

    让你的JVM OutOfMemoryError

    下面提供一个使用newCachedThreadPool创建大量线程处理Tasks,最终OutOfMemoryError的例子。

    友情提醒:场面过于血腥,请勿在生产环境使用。

    package net.ijiangtao.tech.concurrent.jsd.threadpool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ThreadPoolExample2 {
    
        private static final ExecutorService executorService = Executors.newCachedThreadPool();
    
        private static class Task implements Runnable {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000 * 600);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private static void newCachedThreadPoolTesterBadly() {
            System.out.println("begin............");
            for (int i = 0; i <= Integer.MAX_VALUE; i++) {
                executorService.execute(new Task());
            }
            System.out.println("end.");
        }
    
        public static void main(String[] args) {
            newCachedThreadPoolTesterBadly();
        }
    
    }
    

    main方法启动以后,打开控制面板,看到CPU和内存几乎已经全部耗尽:

    [图片上传失败...(image-240e8d-1554126159952)]

    很快控制台就抛出了java.lang.OutOfMemoryError

    begin............
    Exception in thread "main" java.lang.OutOfMemoryError: unable to create new native thread
        at java.lang.Thread.start0(Native Method)
        at java.lang.Thread.start(Thread.java:717)
        at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:957)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1378)
        at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.newCachedThreadPoolTesterBadly(ThreadPoolExample2.java:24)
        at net.ijiangtao.tech.concurrent.jsd.threadpool.ThreadPoolExample2.main(ThreadPoolExample2.java:30)
    

    阿里巴巴Java开发手册

    下面我们在看Java开发手册这条规定,应该就明白作者的良苦用心了吧。

    【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。
    说明:
    Executors返回的线程池对象的弊端如下:
    1)FixedThreadPool和SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
    2)CachedThreadPool和ScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。

    主角出场

    解铃还须系铃人,其实避免这个OutOfMemoryError风险的钥匙就藏在Executors的源码里,那就是自己直接使用ThreadPoolExecutor

    ThreadPoolExecutor的构造

    构造一个ThreadPoolExecutor需要蛮多参数的。下面是ThreadPoolExecutor的构造函数。

    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    下面就一一介绍一下这些参数的具体含义。

    ThreadPoolExecutor构造参数说明

    其实从源码中的JavaDoc已经可以很清晰地明白这些参数的含义了,下面照顾懒得看英文的同学,再解释一下:

    • corePoolSize

    线程池核心线程数。

    默认情况下核心线程会一直存活,即使处于闲置状态也不会受存keepAliveTime限制,除非将allowCoreThreadTimeOut设置为true

    • maximumPoolSize

    线程池所能容纳的最大线程数。超过maximumPoolSize的线程将被阻塞。

    最大线程数maximumPoolSize不能小于corePoolSize

    • keepAliveTime

    非核心线程的闲置超时时间。

    超过这个时间非核心线程就会被回收。

    • TimeUnit

    keepAliveTime的时间单位,如TimeUnit.SECONDS。

    当将allowCoreThreadTimeOut为true时,对corePoolSize生效。

    • workQueue

    线程池中的任务队列。

    没有获得线程资源的任务将会被放入workQueue,等待线程资源被释放。如果放入workQueue的任务数大于workQueue的容量,将由RejectedExecutionHandler的拒绝策略进行处理。

    常用的有三种队列:
    SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue

    • threadFactory

    提供创建新线程功能的线程工厂。

    ThreadFactory是一个接口,只有一个newThread方法:

    Thread newThread(Runnable r);
    
    • rejectedExecutionHandler

    无法被线程池处理的任务的处理器。

    一般是因为任务数超出了workQueue的容量。

    当一个任务被加入线程池时

    总结一下,当一个任务通过execute(Runnable)方法添加到线程池时:

    1. 如果此时线程池中线程的数量小于corePoolSize,即使线程池中的线程都处于空闲状态,也要创建新的线程来处理被添加的任务。

    2. 如果此时线程池中的数量等于corePoolSize,但是缓冲队列workQueue未满,那么任务被放入缓冲队列。

    3. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量小于maximumPoolSize,建新的线程来处理被添加的任务。

    4. 如果此时线程池中的数量大于corePoolSize,缓冲队列workQueue满,并且线程池中的数量等于maximumPoolSize,那么通过 handler所指定的拒绝策略来处理此任务。

    处理任务的优先级为:核心线程数(corePoolSize) > 任务队列容量(workQueue) > 最大线程数(maximumPoolSize);如果三者都满了,使用rejectedExecutionHandler处理被拒绝的任务。

    Thread_pool.png

    ThreadPoolExecutor的使用

    下面就通过一个简单的例子,使用ThreadPoolExecutor构造的线程池执行任务。

    ThreadPoolExample3

    package net.ijiangtao.tech.concurrent.jsd.threadpool;
    
    import java.time.LocalTime;
    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author ijiangtao.net
     */
    public class ThreadPoolExample3 {
    
        private static final AtomicInteger threadNumber = new AtomicInteger(1);
    
        private static class Task implements Runnable {
            @Override
            public void run() {
                try {
                    Thread.currentThread().sleep(2000);
                    System.out.println(Thread.currentThread().getName() + "-" + LocalTime.now());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
    
        private static class MyThreadFactory implements ThreadFactory {
    
            private final String namePrefix;
    
            public MyThreadFactory(String namePrefix) {
                this.namePrefix = namePrefix;
            }
    
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, namePrefix + "-" + threadNumber.getAndIncrement());
            }
    
        }
    
        private static final ExecutorService executorService = new ThreadPoolExecutor(
                10,
                20, 30, TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(50),
                new MyThreadFactory("MyThreadFromPool"),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String[] args) {
    
            // creates five tasks
            Task r1 = new Task();
            Task r2 = new Task();
            Task r3 = new Task();
            Task r4 = new Task();
            Task r5 = new Task();
    
            // submit方法有返回值
            Future future = executorService.submit(r1);
            System.out.println("r1 isDone ? " + future.isDone());
    
            // execute方法没有返回值
            executorService.execute(r2);
            executorService.execute(r3);
            executorService.execute(r4);
            executorService.execute(r5);
    
            //关闭线程池
            executorService.shutdown();
    
        }
    
    }
    

    执行结果

    r1 isDone ? false
    MyThreadFromPool-2-21:04:03.215
    MyThreadFromPool-5-21:04:03.215
    MyThreadFromPool-4-21:04:03.215
    MyThreadFromPool-3-21:04:03.215
    MyThreadFromPool-1-21:04:03.215
    

    从结果看,从线程池取出了5个线程,并发执行了5个任务。

    总结

    这一章我们介绍了一种更安全、更定制化的线程池构建方式:ThreadPoolExecutor。相信你以后不敢轻易使用Executors来构造线程池了。

    后面我们会介绍线程池的更多实现方式(例如使用Google核心库Guava),以及关于线程池的更多知识和实战。

    Links

    作者资源

    相关资源

    作者:涛哥 ( ijiangtao.net )
    公众号:西召 ( westcall )
    欢迎 评论、关注、打赏,转发和点赞,你的鼓励是我持续创作的动力。
    涛哥这里,干货和湿货,一应俱全!

    相关文章

      网友评论

        本文标题:Java并发 之 线程池系列 (2) 使用ThreadPoolE

        本文链接:https://www.haomeiwen.com/subject/moabbqtx.html