美文网首页
线程池你真的懂了吗

线程池你真的懂了吗

作者: QI的咖啡 | 来源:发表于2021-07-21 18:36 被阅读0次

    阅读任何源码,我们都应该带着几个问题去阅读,从源码中找出这些问题的答案,这样才能彻底搞明白某个知识点。

    下面我们就带着这样几个问题,一起看一下ThreadPoolExecutor的源码

    1. 为什么要用线程池
    2. 为什么不推荐使用juc直接创建的线程池
    3. 线程池的几个核心参数
    4. 线程池是什么时候创建线程的?
    5. 线程池是如何重复利用线程的?
    6. 任务提交的顺序和执行的顺序是一样的吗?

    1、为什么要使用线程池

    这个其实可以写一个简单的程序去跑一下,比如使用线程池去跑1000个task和开1000个线程去跑这1000个task,线程池的效率会高出很多倍,原因是线程池能够重复利用线程,没有创建和销毁线程的开销。
    其实池化的技术在很多地方都会用到比如数据库的连接池,字符串常量池,netty的对象池等等

    2、为什么不推荐使用juc直接创建线程池的方式

    我们找两个Exectors创建线程池的源码

    a、newCachedTreadPool的源码

     public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    

    可以看到这里的最大线程数是0xffff个,这个最大线程数在大并发提交任务的情况下会创建大量线程,会导致CPU100%

    b、newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>(),
                                        threadFactory));
        }
    
    //看一下LinkedBlockingQueue的实现
     public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
        }
    

    会初始化一个容量为0xffff的队列,由于这个队列太大,如果我们提交的任务数很多并且自定义的线程里的对象又很大的话,就很容易发生oom的问题。

    从这个工具类创建线程的参数我们可以看到底层调用的都是ThreadPoolExecutor的构造方法,所以我们建议根据具体业务的规模设置合适的线程池参数。

    new ThreadPoolExecutor()
    

    3、线程池的几个核心参数

    ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue) 
    

    corePoolSize:最大线程数
    maximumPoolSize:最大线程数
    keepAiveTime:线程存活时间
    TimeUnit:存活时间的参数
    BlockingQueue:线程池的任务队列

    task投递到线程池中的整个过程如下


    image.png

    看一下具体的代码

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
    
            //这里判断线程数量是否小于corePoolSize,如过小于corePoolSize则直接创建worker线程
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //到这个if说明worker线程数量大于了corePoolSize了,这里直接添加到任务队列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //到这个if说明任务加入队列失败,队列满了,则再创建线程worker线程,如果创建失败则执行拒绝策略
            else if (!addWorker(command, false))
                reject(command);
        }
    

    看到这里从宏观上我们就能看到整个task投递到线程池的一个过程,其实这里最主要的方法是addWorker,其实addworker里才是线程池的精华,里面有如何创建线程及start的逻辑,如何回收过期的线程,如何重复利用创建的线程去运行task的逻辑

    4、线程池是什么时候创建线程的

    线程池的线程不是线程池创建的时候创建的,线程池的线程是在调用addWorker方法,并且addWorker执行成功才会创建
    下面我们一起分析一下addworker代码,这个方法很长,其实要看明白这个方法我们要先看一下Worker这个类,可以看到这个类实际上实现了Runnable接口,其实线程池中运行的runnable任务都会被包装成Worker对象

     private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
         
            private static final long serialVersionUID = 6138294804551838833L;
            //当前worker会绑定一个线程
            final Thread thread;
           
            //当前worker处理的第一个任务
            Runnable firstTask;
            volatile long completedTasks;
            //创建worker时就会生成 一个线程及赋值firstTask,
            //注意线程池的线程就是在这里被创建的
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                //注意看这里,这个线程创建的时候传的是this,也就意味着一会this.thread.start时,
                //执行的是worker对象的run方法
                //这个大家想一下,回味一下
                this.thread = getThreadFactory().newThread(this);
            }
    }
    

    下面在具体分析一下addWorker是如何start线程及重复利用线程运行任务的

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                //获取worker数量
                int c = ctl.get();
                //获取线程池状态
                int rs = runStateOf(c);
    
                // 如果线程池状态为SHUTDOWN就不接收任务了直接returnfalse
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    //这里会判断一下worker数量
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
            
            //如果代码运行到了这里,就表示线程池状态正常,任务达到了创建worker线程的条件
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                //这里会创建一个worker对象,
                //结合上面的代码,worker对象中会包含一个线程和一个firstTask
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    //下面这一部分是用来判断需不需要将worker线程进行缓存,给其他的任务使用
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        int rs = runStateOf(ctl.get());
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            //注意看这个workers对象,这是一个hashset,用来缓存创建好的worker对象
                            //注意在强调一下这个worker对象包含一个Thread引用及Runnable引用
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
    
                    //缓存好worker线程后,这里会执行线程的start逻辑
                    //注意线程池的任务就是在这里开启start使任务真正进入runnable状态的
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    下面我们看一下start的具体逻辑:
    刚刚我们已经看到了,thread是worker对象的一个属性,实际上创建线程时,传的参数就是worker对象本身,所以线程start执行的逻辑就是worker对象的run方法

    //worker的run方法很简单,下面我们看一下runWorker方法
    public void run() {
                runWorker(this);
            }
    

    看完这个我们就基本上看完了线程池的核心逻辑了,但是还有一些细节没有仔细看,后面我会提到,留给大家思考

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            //获取一下fisrtTask
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
               //这个task什么时候不为空?
               //注意这里会有一个隐含的问题,我们提交的任务存放的顺序是 核心worker->队列->最大线程worker
              //这里的getTask是从队列里获取的任务
              //这里task的执行顺序就变成了,核心worker->最大线程worker->队列
              //不知道大家有没有get到我的点,可以做一个实验就是为task编一个号,比如安1-10的顺序提交任务,最终执行的结果却是 1,2,3,4,8,9,10,5,6,7这种顺序,这里就是出现这种提交顺序和执行顺序不一样的原理
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            //这里会最终调用task的run方法,到此线程池的创建到运行任务就看结束了
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
    
                //这里会销毁线程,在队列为空的时候,会销毁线程,让线程数量停留在corePoolsize范围内,什么时候会执行到这,当task为空的时候,什么时候task为空,看getTask的逻辑,getTask会判断队列是否为空及活跃线程的时间来返回task的值,这里就不细讲了
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    5、线程池是如何重复利用线程的

    看完上面的分析,其实就能回答这个问题了,当任务提交时会创建worker对象,这个对象里会有绑定一个线程,同时会将worker对象放入workers Set中,创建成功后,会立马调用worker.thread.start方法启动线程,这个线程的run方法进行了包装,首先判断fisrtTask是否为空如果不为空则直接运行,否则会while循环拿缓存队列中的任务,知道缓存队列为空,或者空闲线程超过了keepalive时间就会销毁线程,以保证线程维持在corePoolSize的大小

    6、任务提交的顺序和执行的顺序是一样的吗?

    不一样,提交顺序是 核心worker线程->队列->非核心worker线程,
    但是执行的顺序却是核心worker任务->非核心worker任务->队列任务
    如果能理解这个,线程池就真的理解的差不多了

    7、其他

    上面几个问题有些是我在看源码时的困惑点,有些是我看完源码之后的一些想法,除了这些问题外,线程池还有很多精妙的地方比如,
    a、线程池的状态和核心线程数其实是用一个4个字节int表示的,为什么要这么表示
    b、线程池中使用到的设计模式有哪些
    c、线程池本身就是并发场景下提交任务的,那它自己的安全性是如何保证的,execute方法是如何保证安全性的

    相关文章

      网友评论

          本文标题:线程池你真的懂了吗

          本文链接:https://www.haomeiwen.com/subject/rwojmltx.html