美文网首页
ThreadPoolExecutor线程池原理分析

ThreadPoolExecutor线程池原理分析

作者: feifei_fly | 来源:发表于2021-01-12 15:39 被阅读0次

    ThreadPoolExecutor 我们在开发过程中经常用到,它的主要作用就是提前创建好若干个线程放在一个容器中。如果有任务需要处理,则将任务直接分配给线程池中的线程来执行就行,任务处理完以后这个线程不会被销毁,而是等待后续分配任务。

    那它是如何实现线程复用的?今天我们就回答这个问题。

    一、ThreadPoolExecutor的基本用法

    1.1、TheadPooolExecutor的构造函数

    public ThreadPoolExecutor(int corePoolSize,         //核心线程数量
       int maximumPoolSize, //最大线程数
       long keepAliveTime, //超时时间,超出核心线程数量以外的线程空余存活时间
       TimeUnit unit, //存活时间单位
       BlockingQueue<Runnable> workQueue, //保存执行任务的队列
      ThreadFactory threadFactory,//创建新线程使用的工厂
      RejectedExecutionHandler handler //当任务无法执行的时候的处理方式)
    
    • corePoolSize 代表缓存的核心线程个数
    • maximumPoolSize 代表线程池最大的线程个数
    • keepAliveTime 代表超时销毁时间,针对非核心线程,空闲若干时间就会被销毁。
    • unit keepAliveTime的时间单位
    • workQueue 尚未执行的任务(Runnable)队列。
    • threadFactory 创建Thread类的工厂
    • handler 任务无法执行时的处理操作

    创建线程池

      var threadPool = ThreadPoolExecutor(1,2,60,TimeUnit.SECONDS,LinkedBlockingDeque<Runnable>())
    

    执行线程任务

      threadPool.execute(Runnable { 
                
            })
    

    ThreadPoolExecutor的主要逻辑是:

    • 线程数少于核心线程数,新建线程执行任务
    • 线程数等于核心线程数后,将任务加入阻塞队列
    • 如果队列容量非常大,可以一直添加;如果队列容量有限,队列满了之后,则尝试创建一个非核心线程执行任务。
    • 执行完成任务的线程 反复去任务队列任务来执行。
    • 任务队列为空时,核心线程会阻塞(block),直到有新的任务。

    1.2、常用线程池

    newFixedThreadPool

    newFixedThreadPool 的核心线程数和最大线程数都是指定值,当线程池中的线程数超过核心线程数后,任务都会被放到阻塞队列中。这里选用的阻塞队列是LinkedBlockingQueue,使用的是默认容量 Integer.MAX_VALUE,相当于没有上限。

    public class Executors {
        public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>(),
                                          threadFactory);
        }
    }
    

    用途:FixedThreadPool 用于负载比较大的服务器,为了资源的合理利用,需要限制当前线程数量。
    缺点:任务队列没有上限,一直追加可能造成OOM

    newCachedThreadPool

    newCachedThreadPool 创建一个可缓存线程池。核心线程数为0,最大线程数为Integer.MAX_VALUE。如果线程池长度超过处理需要,可灵活回收空闲线程。
    接收到新任务将被立即执行:若有空闲线程,则放到空闲线程执行;若无空闲线程,则创建新的非核心线程来执行。
    创建的线程空闲60s 则会被回收。

    public static ExecutorService newCachedThreadPool() {
     return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
     60L, TimeUnit.SECONDS,
     new SynchronousQueue<Runnable>());
    }
    

    缺点:线程可以无限创建,当接收大量任务时 可能创建大量的线程,给JVM过大的负担。

    newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    

    newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行

    二、线程池实现原理

    ThreadPoolExecutor的核心属性和方法

    public class ThreadPoolExecutor extends AbstractExecutorService {
        
            //线程池任务队列
            private final BlockingQueue<Runnable> workQueue;
            //线程池工作线程
            private final HashSet<Worker> workers = new HashSet<>();
    
            //线程池状态
            private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    
            private static final int RUNNING    = -1 << COUNT_BITS;
            private static final int SHUTDOWN   =  0 << COUNT_BITS;
            private static final int STOP       =  1 << COUNT_BITS;
            private static final int TIDYING    =  2 << COUNT_BITS;
            private static final int TERMINATED =  3 << COUNT_BITS;
    
            // Packing and unpacking ctl
            private static int runStateOf(int c)     { return c & ~CAPACITY; }
            private static int workerCountOf(int c)  { return c & CAPACITY; }
            private static int ctlOf(int rs, int wc) { return rs | wc; }
    
            public void execute(Runnable command) {
            
            }
            
            public void shutdown() {}
              
            public List<Runnable> shutdownNow() {}
    }
    
    
    2.1、workQueue;

    workQueue 是线程池的任务队列,里面缓存待执行的Runnable任务

    2.2、workers

    workers 是当前正在执行的工作线程集合。

    2.3、ctl 线程池状态

    ctl是一个原子类,主要作用是用来保存线程数量和线程池的状态

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    

    一个 int 数值是 32 个 bit 位,这里采用高 3 位来保存运行状态,低 29 位来保存线程数量。

    private static int ctlOf(int rs, int wc) { return rs | wc; }
    
    private static final int COUNT_BITS = Integer.SIZE - 3; //32-3
    private static final int CAPACITY = (1 << COUNT_BITS) - 1; //将 1 的二进制向右位移 29 位,再减 1 表示最大线程容量
    //运行状态保存在 int 值的高 3 位 (所有数值左移 29 位)
    private static final int RUNNING = -1 << COUNT_BITS;// 接收新任务,并执行队列中的任务
    private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接收新任务,但是执行队列中的任务
    private static final int STOP = 1 << COUNT_BITS;// 不接收新任务,不执行队列中的任务,中断正在执行中的任务
    private static final int TIDYING = 2 << COUNT_BITS; //所有的任务都已结束,线程数量为 0,处于该状态的线程池即将调用 terminated()方法
    private static final int TERMINATED = 3 << COUNT_BITS;// terminated()方法执行完成
    
    • RUNNING 线程池处于运行状态:可以接收新的任务,可以执行任务队列中的任务。
    • SHUTDOWN 线程池处于关闭状态:不接收新的任务,但是任务队列中的任务会继续执行。
    • STOP 线程池处处于停止状态:不接收新的任务,不执行任务队列中的任务,中断正在运行的任务。
    • TIDYING 所有的任务都已经结束,线程数为0。处于此状态的线程池,即将调用terminated()方法。
    • TERMINATED terminate方法执行完成。
    状态转换

    2.4、shundown() 和showdonwNow()

    • 执行 shutdown() 线程池会进入SHUTDOWN状态:不接收新的任务,但是任务队列中的任务会继续执行。
    • 执行 shutdownNow() 线程池会进入STOP状态:不接收新的任务,不执行任务队列中的任务,中断正在运行的任务。

    2.5、execute() 执行Runnable任务

     public void execute(Runnable command) {
       
    
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) { //1、工作线程数 小于核心线程个数,则尝试直接创建一个新的核心线程来执行任务。
     
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
    
            if (isRunning(c) && workQueue.offer(command)) { //2、核心线程数已满,任务队列未满,则将任务添加到任务队列
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command)) //线程状态二次检查,如果当前线程池关闭了,则移除该任务
                    reject(command);
                else if (workerCountOf(recheck) == 0) //线程状态二次检查,如果当前工作线程数为0,则新建非核心线程来执行剩余任务
                    addWorker(null, false);
            }
            else if (!addWorker(command, false)) //3、任务队列已慢,则尝试创建一个新的非核心线程 处理该任务。
                reject(command);
        }
    

    2.5.1、addWorker

    如果工作线程数小于核心线程数的话,会调用 addWorker,创建一个工作线程。
    大致做了两件事情:

    1、状态检查,不允许创建线程的情况,直接返回false; 条件符合,则工作线程计数+1
    • (1)如果线程处于非运行状态,不允许新创建线程,直接返回false
    • (2)如果工作线程数大于默认容量大小或者大于核心线程数大小,则直接返回 false 表示不能再添加 worker,返回false
    • (3)通过 cas工作线程数加1
    2、创建Worker实例,添加到workers集合,并开启线程
    • (4)将新创建的 Worker 添加到 workers 集合中
    • (5)如果 worker 添加成功,启动线程
     private boolean addWorker(Runnable firstTask, boolean core) {
        retry: //goto 语句,避免死循环
    
       //状态检查,不允许创建线程的情况,直接返回false;  workerCount计数+1。
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            //(1)如果线程处于非运行状态,不允许新创建线程,直接返回false
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;
            for (;;) { //自旋
                int wc = workerCountOf(c);//获得 Worker 工作线程数
                //(2)如果工作线程数大于默认容量大小或者大于核心线程数大小,则直接返回 false 表示不能再添加 worker,返回false
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//(3)通过 cas工作线程数加1
                    break retry;
                c = ctl.get(); // Re-read ctl 
                if (runStateOf(c) != rs) //这里如果不相等,说明线程的状态发生了变化,继续重试
                    continue retry;
         
            }
        }
    
        //创建Worker实例,添加到workers集合,并开启线程
        boolean workerStarted = false; 
        boolean workerAdded = false; 
        Worker w = null;
        try {
            w = new Worker(firstTask); //构建一个 Worker,传入了一个 Runnable 对象
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock(); 
                try {
               
                    int rs = runStateOf(ctl.get());
                    //将新创建的Workder 添加到workers工作线程集合当中。
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                      
                        workers.add(w); //(4)将新创建的 Worker 添加到 workers 集合中
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s; 
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock(); 
                }
                if (workerAdded) {
                    t.start();//(5)如果 worker 添加成功,启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w); //如果添加失败,就需要做一件事,就是递减实际工作线程数(还记得我们最开始的时候增加了工作线程数吗)
        }
        return workerStarted;//返回结果
    }
    
    Worker 是什么?

    Worker代表一个工作线程,内部持有一个thread和firstTask。

    • firstTask 是线程初始化时要被首先执行的任务
    • thread 是在调用构造方法时通过 ThreadFactory 来创建的线程,是用来处理任务的线程。
    • 线程具体的执行操作代理到了外部的runWorker()方法中。
     private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            final Thread thread;
            Runnable firstTask;
        
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker. */
            public void run() {
                runWorker(this);
            }
    
        }
    
    
    
    
    runWorker 方法

    runWorker 是工作线程 执行任务的主要场所。

    • 如果 task 不为空,则开始执行 task
    • 如果 task 为空,则通过 getTask()再去取任务,并赋值给 task,如果取到的 Runnable 不为空,则执行该任务
    • 执行完毕后,通过 while 循环继续 getTask()取任务
    • 如果 getTask()取到的任务依然是空,那么整个 runWorker()方法执行完毕
    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock();
            boolean completedAbruptly = true;
    
            try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
            
                   ...
                    try {
                        beforeExecute(wt, task);//任务执行前回调
                        Throwable thrown = null;
                        try {
                            task.run(); //执行具体的任务
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);//任务执行后回调
                        }
                    } finally {
                        task = null; //task设置成null,保证下次进入while循环执行getTask() 
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);//线程执行完成,将当前worker 从wokers集合移除
            }
        }
    
    

    整体看就是一个无限循环:

    从队列提取任务->执行任务->队列提取任务->执行任务
    

    当getTask() 返回空任务时,循环结束,该工作线程也就结束了。

    getTask

    线程池之所以可以复用线程,关键点就在getTask。

    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
            for (;;) {//自旋
                int c = ctl.get();
                int rs = runStateOf(c);
                
                //(1)对线程池状态的判断:
                //线程池处于SHUTDOWN状态 且任务队列为空, 或者 线程池处于STOP状态,应该销毁当前线程,此时返回null。
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;//返回 null,则当前 worker 线程会退出
                }
                int wc = workerCountOf(c);
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                // timed 变量用于判断是否需要进行超时控制。
                // allowCoreThreadTimeOut 默认是 false,也就是核心线程不允许进行超时;
                // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
                // 对于超过核心线程数量的这些线程,需要进行超时控制
           
    
                //(2) 工作队列中有未执行的任务 且工作线程数量超过了maximumPoolSize 
               //或者上一次取任务已经超时,则 返回null,当前线程退出。
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                        && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
                try {
                    /*根据 timed 来判断,如果为 true,则通过阻塞队列 poll 方法进行超时控制,如果在
                    keepaliveTime 时间内没有获取到任务,则返回 null.
                    否则通过 take 方法阻塞式获取队列中的任务*/
    
                    //(3) timed = true,表示需要进行超时控制,采用workQueue.poll() 从队列中取任务,会阻塞keepAliveTime时间,超时后 timedOut = true,下次循环会直接返回null。 - 线程被回收。
                          timed = false,表示不需要进行超时控制,采用workQueue.take(),该线程会一直阻塞,直到有新的任务。 - 线程不被回收
                    Runnable r = timed ?
                            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                            workQueue.take();
                    if (r != null)//如果拿到的任务不为空,则直接返回给 worker 进行处理
                        return r;
                    timedOut = true;//如果 r==null,说明已经超时了,设置 timedOut=true,在下次自旋的时候进行回收
                } catch (InterruptedException retry) {
                    timedOut = false;// 如果获取任务时当前线程发生了中断,则设置 timedOut 为false 并返回循环重试
                }
            }
        }
    
    
    
    

    关键代码如下;

      boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
      Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                            workQueue.take();
    

    默认 allowCoreThreadTimeOut为false

    • 对于核心线程timed =false,workQueue.take() 取下一个任务,workQueue是LinkedBlockQueue,没有新任务时会一直阻塞在这里(线程不会被释放),当用新任务添加进来,take()返回新任务,线程继续执行。
    • 对于非核心线程(wc > corePoolSize),采用workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方式提取新任务,当workQueue为空时,会等待keepAliveTime,时间超时后,返回null。当前线程就会被释放。

    至此 ThreadPoolExecutor 可以服用线程的原理也就清除了。

    processWorkerExit

    runWorker 的 while 循环执行完毕以后,在 finally 中会调用 processWorkerExit,将当前线程从worker中移除,并执行tryTerminate()

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                completedTaskCount += w.completedTasks;
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
    
            tryTerminate();
    
            int c = ctl.get();
            if (runStateLessThan(c, STOP)) {
                if (!completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    if (workerCountOf(c) >= min)
                        return; // replacement not needed
                }
                addWorker(null, false);
            }
        }
    

    2.6、拒绝策略

    当工作线程数超过corePoolSize,且工作队列已满,且线程总数也达到了maximumPoolSize(非核心线程也无法创建),此时就会拒绝新任务:

    final void reject(Runnable command) {
            handler.rejectedExecution(command, this);
        }
    

    1、AbortPolicy:直接抛出异常,默认策略;
    2、CallerRunsPolicy:用调用者所在的线程来执行任务;
    3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    4、DiscardPolicy:直接丢弃任务;

    三、其他

    3.1、如何取消任务

    ThreadPoolExecutor.execute() 提交一个任务是无法取消该任务的。

    那提交了一个任务之后,想取消执行该任务 有没有途径呢?

    答案是submit()方法

       public Future<?> submit(Runnable task) {
            if (task == null) throw new NullPointerException();
            RunnableFuture<Void> ftask = newTaskFor(task, null);
            execute(ftask);
            return ftask;
        }
    

    submit方法返回一个Future对象

    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException;
        V get(long timeout, TimeUnit unit)
            throws InterruptedException, ExecutionException, TimeoutException;
    }
    

    Future.cancel(true) 可以取消Runnable任务。

    • 任务尚未执行,调用cancel()后,该任务就永远不会被执行
    • 任务正在执行,mayInterruptIfRunning=true时,会interrupted Thread.
    • Attempts to cancel execution of this task. This attempt will
      • fail if the task has already completed, has already been cancelled,
      • or could not be cancelled for some other reason. If successful,
      • and this task has not started when {@code cancel} is called,
      • this task should never run. If the task has already started,
      • then the {@code mayInterruptIfRunning} parameter determines
      • whether the thread executing this task should be interrupted in
      • an attempt to stop the task.

    测试代码

    
    fun testThreadPool(){
            var threadPool = ThreadPoolExecutor(1,5,60,
                TimeUnit.SECONDS,
                LinkedBlockingDeque<Runnable>()
            );
    
            var fu =  threadPool.submit(MyRunaable())
    
            Thread.sleep(10000)
            fu.cancel(true)
            Log.d("feifei","fu cancel")
        }
        
      class MyRunaable():Runnable{
            var index = 0;
            override fun run() {
    
                while(true){
                    Thread.sleep(1000)
                    Log.d("feifei","MyRunaable run():${index++}")
                }
            }
        }
    

    输出结果:

    2021-01-12 15:12:29.925 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():0
    2021-01-12 15:12:30.925 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():1
    2021-01-12 15:12:31.926 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():2
    2021-01-12 15:12:32.926 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():3
    2021-01-12 15:12:33.927 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():4
    2021-01-12 15:12:34.928 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():5
    2021-01-12 15:12:35.928 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():6
    2021-01-12 15:12:36.929 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():7
    2021-01-12 15:12:37.929 6847-6875/com.sogou.iot.myapplication D/feifei: MyRunaable run():8
    2021-01-12 15:12:38.925 6847-6847/com.sogou.iot.myapplication D/feifei: fu cancel
    

    3.2、线程池的监控

    如果在项目中大规模的使用了线程池,那么必须要有一套监控体系,来指导当前线程池的状态,当出现问题的时候可以快速定位到问题。而线程池提供了相应的扩展方法,我们通过重写线程池的 beforeExecute、afterExecute 和 shutdown 等方式就可以实现对线程的监控,简单给大家演示一个案例

    3.3、使用线程池注意事项

    3.3.1、 使用ThreadPoolExecutor而非Executors创建线程池

    用 Executors创建线程池 使得用户不需要关心线程池的参数配置,意味着大家对于线程池的运行规则也会慢慢的忽略。这会导致一个问题。
    比如我们用 newFixdThreadPool 或者 singleThreadPool.允许的队列长度为Integer.MAX_VALUE,如果使用不当会导致大量请求堆积到队列中导致 OOM 的风险而 newCachedThreadPool,允许创建线程数量为 Integer.MAX_VALUE,也可能会导致大量线程的创建出现 CPU 使用过高或者 OOM 的问题。

    3.3.2、合理设置线程池大小。

    需要分析线程池执行的任务的特性: CPU 密集型还是 IO 密集型

    • 如果是 CPU 密集型:主要是执行计算任务,响应时间很快,cpu 一直在运行,这种任务 cpu的利用率很高。CPU 核心数=最大同时执行线程数,加入 CPU 核心数为 4,那么服务器最多能同时执行 4 个线程。过多的线程会导致上下文切换反而使得效率降低。
    • 如果是 IO 密集型:主要是进行 IO 操作,执行 IO 操作的时间较长,这是 cpu 出于空闲状态,导致 cpu 的利用率不高,这种情况下可以增加线程池的大小。一般可以配置 cpu 核心数的 2 倍。

    四、参考文章

    https://www.jianshu.com/p/a977ab6704d7

    https://www.jianshu.com/p/7b2da1d94b42

    相关文章

      网友评论

          本文标题:ThreadPoolExecutor线程池原理分析

          本文链接:https://www.haomeiwen.com/subject/jrceaktx.html