Java线程池ThreadPoolExecutor深度探索及源码

作者: 非墨Zero | 来源:发表于2016-11-21 14:43 被阅读246次

    我们的程序里,时常要使用多线程。因此多线程的管理变的尤为重要。ThreadPoolExecutor很好的解决了这一点。本篇文章主要从源码入手,分析ThreadPoolExecutor的原理。

    1.标记和构造方法####

    和很多状态对象一样,ThreadPoolExecutor也通过一个int的头3位来记录线程池的状态,后面20多位来标记工作线程数量。并且提供通用的位运算接口来获得你所需要的数据。

        private static final int RUNNING    = -1 << COUNT_BITS;
        private static final int SHUTDOWN   =  0 << COUNT_BITS;
        private static final int STOP       =  1 << COUNT_BITS;
        private static final int TIDYING    =  2 << COUNT_BITS;
        private static final int TERMINATED =  3 << COUNT_BITS;
    

    我们先来看下ThreadPoolExecutor的构造方法,这里似乎我们又要老生常谈了,网上已经有很多关于线程池各个参数的介绍了,这里,非墨还是会再说一遍,这样加深一下大家的印象。

    public ThreadPoolExecutor(int corePoolSize, //核心线程数量
                                  int maximumPoolSize,//最大线程数量
                                  long keepAliveTime,//存活时间
                                  TimeUnit unit,//存活时间单位
                                  BlockingQueue<Runnable> workQueue,//工作队列
                                  ThreadFactory threadFactory,//线程构造工厂
                                  RejectedExecutionHandler handler//拒绝请求回调
    ) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    

    2.执行流程####

    按照我们熟知的线程池机制,
    1.当请求被post到我们的线程池中,我们的线程池会先生成一个核心线程来执行它
    2.当核心线程满了的时候,将会把这个请求放入到我们的工作请求队列workQueue中。
    3.如果你提供的队列是一个有界队列的时候,线程池将会判断你的最大线程数是否超过你的核心线程。如果超过核心线程的话,线程池会生成新的线程去执行它。
    4.如果这个时候,已经达到了最大线程数,那么线程池将走到拒绝回调
    5.如果线程池的最大线程数不大于核心线程数,并且工作队列已满,那么将直接走拒绝回调

    实际上这个流程已经在ThreadPoolExecutor.execute方法注释中有详细的说明。即便没有说明,我们也可以从它的代码流程简单看出一些端倪:

    //code ThreadPoolExecutor.execute(Runnable)
            int c = ctl.get();//获取当前运行线程数
            if (workerCountOf(c) < corePoolSize) {//如果当前线程数小于核心线程数
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {//将请求命令加入到工作队列
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))//加入到非核心线程中
                reject(command);//如果非核心线程没有执行,那么将走拒绝请求回调
    

    3.深入源码###

    我们以execute为入口,深入分析一下这个线程池的源码。int c = ctl.get()方法我们暂时不说,后面我们将会补充,我们暂且把它理解为获得一个数量,而这个数量c将会传入到workerCountOf方法中。这个方法名称我们就能知道其用意,就是为了获得当前工作线程数量。

    private static int workerCountOf(int c)  { return c & CAPACITY; }
    

    上文我们说到,线程池会通过一个int的后几位来记录线程数量,而workerCountOf就是通过位运算来获得当前工作线程数。在获得当前线程数了以后,如果当前线程数小于
    corePoolSize的话,将会通过addWorker方法把command加入到工作线程中。addWorker需要提供两个参数,一个是你的command,另外一个boolean量是为了标识是否是往core线程中加。

    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();//获得一个含有状态和数量的值
                int rs = runStateOf(c);//获得当前线程池状态
                ...
               for (;;) {//第二个for
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
       }
    

    这里,通过上面的代码我们可以清晰的了解ctl变量的存在的目的:
    1.首先,当从类型上看clt是一个原子类型,说明它是要支持多线程调用的
    2.ctl里面的值需要存储两个信息,一个是线程数量,一个是当前线程池的工作状态。

    这时候是否有读者还在纳闷,为什么我的线程数小于我的核心线程数,我往我的线程池里加,还是可能出现加不进去的情况。事实上,“第二个for”循环已经很好的说明了这一点。因为线程池不能保证是同一个线程调用addWorker方法。线程池需要同步过后,才能保证是否是否往核心线程里面加。这就是为什么在ThreadPoolExecutor.execute方法里,在判断完核心线程数量之后,如果失败了,还要再取一次当前线程数的原因。

     if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();//再取一次
            }
    

    好的,我们继续回到"第二个for"。我们可以看出,线程池在同步方面不仅细化了粒度,而且用的是CAS算法。这种算法可以劲量的避免由于sync引起的线程阻塞。

    for (;;) {//第二个for
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                   //当线程池数量超过核心线程的时候退出,返回false
                    if (compareAndIncrementWorkerCount(c))
                        break retry;//当增加线程成功的时候,跳出循环
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;//状态不一致继续循环
                    // else CAS failed due to workerCount change; retry inner loop
                }
    

    由于我们现在只有一个线程在工作,不存在多线程竞争的情况,因此我们选择跳出循环的逻辑。跳出循环以后,程序将真正意义上的生成一个Worker线程来执行指令。

    //code private boolean addWorker(Runnable firstTask, boolean core)
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);//生成一个worker对象
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();//锁定全局锁
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());//检查线程池状态
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);//将Worker线程纳入workers集合对象管理
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;//重新赋值largestPoolSize变量
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
    

    上面的代码非常简单,线程池将生成一个Worker的线程包装类。不论是是否是核心线程,所有的线程都被纳入到workers集合对象进行管理。如果一切流程都正常workerAdded将为true,Worker里的线程将被启动。启动后Worker将执行线程的run方法,而在run方法中,又调用到Worker的runWorker(Worker)方法:

    public void run() {
                runWorker(this);
            }
    

    runworker是真正的线程执行流程的代码段:

    // code runworker
     try {
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();//判断当前线程池状态,
                    try {
                        beforeExecute(wt, task);//执行前切面
                        Throwable thrown = null;
                        try {
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            afterExecute(task, thrown);//执行后切面
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                processWorkerExit(w, completedAbruptly);
            }
    

    runworker方法中引申出来两个方法beforeExecute和afterExecute。可以通过继承的方式来监控command的执行。相当于在command.run之前和之后切了两个面,是一种面向方面的编程模式。当Task执行完成之后,由于while循环,将再次执行while的判断条件task = getTask()) != null; getTask方法是可能阻塞的,阻塞的时间是根据你在构造线程池的时候设置的超时时间来决定的。

    private Runnable getTask() {
            boolean timedOut = false; //是否判断超时
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // Check if queue empty only if necessary.
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // Are workers subject to culling?
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                //allowCoreThreadTimeOut变量用于控制是否让核心线程也进行超时判断
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    Runnable r = timed ?//通过timed变量来选择使用poll方法还是take方法
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;//如果poll获取的r为空,标记为超时
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    
    
    >getTask还是一个循环操作,第一次执行的时候,会通过timed变量来判断是否有超时检查,如果有超时检查的话将调用poll方法。如果poll在规定的时间内并没有获得任何的执行对象,返回的r为null,timedOut将被标记为true。这时候,又再次进入循环。这时候,如果你是非核心线程,是扩展线程的话,那么,if ((wc > maximumPoolSize || (timed && timedOut))这个判断为true,程序将返回一个null。
    在runWorker方法中,如果getTask返回的对象为null,runWorker将跳出while循环,执行finally语句:
    
       finally {
            processWorkerExit(w, completedAbruptly);
        }
    
    
    >processWorkerExit方法需要传递两个变量,第一个变量是Worker对象,第二个变量是completedAbruptly变量,这个变量是干什么用的呢?因为你的程序跳出可能存在两种情况,一种是正常跳出,一种是异常跳出,如果是异常跳出的话,这个时候你的workercount未必正常的执行decrement操作,因此通过这个变量来标记程序的执行状态。
    

    //code processWorkerExit
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
    completedTaskCount += w.completedTasks;
    workers.remove(w);
    } finally {
    mainLock.unlock();
    }

    mainLock是一个全局锁,主要是为了同步全局的workers变量。上面的代码中,线程池将记录一下task执行数据,并且将worker从workers队列中删除。
    这个时候,基本上整个线程池的流程都已经概述完了,当然,我们还确实一个变量,那就是RejectedExecutionHandler类型变量。这个得回到我们的execute方法:
    

    if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
    reject(command);
    else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
    }
    else if (!addWorker(command, false))
    reject(command);//拒绝请求

    
    
    >当线程池拒绝请求的时候,将调用reject方法,而reject方法将会回调RejectedExecutionHandler的rejectedExecution方法:
    

    final void reject(Runnable command) {
    handler.rejectedExecution(command, this);
    }

    线程池提供一个默认的拒绝请求回调:
    

    //code ThreadPoolExecutor
    private static final RejectedExecutionHandler defaultHandler =
    new AbortPolicy();
    //code AbortPolicy
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
    " rejected from " +
    e.toString());
    }

    也就是采用异常的方式来拒绝请求。
    
    
    这样,ThreadPoolExecutor的主要源码和结构已经分析完了,当然还有其他的特性和功能需要看官们自己去探索。
                                                                                                            -----非墨

    相关文章

      网友评论

      • 大道至简_Andy:3.如果你提供的队列是一个有界队列的时候,线程池将会判断你的最大线程数是否超过你的核心线程。如果超过核心线程的话,线程池会生成新的线程去执行它。
        =========================================
        这里的说法不太恰当,源码中说的是offer方法。并没有说是有界还是无界。无界也并不是真正的无界了,只是size是Interger.MAX。

      本文标题:Java线程池ThreadPoolExecutor深度探索及源码

      本文链接:https://www.haomeiwen.com/subject/uylupttx.html