美文网首页
Java线程池基础详解

Java线程池基础详解

作者: _灯火阑珊处 | 来源:发表于2020-06-19 15:11 被阅读0次

    为什么要使用线程池

    1. 反复创建线程开销大
    2. 过多的线程会占用太多内存

    线程池的好处

    1. 加快响应速度
    2. 合理利用CPU和内存
    3. 统一管理

    线程池适合应用的场合

    1. 服务器接受到大量请求时,使用线程池技术是非常合适的,它可以大大减少线程的创建和销毁次数,提高服务器的工作效率
    2. 在开发中,如果需要创建5哥以上的线程,那么就可以使用线程池来管理

    线程池构造函数的参数

    参数名 类型 含义
    corePoolSize int 核心线程数
    maximumPoolSize int 最大线程数
    keepAliveTime long 保持存活时间
    workQueue BlockingQueue 任务存储队列
    threadFactory ThreadFactory 当线程池需要新的线程的时候,会使用threadFactory来生成新的线程
    Handler RejectedExecutionHandler 由于线程池无法接受你所提交的任务的拒绝策略
    corePoolSize

    指的是核心线程数,线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时,再创建新线程去执行任务

    maximumPoolSize

    线程池有可能会在核心线程数的基础上,额外增加一些线程,但是这些新增加的线程数有一个上限,这就是maximumPoolSize

    keepAliveTime

    如果线程池当前的线程数多于corePoolSize,那么如果多余的线程空闲时间超过keepAliveTime,它们就会被终止

    ThreadFactory

    新的线程都是由ThreadFactory创建的,默认使用Executors.defaultThreadFactory,创建出来的线程都在同一个线程组,拥有同样的NORM_PRIORITY优先级并且都不是守护线程。如果自己指定ThreadFactory,那么就可以改变线程名、线程组、优先级、是否是守护线程等。通常使用默认的ThreadFactory就可以了。

    workQueue

    有3种最常见的队列类型
    1.直接交接:SynchronousQueue
    2.无界队列:LinkedBlockingQueue
    3.有界队列:ArrayBlockingQueue

    线程池添加线程规则

    1. 如果线程数小于corePoolSize,即使其他工作线程处于空闲状态,也会创建一个新线程来运行新任务
    2. 如果线程数等于(或大于)corePoolSize但少于maximumPoolSize,则将任务放入队列
    3. 如果队列已满,并且线程数小于maximumPoolSize,则创建一个新线程来运行任务
    4. 如果队列已满,并且线程数大于或等于maximumPoolSize,则拒绝该任务
    线程池添加线程规则

    增减线程的特点

    1. 通过设置corePoolSize和maximumPoolSize相同,就可以创建固定大小的线程池;
    2. 线程池希望保持较少的线程数,并且只有在负载变得很大时才增加它;
    3. 通过设置maximumPoolSize为很高的值(例如Integer.MAX_VALUE),可以允许线程池容纳任意数量的并发任务;
    4. 是只有在队列填满时才创建多于corePoolSize的线程,所以如果你使用的是无界队列(例如LinkedBlockingQueue),那么线程数就不会超过corePoolSize。

    线程池应该手动创建还是自动创建

    newFixedThreadPool:

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
    • 通过源码可以看出,newFixedThreadPool 使用的是 LinkedBlockingQueue,由于 LinkedBlockingQueue 是没有容量上限的,所以当请求数越来越多,并且无法及时处理完毕的时候,也就是请求堆积的时候,会容易造成占用大量的内存,可能会导致OOM

    newSingleThreadExecutor:

        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
    • 通过源码可以看出,这里和上面的 newFixedThreadPool 的原理基本一样,只不过是把线程数直接设置成了1,所以这也会导致同样的问题,也就是当请求堆积的时候,可能会占用大量的内存。

    newCachedThreadPool:

        public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
    • 可缓存线程池
    • 特点:无界限线程池,具有自动回收多余线程的功能(默认时间是60秒)
    • 弊端:在于第二个参数 maximumPoolSize 被设置为了Integer.MAX_VALUE,这可能会创建数量非常多的线程,甚至导致OOM。

    newScheduledThreadPool:

    • 支持定时及周期性任务执行的线程池
    正确的创建线程池的方法

    根据不同的业务场景,选择合适的方式,最后是我们自己手动创建线程池,自己设置线程池参数。

    线程池里的线程数量设定为多少比较合适

    1. CPU密集型(加密、计算hash等):最佳线程数为CPU核心数的 1-2 倍左右
    2. 耗时IO型(读写数据库、文件、网络读写等):最佳线程数一般会大于CPU核心数很多倍,以JVM线程监控显示繁忙情况为依据,保证线程空闲可以衔接上,参考 Brain Goetz 推荐的计算方法:
      线程数=CPU核心数x(1+平均等待时间/平均工作时间)

    停止线程池的正确方法

    1. shutdown
      shutdown 并不是立即粗暴的结束线程,线程池仍然会继续执行已创建的任务,但是不会接收新的任务了。
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ShutDownDemo {
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 1000; i++) {
                executorService.execute(new ShutDownTask());
            }
            Thread.sleep(1500);
            executorService.shutdown();
            // 1500毫秒后此处会抛出异常 RejectedExecutionException
            executorService.execute(new ShutDownTask());
        }
        
    }
    
    class ShutDownTask implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(500);
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    上述代码在1.5秒后执行 executorService.shutdown(); 之后,再执行
    executorService.execute(new ShutDownTask()); 则会报错抛出异常 RejectedExecutionException

    1. isShutdown
      返回true或false告诉我们线程是否已经停止
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 1000; i++) {
                executorService.execute(new ShutDownTask());
            }
            Thread.sleep(1500);
            System.out.println(executorService.isShutdown());  // 打印false
            executorService.shutdown();
            System.out.println(executorService.isShutdown());  // 打印true
    
            System.out.println(executorService.isTerminated());
    
        }
    
    1. isTerminated
      返回true或false告诉我们线程是否已经完全停止
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 1000; i++) {
                executorService.execute(new ShutDownTask());
            }
            Thread.sleep(1500);
            executorService.shutdown();
            // 打印false,因为线程还没有完全执行完
            System.out.println(executorService.isTerminated());
        }
    
    1. awaitTermination
      awaitTermination 有三种情况会返回,没返回之前都是阻塞
      第一种情况:所有任务都执行完毕了
      第二种情况:等待的时间到了
      第三种情况:等待的过程中被打断了,会抛出 InterruptedException
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 1000; i++) {
                executorService.execute(new ShutDownTask());
            }
            Thread.sleep(1500);
            boolean b = executorService.awaitTermination(3L, TimeUnit.SECONDS);
            // 会打印false,因为3秒钟不够线程全部执行完
            System.out.println(b);
        }
    
    1. shutdownNow
      正在执行任务的线程继续执行,等待队列中的线程直接结束并返回
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ShutDownDemo {
    
        public static void main(String[] args) throws InterruptedException {
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            for (int i = 0; i < 100; i++) {
                executorService.execute(new ShutDownTask());
            }
            // 等待1.5秒
            Thread.sleep(1500);
            // 这里会返回已经放到线程池队列中还没有执行的Runnable
            List<Runnable> runnables = executorService.shutdownNow();
        }
    
    }
    
    class ShutDownTask implements Runnable {
        @Override
        public void run() {
            try {
                Thread.sleep(500);
                System.out.println(Thread.currentThread().getName());
            } catch (InterruptedException e) {
                // 接收被中断信号
                System.out.println(Thread.currentThread().getName() + "被中断了!");
            }
        }
    }
    

    executorService.shutdownNow();会返回已经放到线程池队列中还没有执行的Runnable
    运行结果:

    ...
    ...
    ...
    pool-1-thread-8
    pool-1-thread-7
    pool-1-thread-6
    pool-1-thread-10
    pool-1-thread-1
    pool-1-thread-3
    pool-1-thread-1被中断了!
    pool-1-thread-4
    pool-1-thread-2
    pool-1-thread-7被中断了!
    pool-1-thread-8被中断了!
    pool-1-thread-10被中断了!
    pool-1-thread-6被中断了!
    pool-1-thread-9被中断了!
    pool-1-thread-5被中断了!
    
    Process finished with exit code 0
    

    线程池任务太多,怎么拒绝

    • 拒绝时机

    1. 当Executor关闭时,提交新任务会被拒绝
    2. 以及当Executor对最大线程和工作队列容量使用有限边界并且已经饱和时
    • 拒绝策略

    1. AbortPolicy,直接抛出一个异常
    2. DiscardPolicy,悄悄的把任务丢弃,没有通知
    3. DiscardOldestPolicy,把队列中最老的那个任务丢弃,新任务加进来
    4. CallerRunsPolicy,谁提交的任务谁去执行(比如说主线程给线程池提交了一个任务,但是线程池已经饱和无法再执行了,这时则会让提交任务的主线程去执行这个任务)

    线程池钩子函数

    import java.util.concurrent.*;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * 演示每个任务执行前后放钩子函数
     */
    public class PauseableThreadPool extends ThreadPoolExecutor {
    
        private boolean isPaused;
        private final ReentrantLock lock = new ReentrantLock();
        private Condition unpaused = lock.newCondition();
    
        public static void main(String[] args) throws InterruptedException {
            PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<>());
    
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("我被执行了");
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            };
    
            for (int i = 0; i < 10000; i++) {
                pauseableThreadPool.execute(runnable);
            }
    
            Thread.sleep(2000);
            // 等待2秒,执行暂停方法
            pauseableThreadPool.pause();
            System.out.println("线程池被暂停了!");
            Thread.sleep(2000);
            // 再等待2秒,执行恢复方法
            pauseableThreadPool.resume();
            System.out.println("线程池被恢复了!");
        }
    
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            lock.lock();
            try {
                while (isPaused) {
                    unpaused.await();
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 暂停线程
         */
        private void pause() {
            lock.lock();
            try {
                isPaused = true;
            } finally {
                lock.unlock();
            }
        }
    
        /**
         * 恢复线程
         */
        private void resume() {
            lock.lock();
            try {
                isPaused = false;
                unpaused.signalAll();
            } finally {
                lock.unlock();
            }
        }
    
        public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    }
    

    运行结果:


    钩子函数执行效果

    相关文章

      网友评论

          本文标题:Java线程池基础详解

          本文链接:https://www.haomeiwen.com/subject/oqrsxktx.html