美文网首页
java线程池(ThreadPoolExecuter)

java线程池(ThreadPoolExecuter)

作者: 倔强韭菜 | 来源:发表于2019-02-27 11:51 被阅读0次

    做事要有目标,看源码首先要确定需要追踪理解的问题,不然容易陷入逻辑泥潭中,理不清头绪。

    因此本分结构分两部分

    1. 常见问题
    2. 源码分析,任务如何被执行?

    不废话直奔主题(jdk 1.8.0 不同版本还是有不小差异)

    (一)常见问题

    1. 为什么要使用线程池,有啥好处?
    2. 创建线程池时,构造函数中几个参数的作用?
    3. Executors中几种线程池的差异?

    1. 为什么要使用线程池,有啥好处?

    1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
    2. 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
    3. 提高线程的可管理性。可以提供定时、定期、单线程、并发数控制等功能

    2. 创建线程池时,构造函数中几个参数的作用?

    线程池构造方法,但最终是调用这个方法。
    一共七个参数。

        public ThreadPoolExecutor(int corePoolSize,      //核心线程池大小
                                  int maximumPoolSize,   //线程池最大大小
                                  long keepAliveTime,    //线程空闲活动时间 
                                  TimeUnit unit,        //线程空闲活动时间单位
                                  BlockingQueue<Runnable> workQueue,  //任务队列
                                  ThreadFactory threadFactory,  //创建线程的工厂
                                  RejectedExecutionHandler handler)  //拒绝策略
    
    • corePoolSize(核心线程数量):核心线程会一直存活,即使没有任务需要执行。当线程数少于核心线程数时,有新任务需要执行,即使有空闲线程,线程池也会优先创建新线程而不是使用空闲线程处理。

    • workQueue(任务队列):当核心线程数达到最大时,新任务会放在队列中排队等待执行。可以选择以下几种阻塞队列:
    ArrayBlockingQueue // 是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
    LinkedBlockingQueue //一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
    SynchronousQueue //一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
    PriorityBlockingQueue //一个具有优先级的无限阻塞队列。
    
    • maximumPoolSize(最大线程数量):线程池允许线程创建的最大线程数。
      线程数>=corePoolSize,且任务队列已满时:线程池会创建新线程来处理任务
      。线程数=maxPoolSize,且任务队列已满时:线程池会使用拒绝策略处理任务

    • keepAliveTime(线程空闲活动时间)当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize;如果allowCoreThreadTimeout=true,则会直到线程数量=0。

    • threadFactory(创建线程的工厂):默认使用Executors内部类DefaultThreadFactory。(通过实现ThreadFactory接口,给每个创建出来的线程设置有意义的名字,排查线程问题时十分有帮助。阿里巴巴JAVA开发手册中也要求这样做,尽量不要直接使用Executors中的方法创建线程池。)

    • handler(拒绝策略):默认使用AbortPolicy。当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。可以选择以下几类拒绝策略:

    AbortPolicy:直接抛出异常。默认策略
    CallerRunsPolicy:使用调用者所在线程来运行任务
    DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务
    DiscardPolicy:不处理,丢弃掉
    

    当然也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。

    3.Executors中几种线程池的差异?

    • newFixedThreadPool
        /**
        * corePoolSize:nThreads
        * maximumPoolSize: nThreads
        * keepAliveTime:0
        * workQueue:LinkedBlockingQueue(容量:Integer.MAX_VALUE)
        * threadFactory:默认值
        * handler:默认值 
        */
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
    • newSingleThreadExecutor
        /**
        * corePoolSize:1
        * maximumPoolSize: 1
        * keepAliveTime:0
        * workQueue:LinkedBlockingQueue(容量:Integer.MAX_VALUE)
        * threadFactory:默认值
        * handler:默认值 
        */
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
    • newCachedThreadPool
        /**
        * corePoolSize:0
        * maximumPoolSize: Integer.MAX_VALUE
        * keepAliveTime:60s
        * workQueue:SynchronousQueue
        * threadFactory:默认值
        * handler:默认值 
        */
        public static ExecutorSersvice newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    (二)源码分析,任务如何被执行?

    线程池状态(5个)

    • RUNNING(-1):接受新任务,同时执行任务队列中的任务
    • SHUTDOWN(0):不接受新任务,但是仍然执行任务队列中的任务
    • STOP(1):不接受新任务,不执行任务队列中的任务,并且尝试终止正在执行的任务
    • TIDYING(2):所有工作线程已经销毁,任务缓存队列已经清空或执行结束。执行terminated()钩子方法
    • TERMINATED(3):terminated()方法执行完成

    重要接口

    • ThreadFactory(线程工厂)
      线程池通过此接口创建新线程
    public interface ThreadFactory {
        Thread newThread(Runnable r);
    }
    

    默认实现DefaultThreadFactory

       static class DefaultThreadFactory implements ThreadFactory {
            // 使用默认线程工厂的线程池数量
            private static final AtomicInteger poolNumber = new AtomicInteger(1);
            // 当前线程池中线程所属的线程组
            private final ThreadGroup group;
            // 当前线程池中累计创建的线程数量
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            // 当前线程池中线程名称前缀
            private final String namePrefix;
    
            DefaultThreadFactory() {
                // 声明安全管理器
                SecurityManager s = System.getSecurityManager();
                // 获取线程组
                group = (s != null) ? s.getThreadGroup() :
                                      Thread.currentThread().getThreadGroup();
                // 线程名前缀,例如 "pool-1-thread-"                      
                namePrefix = "pool-" +
                              poolNumber.getAndIncrement() +
                             "-thread-";
            }
    
            public Thread newThread(Runnable r) {
                Thread t = new Thread(group, r,
                                      namePrefix + threadNumber.getAndIncrement(),
                // 设置线程t为前台线程                     0);
                if (t.isDaemon())
                    t.setDaemon(false);
                 // 设置线程t的优先级为NORM_PRIORITY  
                if (t.getPriority() != Thread.NORM_PRIORITY)
                    t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
    
    • RejectedExecutionHandler(拒绝策略)
      指当任务添加到线程池中被拒绝,而采取的处理措施。
      可能原因:1.任务队列已满,线程数量达到最大限制;2.线程池关闭(SHUTDOWN);
    public interface RejectedExecutionHandler {
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    

    默认实现AbortPolicy

      public static class AbortPolicy implements RejectedExecutionHandler {
      
            public AbortPolicy() { }
            
            // 简单粗暴,直接抛出异常
            public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
                throw new RejectedExecutionException("Task " + r.toString() +
                                                     " rejected from " +
                                                     e.toString());
            }
        }
    

    重要成员变量

    //任务缓存队列,用来存放等待执行的任务
    private final BlockingQueue<Runnable> workQueue; 
    
    //线程池的主要状态锁,对线程池状态(比如线程池大小、runState等)的改变都要使用这个锁
    private final ReentrantLock mainLock = new ReentrantLock(); 
    
    // 线程集合,访问时必须持有 mainLock
    private final HashSet<Worker> workers = new HashSet<Worker>(); 
    

    重要方法

    • execute: 新任务是如何被添加的?
    1. 当线程数 < 核心线程数(corePoolSize): 不管有没有线程空闲,直接创建新线程执行任务,。
    2. 当线程数 >= 核心线程数,且任务队列未满: 将任务放入任务队列(workQueue)。
    3. 当线程数 >= 核心线程数,且任务队列已满:若线程数 < 最大线程数(maximumPoolSize),创建新线程执行任务;若线程数 = 最大线程数(maximumPoolSize),使用拒绝策略(RejectedExecutionHandler)处理任务。
        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
    
            // 当前线程数小于核心线程数量时,新建线程
            // 并将command作为首个任务开始执行
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            
            // 如果线程池处于RUNNING状态,并且任务添加
            // 到队列成功
            if (isRunning(c) && workQueue.offer(command)) {
                //方法执行过程中,线程池可能发成改变,
                // 需要double-check
                int recheck = ctl.get();
                // 线程池状态改变,非RUNNING,需要回滚
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                // 原有线程可能可能已经结束     
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            } 
            // 如果任务无法加入队列中,尝试新建非核心
            // 线程执行任务。如果新建失败,则使用拒绝
            // 策略处理任务
            else if (!addWorker(command, false))
                reject(command);
        }
    
    • runWorker: 新任务是如何被线程执行的?
      final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            // 创建线程时放入的首个任务
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
               // 循环, 从队列中取出等待执行的任务
               // 如果任务队列为空,超过keepAliveTime时长,
               // getTask()返回为空,此时线程就会销毁
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        // 任务执行开始前调用,子类可重写此方法
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            // 任务执行结束后调用,子类可重写此方法
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                // 线程销毁
                processWorkerExit(w, completedAbruptly);
            }
        }
    
    • getTask: 任务如何从队列中取出?
     private Runnable getTask() {
            boolean timedOut = false; 
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // 允许核心线程销毁或者线程数量大于核心线程数量
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                // 获取任务超时,线程数量减1,返回空
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    // 从阻塞队列中获取任务设定超时时间
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    相关文章

      网友评论

          本文标题:java线程池(ThreadPoolExecuter)

          本文链接:https://www.haomeiwen.com/subject/fcbxuqtx.html