美文网首页
Java基础 -- 线程池

Java基础 -- 线程池

作者: tom_xin | 来源:发表于2019-09-28 09:16 被阅读0次

    为什么需要线程池

        在生产环境中,为每个任务都分配一个线程,这种方法存在一些缺陷,尤其是当需要创建大量的线程时:
        线程生命周期开销非常高:线程的创建与销毁都会需要JVM和操作系统参与提供一些辅助操作。在高并发的场景下,是非常消耗CPU计算资源的。
        资源消耗:活跃的线程是会占用系统内存资源的,大量的线程对系统内存的消耗非常严重。
        稳定性:线程数量在一定范围内是会提高系统资源利用率的,如果超出了这个限制,很可能会发生一些意向不到的问题,例如OOM。
        线程池的出现很好的解决了这些问题,它将提交任务执行任务分开(生产者-消费者模式),通过配置参数限制了线程池中最大可允许创建线程的数量。


    线程池的基本概念

    Integer corePoolSize: 线程池的最大空闲线程数,但变量allowCoreThreadTimeOut同样可以设置在一定时间内回收这些线程。
    Integer maximumPoolSize:线程池最大的活跃线程数。
    long keepAliveTime:当线程池内的线程数大于corePoolSize指定的线程数时,可以通过keepAliveTime来指定
    TimeUnit unit:keepAliveTime的时间单位(时,分,秒,毫秒)
    BlockingQueue<Runnable> workQueue:该队列用于暂存因为空闲线程导致没有执行的任务。
    ThreadFactory threadFactory:线程池中的线程是通过ThreadFactory创建的。
    RejectExecutionHandler handler:当BlockingQueue(阻塞队列)的暂存的任务数达到BlockingQueue容量的上限,并且此时暂无可用线程时,会执行该拒绝策略。
    Worker类:提交到线程池中的任务都会被封装为一个Worker对象,并分配一个线程。在执行任务时会调用Worker类的run()方法。Worker类实现了Runnable的run方法,继承了AbstractQueueSynchronizer类,重写了tryAcquire,tryRelease方法,控制线程在运行过程中的interrupt操作,提供了同步机制。


    线程池的执行流程

    image.png

    常见线程池的使用方式

    Executor框架

        FixThreadPool
        Executor框架提供了两个创建FixThreadPool线程池的方法,newFixThreadPool(int nThreads),newFixThreadPool(int nThreads,ThreadFactory threadFactory)。FixThreadPool限制了线程池中线程的数量。
        SingleThreadPool
        只有一个线程的线程池,提交到线程池中的任务会按照他们的提交顺序来执行。
        CachedThreadPool
        在程序执行过程中会创建与所需数量相同的线程,然后再它回收旧线程时停止创建新线程。
        ScheduledThreadPoolExecutor
        通过schedule(Runnable command, long delay, TimeUnit unit)方法,向ScheduledThreadPoolExecutor中提交任务时,可以设置延迟时间。设置延迟时间的任务会在一段时间后执行。

    使用线程池时需要注意哪些事项

    1. 合理设置核心线程数大小
          在设置线程池中线程的数量时,只要数量不是“过大”或者“过小”都可以,在设置完成后,需要充分的自测。在《Java并发编程实战》中给出了核心线程数的计算公式。
      线程池的最优大小 = CPU核数 * CPU的期望利用率 * (1 + 线程等待时间/线程执行时间)

      image.png
    2. 不同职责的线程池区分
           项目中使用线程池时需要根据自己的应用场景来选择合适的线程池。不过在阿里编写的《阿里巴巴Java开发手册》中建议尽量使用ThreadPoolExecutor来构造适合自己的线程池。

    3. 选择适当的拒绝策略
           我在另外一篇文章中有介绍 https://www.jianshu.com/p/5f70fbd70b84

    4. 避免使用ThreadLocal
          线程池中线程的生命周期通常会超过任务的生命周期,所以ThreadLocal中以Thread ID作为Key来存储缓存可能会出现问题。


    日常工作中的使用

    需求:统计线程池中的任务一段时间内请求外部接口的次数
    实现:可以利用ThreadPoolExecutor中的beforeExecute,afterExecute,terminated方法来实现这个要求。
    具体代码如下。

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;
    
    public class StatisticThreadFactoryPool extends ThreadPoolExecutor {
    
        private final static ThreadLocal<Long> threadLocal = new ThreadLocal<>();
    
        private final static List<StatisticDO> statisticDOS = new ArrayList<>(16);
    
    
        public StatisticThreadFactoryPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        public StatisticThreadFactoryPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        }
    
        public StatisticThreadFactoryPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }
    
        public StatisticThreadFactoryPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        }
    
        /**
         * 在任务执行前,记录当前而任务执行的开始时间
         */
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            // 执行父类的方法
            super.beforeExecute(t, r);
            // 存储当前的时间
            threadLocal.set(System.nanoTime());
        }
    
        /**
         *  在任务结束后,将统计信息保存到StatisticDO对象中。
         *  记录任务的开始时间
         *  记录任务的结束时间
         *  计算执行时间
         *  保存当前任务的标识
         */
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            try {
                StatisticDO statisticDO = new StatisticDO();
                statisticDO.setStartTime(threadLocal.get());
                statisticDO.setEndTime(System.nanoTime());
                statisticDO.setRunTime(statisticDO.getEndTime() - statisticDO.getStartTime());
                statisticDO.setTask("thread:" + Thread.currentThread() + r.toString());
                statisticDOS.add(statisticDO);
            } finally {
                super.afterExecute(r, t);
            }
        }
    
        /**
         *  在线程池的任务结束后,将统计信息放入redis缓存中,方便后续调用查询。
         *  
         */
        @Override
        protected void terminated() {
            try {
                // 将statisticDOS中的数据写入缓存中:redis。方便后面查询统计使用
                threadLocal.remove();
            } finally {
                super.terminated();
            }
        }
    
        class StatisticDO {
            // 任务运行时间
            private long runTime;
            // 任务开始时间
            private long startTime;
            // 任务结束时间
            private long endTime;
            // 任务标识
            private String task;
    
    
            public long getRunTime() {
                return runTime;
            }
    
            public void setRunTime(long runTime) {
                this.runTime = runTime;
            }
    
            public long getStartTime() {
                return startTime;
            }
    
            public void setStartTime(long startTime) {
                this.startTime = startTime;
            }
    
            public long getEndTime() {
                return endTime;
            }
    
            public void setEndTime(long endTime) {
                this.endTime = endTime;
            }
    
            public String getTask() {
                return task;
            }
    
            public void setTask(String task) {
                this.task = task;
            }
        }
    }
    
    

    相关文章

      网友评论

          本文标题:Java基础 -- 线程池

          本文链接:https://www.haomeiwen.com/subject/zrggxqtx.html