美文网首页
《Java核心技术系列一》ThreadPoolExecutor

《Java核心技术系列一》ThreadPoolExecutor

作者: 逍遥无极 | 来源:发表于2020-03-08 21:25 被阅读0次

    该系列统一使用java8的源码进行讲解。

    由于线程的创建于销毁是存在开销的,为了避免频繁的创建与销毁线程,Java采用了池化技术来管理线程资源。只要涉及到多线程、异步的场景,基本就会有线程池的存在。因此掌握好线程池实现原理对程序员来说非常的重要,也是通往高级程序员以及架构师的必经之路。 本文主要从以下几个方面对线程池技术进行讲解。

    • 剖析线程池的源码实现
    • 讲解使用线程池的注意事项
    • 线程池的变异使用方式(Tomcat与Netty如何使用线程池)
    • 面试中的线程池问答

    一. 源码剖析

    为了使线程池可以适用于多种场景,对于线程池的创建提供了多个参数,进行控制。各个参数的含义必须要非常的明确。

    1.1 构造方法

    • corePoolSize 核心线程数
    • maximumPoolSize 最大线程数
    • keepAliveTime 保活时间
    • unit 保活时间的单位
    • workQueue 任务队列
    • threadFactory 线程工厂
    • handler 拒绝策略

    结合参数描述一下线程的工作原理,以新来一个任务为例:

    1. 新来任务后,如果线程数<corePoolSize,则创建线程执行(即便存在空闲的线程),否则执行2
    2. 如果workQueue没有达到最大值则扔进阻塞队列,否则执行3
    3. 如果线程数<maximumPoolSize,则创建线程执行,否则执行4
    4. 按照指定的拒绝策略handler处理新来的任务

    除了上面步骤提到的参数外,还有

    1. keepAliveTime, unit 保活时间,如果Worker阻塞在从workQueue中获取任务的时间超过该时间,且线程数>corePoolSize,那么就会对该Worker进行销毁,避免过多的线程阻塞,浪费资源。
    2. threadFactory 线程工厂,用于创建线程对象

    连接了各个参数的含义,看下构造函数的源码:

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.acc = System.getSecurityManager() == null ?
                    null :
                    AccessController.getContext();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    源码中只是进行参数取值范围控制,并赋值。

    1.2 execute 提交任务

    创建好线程池之后,我们就需要往线程池中提交任务,提交任务有两个方法(低级的面试也会问这两个方法有什么区别):

    1. submit() 有返回值,返回Future对象(Future后面再将)
    2. execute() 无返回值

    其中 submit也只是任务包装成Future之后,调用execute,所以这里我们只需要看execute方法的实现即可。

        public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            //线程数小于核心线程数则新增worker执行
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
           //否则,扔到阻塞队列
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //扔进阻塞队列后判断状态,如果线程池状态处于非运行状态,则执行拒绝策略handler
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                //如果运行着,但是没有worker,那么新增worker执行,为什么会出现这种情况?
                //因为有参数可以控制核心线程数也可以在超时的情况下被销毁:allowCoreThreadTimeOut这个参数控制
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            // 队列慢,则新增worker执行任务
            else if (!addWorker(command, false))
                //worker也达到上限,则执行拒绝策略
                reject(command);
        }
    

    其中,ctl一个线程安全的AtomicInteger变量,用一个整数来记录了线程池的状态(高三位)和目前线程池中线程(Worker)的个数(低29位)
    举例说明:ctl的值为:
    1000 0000 0000 0000 0000 0000 0000 0001 高三位100代表线程池处于运行状态,低29位为1,说明目前线程池中只有1个线程。
    workerCountOf(c) 返回的就是低29位表示的数,即线程个数
    isRuning(c) 就是判断高3位是否为100,100位运行状态
    然后上面的代码逻辑就是我们一开始整理的新来一个任务时,线程池的执行逻辑。非常的重要,几乎每次面试都会被问。

    1.3 Worker 线程池中的工作者

    线程池中的工作者是Worker,Worker不仅对Thread进行了包装,还继承了AbstractQueuedSynchronizer(AQS相关的知识简单讲,后面会有文章细讲)实现了Runnable,下面我们就带着问题一起来认识下Worker。

    1.3.1. Worker为什么要实现Runnable接口?

    Worker中封装了Thread,也就是在构造Worker的时候,会创建Thread对象,Thread对象又要关联一个任务去执行,那这个任务就是Worker自己本身。也就是说:Worker中的线程对象Thread执行的是Worker的run方法。这样的话,thread一旦执行,执行的就是Worker的run方法,看下Worker的构造方法:

            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    

    从构造方法中即可以看出,thread一旦启动,调用的就是Worker的run方法。

    1.3.2 Worker为什么还实现了AbstractQueuedSynchronizer

    这里主要是为了实现Worker的中断。从1.3.1 Worker的构造函数中可以看到,设置状态为-1, 相当于给Worker加了一把锁。那什么时候会解锁呢?简单看下runWork方法(也就是Worker的run方法),代码如下:

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            //...省略
    }
    

    其中unlock()方法就是解锁,unlock方法会调用Worker的release方法,将state的值+1,这样state值就为0了。因为Worker创建并不代表Thread执行,只有Thread线程真正执行了,才会响应中断。此外,在执行每一个task的过程中也不允许中断。响应中断的方法如下:

          void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
    

    首先会判断getState(), 这个state就是AQS的值,当Thread线程开始执行后,该值就会变为0,那么在这个中断方法中就可以进入进行中断了。

    1.3.3 Worker线程都做了哪些事情

    这就要看runWork方法了,代码如下:

        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                //首次task不为null执行自己的任务,此后从workQueue中去任务
                while (task != null || (task = getTask()) != null) {
                    //上锁,不允许中断
                    w.lock();
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    当worker创建时,firstTask是被赋值了的,所以先执行自己的任务,此后所有的任务都是通过getTask()从workQueue中获取。拿到任务后先lock加锁,然后通过调用task.run方法执行任务,执行完成后,解锁。
    从这里可以看出来,在一个任务任务的执行过程中是不需要中断的
    通过getTask方法,如果返回的是null,那么就要执行processWorkerExit,对该Worker进行退出

    1.3.4 getTask只是从workerQueue中获取任务吗?

    getTask除了从workerQueue中获取任务外,还会对worker的等待时间进行判断,释放掉多余的worker。
    看下getTask的实现:

        private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 线程池关闭状态下,如果workQueue空,则减少Worker
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // 判断是否需要因为worker数>corePoolSize 而销毁worker
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
               //超时,且要多余1个线程,且目前没有任务需要处理,则进行销毁Worker
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    //仅仅从数量上-1,销毁Worker的事情让runWork方法去做
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    //去队列中获取数据,如果需要考虑超时,则按照超时返回的策略去获取任务
                   //如果不需要考虑超时,则直接使用take方法阻塞在workQueue上
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    //任务存在,直接将任务返回,执行任务
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    getTask方法会根据当前线程池的状态,去判断该Worker是否需要有限超时从workQueue中获取任务,这样可以让getTask提前退出,销毁多余的Worker。从这里也可以看出来并不会说先创建的线程就是核心线程,线程池只关心线程的数量,不关心哪些线程是因为<corePoolSize创建的,哪些是因为>=corePoolSize创建的,在销毁的时候是随机销毁的。

    1.4 Worker何时被启动的

    当一个新任务被提交到线程池后,有三种情况会创建新的worker并启动worker

    1. 线程数<corePoolSize时
    2. 线程数>=corePoolSize,且workQueue满时
    3. 任务添加到阻塞队列后,发现线程数为0时

    会调用addWorker方法完成Worker的新增,代码如下:

       private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            //先通过死循环,保证在ctl上把worker数加上
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
               //构造一个worker
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    //保证线程安全,只能有一个线程执行这段代码
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                   //添加成功后,通过线程启动worker
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    addWorker做了几件事情

    1. 在死循环中完成对ctl数值+1,这里为什么不用加锁?因为这里使用的是cas操作,属于乐观锁,不需要加锁也能保证线程安全的修改ctl
    2. 创建worker,并加锁将worker放到workers列表中,然后通过执行线程的start方法,调用Worker的run方法,然后执行runWork方法,Worker就开始工作了

    到此,关于线程池的核心源码部分就基本完成了,关于更细致的源码剖析,线程池各个状态的转换细节可以参考我的另一篇简书上的文章 https://www.jianshu.com/p/a52f438c16be,有关线程池相关的剩余部分限于篇幅问题,放在下一篇中继续剖析。如有问题欢迎大家指正,我们一起学习,共同进步。

    相关文章

      网友评论

          本文标题:《Java核心技术系列一》ThreadPoolExecutor

          本文链接:https://www.haomeiwen.com/subject/ydcjdhtx.html