美文网首页
并发编程(5)ThreadPoolExecutor原理解析

并发编程(5)ThreadPoolExecutor原理解析

作者: wustor | 来源:发表于2017-12-16 12:04 被阅读139次

    概述

    由于线程的创建跟销毁是比较消耗资源的,也是比较耗时的。可能为了程序的需要,我们会创建很多线程,所以很有必要对线程进行一个统一的管理,所以就出现了线程池。通过线程池,我们可以重复利用一些线程资源,同时可以统一管理应用内的线程,防止内存泄露。

    运行机制

    Executor

    当我们创建一个任务之后,放进线程池之后,线程池会做如下判断

    • 1.判断核心线程池里的线程是否都在执行任务:否的话则将新任务放入线程池中进行执行,否则进行下一步。
    • 2.判断缓存队列是否未满:是的话,将新任务放入缓存队列,否则进行下一步
    • 3.判断线程池的线程是否都处于工作状态:是的话就就执行线程抛弃策略,否则就执行当前任务

    继承关系

    ScheduledThreadPoolExecutor

    Executor 接口定义了线程池最基本的方法,提交Runnable 任务

    public interface Executor {
        void execute(Runnable command);
    }
    

    ExecutorService 扩充了提交任务的类型,并且定义了线程池关闭任务的方法。

    ExecutorService

    AbstractExecutorService 是抽象类,主要是对ExecutorService 的一些具体实现
    ThreadPoolExecutor 是最核心的一个类,下面会具体分析其源码。
    ScheduledThreadPoolExecutor则是在 在 ThreadPoolExecutor 的基础上增加了时间调度的功能

    成员变量

         //32-3=29,线程数量所占位数
        private static final int COUNT_BITS = Integer.SIZE – 3;    
        //低29位表示最大线程数,2的29次幂-1
        private static final int CAPACITY = (1 << COUNT_BITS) – 1;    
        //线程池自身的状态
        
        //符号位101
        private static final int RUNNING    = -1 << COUNT_BITS;
        //高3位000
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
         //高3位001
        private static final int STOP       =  1 << COUNT_BITS;
       //高3位010
        private static final int TIDYING    =  2 << COUNT_BITS;
         //高3位011
        private static final int TERMINATED =  3 << COUNT_BITS;
        //缓存队列,等待中的线程任务队列
        private final BlockingQueue<Runnable> workQueue;
        //线程池中工作的线程集合
        private final HashSet<Worker> workers = new HashSet<>();
        //最大线程数
        private int largestPoolSize;
        //完成任务的线程数量
        private long completedTaskCount;
        //创建线程池的工厂类
        private volatile ThreadFactory threadFactory;
        //线程池丢弃策略
        private volatile RejectedExecutionHandler handler;
        //在等待执行任务的线程的最大等待时间
        private volatile long keepAliveTime;
        //核心线程数
        private volatile int corePoolSize;
        //线程池最大可容纳的线程数
        private volatile int maximumPoolSize;
        //默认的线程丢弃策略
        private static final RejectedExecutionHandler defaultHandler =
                new AbortPolicy();
        //int 型变量,低3位表示线程池状态,剩余的位数表示最大线程数
        private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
    

    构造方法

    constructors

    其实前面的三个构造方法最终都调用了最后一个构造方法,所以就来看看最后一个构造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler);
    }
    

    参数比较多,下面来解释一下

    • corePoolSize核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
    • maximumPoolSize线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
    • keepAliveTime表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过- corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
    • unit:keepAliveTime的时间单位,有如下几种取值
    时间单位 解释
    TimeUnit.DAYS
    TimeUnit.HOURS 小时
    TimeUnit.MINUTES 分钟
    TimeUnit.SECONDS
    TimeUnit.MILLISECONDS 毫秒
    TimeUnit.MILLISECONDS 微妙
    TimeUnit.NANOSECONDS 纳秒
    • workQueue : 一个阻塞队列,用来存储等待执行的任务,参考下图
    BlockingQueue

    阻塞队列,如果BlockingQueue是空的,从BlockingQueue取东西的操作将会被阻断进入等待状态,直到BlockingQueue进了东西才会被唤醒,同样,如果BlockingQueue是满的,任何试图往里存东西的操作也会被阻断进入等待状态,直到BlockingQueue里有空间时才会被唤醒继续操作。

    • ArrayBlockingQueue(有界队列): FIFO 队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小

    • LinkedBlockingQueue(无界队列):FIFO 队列,大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定。

    • PriorityBlockingQueue:优先级队列, 类似于LinkedBlockingQueue,但队列中元素非 FIFO, 依据对象的自然排序顺序或者是构造函数所带的Comparator决定的顺序

    • SynchronousQueue(直接提交策略): 交替队列,队列中操作时必须是先放进去,接着取出来,交替着去处理元素的添加和移除

    threadFactory::创建线程池的工厂

    RejectedExecutionHandler: 线程丢弃策略,常见的有如下几种

    丢弃策略 解释
    DiscardPolicy 丢弃任务,但是不抛出异常
    CallerRunsPolicy 由调用线程处理该任务
    AbortPolicy 丢弃任务并抛出RejectedExecutionException
    DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

    execute方法

    提交Runnable任务

    execute方法

        public void execute(Runnable command) {
            //为空的话,抛出空指针异常
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            //判断加入当前任务后,线程数是否小于核心线程数
            if (workerCountOf(c) < corePoolSize) {
            //如果小于核心线程数,则将立即执行当前任务
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //核心线程数满了之后,先判断线程池的状态,然后判断缓存队列是否还能进行缓存
            if (isRunning(c) && workQueue.offer(command)) {
                //如果两者都满足条件,则将线程加入缓存队列
                int recheck = ctl.get();
                //如果当前任务没有在运行已经被移除,执行线程丢弃策略
                if (!isRunning(recheck) && remove(command))
                    reject(command);
                 // 正在运行的线程数如果是0,则直接运行当前线程
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //加入当前任务失败,则执行丢弃策略
            else if (!addWorker(command, false))
                reject(command);
        }
    
    

    addWorker方法

    有两个参数,一个是firstTask,表示加入的Runnable任务,一个是core,表示是否添加到核心线程。

        private boolean addWorker(Runnable firstTask, boolean core) {
            retry://定义了循环的名称,便于后面直接中断循环
            for (;;) {
                int c = ctl.get();//获取状态与数量的标志位
                int rs = runStateOf(c);//判断线程状态
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
    //线程池处于关闭状态,firstTask为null,或者缓存队列为空,返回false
                    return false;
                    //死循环
                for (;;) {
                    int wc = workerCountOf(c);//获取线程池数量
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        //比对核心线程数与最大线程数
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                    //添加线程成功,中断循环
                        break retry;
                    c = ctl.get();  //重新获取线程状态与数量的标志位
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                //创建一个心的worker
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                  final ReentrantLock mainLock = this.mainLock;
                   //给当前线程上锁
                    mainLock.lock();
                    try {
                //获取当前线程池状态跟线程数的标记为
                  int rs = runStateOf(ctl.get());
                   if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
            if (t.isAlive()) // precheck that t is startable
            throw new IllegalThreadStateException();
                            //将worker添加到缓存队列中去
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                             //添加成功,改变标记位
                            workerAdded = true;
                        }
                    } finally {
                    //释放锁
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        //添加成功之后,开启线程执行任务
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                //添加失败,调用addWorkerFailed方法
                   addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    submit

    FutureTask

    继承关系

    先看一下FutureTask的继承关系


    FutureTask

    Runnable
    很常见的接口,定义了run方法

    public interface Runnable {
      public abstract void run();
    }
    

    Future
    带有返回值的泛型接口

    public interface Future<V> {
        boolean isCancelled();//任务是否取消
        boolean isDone();//任务是否完成
        //同步方法,任务执行的返回值
        V get() throws InterruptedException, ExecutionException;
        //timeout后获取等待结果
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
            }
    

    RunnableFuture
    继承自Runnable,Future的泛型返回接口

    public interface RunnableFuture<V> extends Runnable, Future<V> {
        void run();
    }
    

    Callable
    //带有返回值的Runnable额接口

    public interface Callable<V> {
      V call() throws Exception;
    }
    
    构造方法

    Callable构造方法

       public FutureTask(Callable<V> callable) {
            if (callable == null)
                throw new NullPointerException();
            this.callable = callable;
            this.state = NEW;    
        }
    
    

    Runnable+Result构造方法

    public FutureTask(Runnable runnable, V result) {
       this.callable = Executors.callable(runnable, result);
       this.state = NEW;   
        }
    

    可以看到不管是Callable还是Runnable构造方法,最后都是使用Callable来进行构造的,之所以这么做,是因为FutureTask需要返回值

    提交任务

    提交Runnable任务

    public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    
    

    提交callable任务+返回值

    
        /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<T> ftask = newTaskFor(task, result);
            execute(ftask);
            return ftask;
        }
    
    

    提交callable任务

        public <T> Future<T> submit(Callable<T> task) {
             //首先判断是否为空
            if (task == null) throw new NullPointerException();
            //将Callable转换成Future
            RunnableFuture<T> ftask = newTaskFor(task);
            //执行execute方法
            execute(ftask);//最后依然会调用execute Runnable方法
            return ftask;
        }
    

    不管是提交什么样task,最后都会被包装成Runnable方法来执行,还是会调用Executor的execute方法。

    tryTerminate

    这个方法是当线程池关闭的时候会调用

        final void tryTerminate() {
         //开启死循环
            for (;;) {
                //获取线程池状态跟数量的标志位
                int c = ctl.get();
                //判断三个条件
                //1.线程是否在运行
                //2.线程池状态小于TIDYING,TERMINATED
                //3.线程池已经关闭并且队列为空
                满足上面的任意一个条件就会直接返回,很好理解
                if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && !   workQueue.isEmpty()))
                    return;
                if (workerCountOf(c) != 0) { 
                // 如果线程数不为0,才有资格去终止
                    interruptIdleWorkers(ONLY_ONE);
                    return;
                }
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                  if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                        try {
                        //CAS设置状态成功,调用terminated,默认空实现
                            terminated();
                        } finally {
                            ctl.set(ctlOf(TERMINATED, 0));
                            termination.signalAll();
                        }
                        return;
                    }
                } finally {
                    mainLock.unlock();
                }
                // else retry on failed CAS
            }
        }
    

    advanceRunState();

    更改当前线程池的状态

      private void advanceRunState(int targetState) {
            for (;;) {
            //获取当前线程的状态及数量的标志位
                int c = ctl.get();
            //更改线程状态
                if (runStateAtLeast(c, targetState) ||
                    ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                    break;
            }
        }
    

    shutdown

     public void shutdown() {
             //锁住线程池
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
        // 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回
                advanceRunState(SHUTDOWN);
           // 中断等待中的线程
                interruptIdleWorkers();
                onShutdown();
            } finally {
            //释放锁
                mainLock.unlock();
            }
            tryTerminate();
        }
    

    shutdownNow

        public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
              //传入Stop状态,其余跟shutdown保持一致
                advanceRunState(STOP);
              //中断所有线程
                interruptWorkers();
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
            return tasks;
        }
    

    shutdownNow() 和 shutdown()的大体流程相似,差别是:

    • 1、advanceRunState传入的是Stop
    • 2、调用 interruptWorkers() 中断所有线程,包括正在运行的线程
    • 3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务

    Executors

    Executors是Java提供的一个线程池的帮助类,可以帮助我们快速的处理线程池。

    构造线程池

    由于Java的线程池的构造方法比较复杂,所以Java又提供了Executors这个辅助类,帮助我们更快速地创建ThreadPoolExecutor,可以帮助我们创建4种类型的ThreadPool


    Executors_create
    • 单线程异步队列:Executors.newSingleThreadExecutor(),创建一个单线程的线程池。这个线程池只有一个线程在工作,也就是相当于单线程串行执行>所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池>保证所有任务的执行顺序按照任务的提交顺序执行。

    • 周期性调度:Executors.newFixedThreadPool(),创建固定大小的线程池。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

    • 可缓存的线程:Executors.newCachedThreadPool(int size),创建一个可缓存的线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

    • 多线程周期性调度:Executors.newScheduledThreadPool(1),创建一个大小无限的线程池。此线程池支持定时以及周期性执行任务的需求。

    Callable转换

    executors_change

    参考资料

    https://www.cnblogs.com/trust-freedom/p/6693601.html
    https://zhuanlan.zhihu.com/p/27232156

    相关文章

      网友评论

          本文标题:并发编程(5)ThreadPoolExecutor原理解析

          本文链接:https://www.haomeiwen.com/subject/rbdswxtx.html