美文网首页线程池
线程池源码分析

线程池源码分析

作者: 念䋛 | 来源:发表于2020-12-22 18:55 被阅读0次

线程池源码分析

在线程池执行任务的时候,都是主线程(可能是tomcat线程),将任务放到线程池中,再执行线程池的execute方法,那我们就从execute入手分析线程池的源码.因为是多线程,无法能分析所有的情况,只能顺着主线程分析源码

public void execute(Runnable command) {
//如果传进来的为null抛出异常
    if (command == null)
        throw new NullPointerException();
//获取当先正在运行的线程数
    int c = ctl.get();
//如果小于核心线程数,这里可以看出如果线程池中有闲着的线程,但是每次都是先走这段逻辑,也就是不管是否有空余线程,也是会创建一个新的线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
//如果大于核心线程数,并且线程池是Running状态,并尝试将任务放到阻塞队列中,如果成功则进入if条件
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
//创建非核心线程,false就是非核心线程的意思,如果创建失败则进入到拒绝策略
    else if (!addWorker(command, false))
        reject(command);
}

主要就是addwork方法,在介绍addword方法之前,先介绍一下Worker的构造函数, Worker实现了Runnable接口

Worker(Runnable firstTask) {
 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask;

//线程工厂类创建线程的时候,是将Worker本身传进来,而不是firstTask任务 
this.thread = getThreadFactory().newThread(this); }

下面分析addWorker方法,firstTask是我们要执行的任务,core代表是否为核心线程,true为核心线程

private boolean addWorker(Runnable firstTask, boolean core) {
//这段for代码的主要意思就是,先判断线程池的状态,在判断数量是否已经超过了 CAPACITY  极限或者根据
//core为true/false来判断是否超过了核心线程或者超过了最大线程数,如果超过则返回false,如果没有超过,则将
//ctl利用cas加一就是线程数加一,cas一般都是配合死循环使用,然后跳出for循环,因为这里还没有创建线程执行
//任务就先加一,所以在后续的代码中执行错误的话,会在finally的addWorkerFailed方法,将ctl减一
retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
//下面是创建线程执行任务的代码
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
//创建一个worker,
        w = new Worker(firstTask);
//这里的t的Runnable为w本身
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
//这里的t.start()执行的是w本身,执行Worker的run方法
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

执行到了t.start()方法,就是执行Worker的run方法,最终执行到了runWorker方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
//真正的任务
Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
    try {
//下面就是最核心的方法,通过一个while循环达到线程的重用,在最后的finally中task赋值为null,
//task != null为false,那么跳出循环的条件主要集中在(task = getTask()) != null,如果为null则跳出while循环,
//当while循环跳出的时候,线程执行结束, 还要经过runWorker的最后一个方法processWorkerExit(w, completedAbruptly);
//最后决定是否线程执行结束,再等待被GC回收,getTask是从队列中获取的任务,后续会分析到,
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
              try {
                    task.run();
                } catch (RuntimeException x) {
//出现异常继续抛出
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
//执行任务后将task赋值为null
                task = null;
//任务完成的总数
                w.completedTasks++;
                w.unlock();
            }
        }
//为flase和true在下面的processWorkerExit方法中使用到,是线程回收还是放到线程池的首要条件,
//如果在task.run()出现异常时,没有catch捕获,这段代码是不会执行,
//执行下一个finally通过processWorkerExit方法中的 addWorker(null, false);
//添加空任务的worker,就是添加空闲线程,等待从队列中获取任务
        completedAbruptly = false;
    } finally {
//推出while寻呼那后,执行该方法,看是否还能留在线程池中.
        processWorkerExit(w, completedAbruptly);
    }
}

getTask方法

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
//查看线程池中线程数
        int wc = workerCountOf(c);

        // Are workers subject to culling?
//是否允许回收线程,大于corePoolSize好理解,就是多余核心线程是要回收的, allowCoreThreadTimeOut的意 
//思是是否回收核心线程,默认为false,可以手动设置为true,如果为true,就是不管是否线程数大于核心线程 
//数,timed都为true
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// wc > maximumPoolSize || (timed && timedOut)主要看timeOut是否为true, wc > maximumPoolSize基本都 
//为false
        if ((wc > maximumPoolSize || (timed && timedOut))
//wc是否大于1, workQueue.isEmpty()队列是否为空,如果队列为空没有执行的任务时
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
//返回null的时候,在上面的runWorker方法的while循环就会跳出,则线程执行结束,还要经过runWorker的最后一 
//个方法processWorkerExit(w, completedAbruptly);最后决定是否线程执行结束,再等待GC的回收,线程也就销 
//毁了,核心线程和非核心线程都在while循环中,而返回null的线程是不确定的,就是被回收的线程可能是核心线程,也可能是非核心 
//线程
            return null;
            continue;
        }

        try {
//如果timed为true执行poll,从队列中获取任务是有时间要求的,如果超过规定时间则返回null
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//一直阻塞,直到有任务
                workQueue.take();
//为null,则是在执行poll的时候会返回null
            if (r != null)
                return r;
//返回null时timedOut为true,就回到了上面说的判断(timed && timedOut)如果timed为true,则结果就返回true
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

processWorkExit方法

private void processWorkerExit(Worker w, boolean completedAbruptly) {
//将ctl减一,线程池的线程数减一
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
//已完成的线程数加一
        completedTaskCount += w.completedTasks;
//任务从workers去除
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
//当任务发生异常的时候completedAbruptly为true,跳过判断
        if (!completedAbruptly) {
//如果允许回收核心线程,min为0,否则win为核心线程数
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
//如果队列不为空,min还为0 赋值为1,就是队列中有任务的时候,至少保证有一个线程在线程池中
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
//如果大于了,就return 这回就线程就真的结束了,等待被GC回收
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
//添加没有任务的worker,就是将线程重新放到线程池中,最终是否真的放进去,还要经过addWorker方法的一系列操作
        addWorker(null, false);
    }
}


线程池的几种状态
https://blog.csdn.net/qq_36031640/article/details/91448439

相关文章

网友评论

    本文标题:线程池源码分析

    本文链接:https://www.haomeiwen.com/subject/czosnktx.html