美文网首页Android进阶之路安卓开发博客Android开发经验谈
Java线程池复用线程原理,都是教训,你不来看看?

Java线程池复用线程原理,都是教训,你不来看看?

作者: 唠嗑008 | 来源:发表于2020-06-15 16:07 被阅读0次

    前言

    在这里友情提示一下,这个知识点非常重要而且及其容易被大家忽略,如果你在面试的时候回答不出来,很有可能会让面试官觉得你没有读过源码,遇到一些极端的,你就回家等消息了。我一个朋友之前是看过这个线程池源码的,但是他看的是工作原理,但是面试的时候先问他的是复用原理,他恰好没看过,也反映不过来,面试官没给他解释机会,当场挂,大家吸取教训!

    相信很多人都接触过线程池,我们知道线程池有核心线程和非核心线程之分,其中核心线程是一直存活在线程池中的,而非核心线程是在执行完任务之后超时销毁的。但是大家应该都知道一点,当Thread执行完Runnable任务之后就会销毁,而且就算执行完任务之后把线程挂起也没有办法再去执行其他任务,那线程池是如何做到核心线程复用的呢?下面就通过阅读源码的方法带大家了解背后的原因。

    推荐阅读之前的文章
    深入浅出Java(Android )线程池ThreadPoolExecutor

    大家可以对着这个流程图去学习源码,把这个图掌握了,线程池原理也就差不多了


    线程池工作流程

    首先来看一下执行线程任务的方法,里面很简单,就是根据工作线程数量去执行不同的策略,里面分成了3种情况,但是都会执行addWoker()方法

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            /*
             * 如果当前活跃线程数小于核心线程数,就会添加一个worker来执行任务;
             * 具体来说,新建一个核心线程放入线程池中,并把任务添加到该线程中。
             */
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
    
          //程序执行到这里,说明要么活跃线程数大于核心线程数;要么addWorker()失败
    
            /*
             * 如果当前线程池是运行状态,会把任务添加到队列
             */
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
    
           //程序执行到这里,说明要么线程状态不是RUNNING;要么workQueue队列已经满了
    
            //调用addWorker方法去创建非核心线程,
            //如果当前线程数已经达到 maximumPoolSize,执行拒绝策略
            else if (!addWorker(command, false))
                reject(command);
        }
    

    这个方法就和名字一样,添加一个工人来完成任务;而这个工人就是Thread,任务就是Runnable。

    private boolean addWorker(Runnable firstTask, boolean core) {
           ...省略一些不重要的
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                //Worker是实现了Runnable接口的包装类
                w = new Worker(firstTask);
                //Thread是在Worker构造方法创建的
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
                      
                        //检查线程池状态,分为2种情况
                        //1、线程池处于RUNNING
                       //2、线程池处于SHUTDOWN并且firstTask==null
                       //这2种情况都会创建Worker来执行队列中的任务
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            //重新设置标识位
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        //启动线程
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    到这里,我们大概知道是通过创建Worker来执行任务的,而且线程是在Worker内部创建的,我们也能猜到Thread需要的Runnable应该也在Worker内部,所有我们继续看一下Worker类。

    Worker

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
         
            final Thread thread;
            /** Initial task to run.  Possibly null. */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
    

    Worker居然是一个Runnable任务,而且Worker的构造方法中创建了Thread对象。这样的话,在之前的addWorker()方法中调用t.start();就会执行到Worker的run()方法。

    继续来看看runWorker()方法,这里很关键,一定要仔细看。

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            //这个就是addWorker传进来的Runnable
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                //如果task不为null或从workQueue中获取任务不为null
                //就会一直执行
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
    
                    //检查线程池状态,如果线程池处于中断状态,将调用interrupt将线程中断。 
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        //中断线程
                        wt.interrupt();
                    try {
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            //线程任务执行啦
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    这里的关键在于这个while()条件判断,当第一次创建Worker时就有任务,当执行完这个任务后,这个方法并没有结束,而是不断地调用getTask()方法从阻塞队列中获取任务然后调用task.run()执行任务。

    getTask()获取任务

    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // 1.allowCoreThreadTimeOut表示是否允许核心线程超时销毁,默认是false,也就是说核心线程即使空闲也不会被销毁
              //当然,如果设置为true,核心线程是会销毁的
              //这样的话,只有正在工作的线程数大于核心线程数才会为true,否则返回false
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    //2.如果timed为true,通过poll取任务;如果为false,通过take取任务
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    这个方法是通过一个死循环取任务,取任务的话是通过workQueue这个阻塞队列去完成的,在默认不改变allowCoreThreadTimeOut的前提下,如果工作线程数大于核心线程数,则通过poll()从队列取任务;如果工作线程数小于核心线程数,则通过take()从队列取任务;这2个方法等区别是take()取任务时,如果队列中没有任务了会调用await()阻塞当前线程。这样的话,是不是已经搞清楚线程池中的核心线程复用的原因了。

    线程的唤醒是在execute时,当调用workQueue.offer()方法,将任务放入阻塞队列时,会调用Condition.signal()方法唤醒一个之前阻塞的线程。这部分不细讲,感兴趣的同学自行查看。

    总结

    • 1、当Thread的run方法执行完一个任务之后,会循环地从阻塞队列中取任务来执行,这样执行完一个任务之后就不会立即销毁了;
    • 2、当工作线程数小于核心线程数,那些空闲的核心线程再去队列取任务的时候,如果队列中的Runnable数量为0,就会阻塞当前线程,这样线程就不会回收了

    感谢以下作者

    Java线程池实现原理及其在美团业务中的实践
    线程池原理
    彻底理解Java线程池原理篇

    相关文章

      网友评论

        本文标题:Java线程池复用线程原理,都是教训,你不来看看?

        本文链接:https://www.haomeiwen.com/subject/fbpfxktx.html