美文网首页JDK源代码
ThreadPoolExecutor 源代码解析(base jd

ThreadPoolExecutor 源代码解析(base jd

作者: 冰殇之刃 | 来源:发表于2018-03-06 17:10 被阅读0次

ThreadPoolExecutor 是java线程池的默认实现。本文从源代码的角度来解析线程池,后续会出一个系列的源代码解析。

1.线程池初始化

下面是线程池最基础的初始化函数

public ThreadPoolExecutor(int corePoolSize,

                          int maximumPoolSize,

                          long keepAliveTime,

                          TimeUnit unit,

                          BlockingQueue workQueue,

                          ThreadFactory threadFactory,

                          RejectedExecutionHandler handler) {

    this.corePoolSize = corePoolSize;

    this.maximumPoolSize = maximumPoolSize;

    this.workQueue = workQueue;

    this.keepAliveTime = unit.toNanos(keepAliveTime);

    this.threadFactory = threadFactory;

    this.handler = handler;

}

corePoolSize :核心线程数,设置在队列不满的情况下线程池中的最大线程个数

maximumPoolSize:工作队列满了之后最大的线程个数

workQueue:翻译是工作队列,因为是任务的提交是先到队列,然后才到线程处理的,不管线程是否空闲

threadFactory:线程工厂,负责创建线程,包括线程优先级和线程名称,线程名称非常重要,这就很好给现有线程分类了

handler:RejectedExecutionHandler接口,是线程池处理不过来的任务的的处理策略。处理不过来的这个标准是工作队列满了并且线程池当前线程数涨到maximumPoolSize。

keepAliveTime:线程等待新任务的超时机制的时长,如果allowCoreThreadTimeOut为false,线程就是一直等新任务的xai

2.线程池中的属性的默认值

线程池状态

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static final int COUNT_BITS = Integer.SIZE - 3;

    private static final int CAPACITY  = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits

    private static final int RUNNING    = -1 << COUNT_BITS;//正常接收新的任务和处理队列中的任务

    private static final int SHUTDOWN  =  0 << COUNT_BITS;//不接受新的任务,但是处理队列中的任务

    private static final int STOP      =  1 << COUNT_BITS;//不接受新的任务,也不处理队列中的任务

    private static final int TIDYING    =  2 << COUNT_BITS;//所有的任务多处于中断状态,线程将要进行中断

    private static final int TERMINATED =  3 << COUNT_BITS;//中断完成

         线程池状态有五种,代码片段中,这个是用的整形最大值-1的高三位来做记录。这个是常用的比直接用整形值要简单并且计算更快。jdk1.6的实现使用的就是整形字段标识的。这算是jdk1.8性能的一个提升吧。

3.默认线程工厂

static class DefaultThreadFactory implements ThreadFactory {

        private static final AtomicInteger poolNumber = new AtomicInteger(1);

        private final ThreadGroup group;

        private final AtomicInteger threadNumber = new AtomicInteger(1);

        private final String namePrefix;

        DefaultThreadFactory() {

            SecurityManager s = System.getSecurityManager();

            group = (s != null) ? s.getThreadGroup() :

                                  Thread.currentThread().getThreadGroup();

            namePrefix = "pool-" +

                          poolNumber.getAndIncrement() +

                        "-thread-";

        }

        public Thread newThread(Runnable r) {

            Thread t = new Thread(group, r,

                                  namePrefix + threadNumber.getAndIncrement(),

                                  0);

            if (t.isDaemon())

                t.setDaemon(false);

            if (t.getPriority() != Thread.NORM_PRIORITY)

                t.setPriority(Thread.NORM_PRIORITY);

            return t;

        }

    }

默认的线程工厂就是设置一下线程的优先级和线程的名字,这里面有两个原子类标识的变量一个是threadNumber标识的是当前线程在这个线程池中的标识。poolNumber标识的是不同的线程池。这里不建议使用这个默认线程工厂,因为这个两个名字仅仅标识了不同,但是没有实际的业务意义。推荐使用ThreadPoolTaskExecutor spring中的线程池的实现,这里面的线程名字的前缀是这个线程池的beanName

4.RejectedExecutionHandler 线程池的拒绝策略

public interface RejectedExecutionHandler {

    /**

    * Method that may be invoked by a {@link ThreadPoolExecutor} when

    * {@link ThreadPoolExecutor#execute execute} cannot accept a

    * task.  This may occur when no more threads or queue slots are

    * available because their bounds would be exceeded, or upon

    * shutdown of the Executor.

    *

    *

In the absence of other alternatives, the method may throw

    * an unchecked {@link RejectedExecutionException}, which will be

    * propagated to the caller of {@code execute}.

    *

    * @param r the runnable task requested to be executed

    * @param executor the executor attempting to execute this task

    * @throws RejectedExecutionException if there is no remedy

    */

    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);

 拒绝策略实现有四种,线程池的默认实现是AbortPolicy 这种就是会直接抛出异常

CallerRunsPolicy 是直接调用run方法就直接串行执行了。

DiscardOldestPolicy 是将队列中第一个任务poll出,然后将这个任务放到队尾。队列是先进先出的,一般不会有从插队行为

DiscardPolicy 是本宝宝什么也没有做。

编写自定义的拒绝策略也是很简单,入参是当前任务和当前线程池。拥有当前线程池这个引用,想干啥都行了,就是注意当前是多线程环境,避免出现并发问题就行。

5.核心方法

就不画流程图了按照顺序说了

 1.submit

这个方法就是包装一下Runnable接口然后调用excute方法

public Future submit(Runnable task) {

    if (task == null) throw new NullPointerException();

    RunnableFuture ftask = newTaskFor(task, null);

    execute(ftask);

    return ftask;

}

2.excute

int c = ctl.get();

        //第一种情况需要创建新的线程,计算出当前的线程数量和核心线程数比较,如果小于就创建一个新的线程并且执行这个任务。

        // 这类线程被称为worker

        //失败的场景就是要创建线程的时候发现超过了核心线程数,因为是并发场景,就是有人手快所以就失败了

        // 那么这里c=ctl.get() 这里是必须要写的一行代码,因为这个值已经被更改了,需要调用一下get方法同步

        //不理解的参见原子类的使用

        if (workerCountOf(c) < corePoolSize) {

            if (addWorker(command, true))

                return;

            c = ctl.get();

        }

        //第二种情况,向队列提交任务提交任务,

        if (isRunning(c) && workQueue.offer(command)) {

            int recheck = ctl.get();

            //提交成功之后做一个双检查,因为这个时候可能会发生线程池关闭了。上面也是刷新一下ctl的值

            //确实线程池关闭了,就从队列中移除,然后执行拒接策略

            if (! isRunning(recheck) && remove(command))

                reject(command);

            //如果线程池没有关闭,但是中没有线程,就创建一个线程

            else if (workerCountOf(recheck) == 0)

                addWorker(null, false);

        }

        //如果提交任务失败,尝试提高线程数量到最大线程数,不行就执行拒绝策略

        else if (!addWorker(command, false))

            reject(command);

3.addworker

private boolean addWorker(Runnable firstTask, boolean core) {

        retry:

        for (;;) {

            int c = ctl.get();

            int rs = runStateOf(c);

            // 对线程状态和空队列的检查

            if (rs >= SHUTDOWN &&

                    ! (rs == SHUTDOWN &&

                            firstTask == null &&

                            ! workQueue.isEmpty()))

                return false;

            for (;;) {

                int wc = workerCountOf(c);

                if (wc >= CAPACITY ||

                        wc >= (core ? corePoolSize : maximumPoolSize))

                    return false;

                //线程数增加CAS成功跳出循环如果失败刷新ctl的值然后继续循环

                if (compareAndIncrementWorkerCount(c))

                    break retry;

                c = ctl.get();  // Re-read ctl

                if (runStateOf(c) != rs)

                    continue retry;

                // else CAS failed due to workerCount change; retry inner loop

            }

        }

        boolean workerStarted = false;

        boolean workerAdded = false;

        Worker w = null;

        try {

            w = new Worker(firstTask);

            final Thread t = w.thread;

            if (t != null) {

                final ReentrantLock mainLock = this.mainLock;

                mainLock.lock();

                try {

                    // Recheck while holding lock.

                    // Back out on ThreadFactory failure or if

                    // shut down before lock acquired.

                    //获取线程池状态不在结束状态把worker加入进队列

                    // 为什么加锁呢,我觉得是因为workers 这个对象是hashset是线程不安全的

                    //largestPoolSize也是int类型。不同线程指令的进行随机

                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||

                            (rs == SHUTDOWN && firstTask == null)) {

                        if (t.isAlive()) // precheck that t is startable

                            throw new IllegalThreadStateException();

                        workers.add(w);

                        int s = workers.size();

                        if (s > largestPoolSize)

                            largestPoolSize = s;

                        workerAdded = true;

                    }

                } finally {

                    mainLock.unlock();

                }

                //判断线程是否添加成功,然后启动线程,再标识线程启动成功

                if (workerAdded) {

                    t.start();

                    workerStarted = true;

                }

            }

            //判断线程是否启动成功,如果没有成功把该线程从workers中移除

        } finally {

            if (! workerStarted)

                addWorkerFailed(w);

        }

        return workerStarted;

    }

3.Worker

private final class Worker

    extends AbstractQueuedSynchronizer

    implements Runnable

    }

    private static final long serialVersionUID = 6138294804551838833L;

    final Thread thread;

    Runnable firstTask;

    volatile long completedTasks;

Worker 继承自AbstractQueuedSynchronizer 实现Runnable 。也就是说这个worker自身会有锁特性,它在做中断的时候会给自身加个锁。也会在跑任务的时候加个锁,防止这个时候被中断。然后封装成线程。run方法就是worker跑任务的方法入口,执行的是runWorker方法

  /** worker的run方法,逻辑主要执行的runWorker  */

        public void run() {

            runWorker(this);

        }

前面都是一些判断,getTask是获取任务的重要方法,task.run 是任务执行 processWorkerExit 是跑任务的时候接受到了线程中抛出的异常,中断了这个线程之后的处理逻辑

final void runWorker(Worker w) {

        Thread wt = Thread.currentThread();

        Runnable task = w.firstTask;

        w.firstTask = null;

        w.unlock(); // allow interrupts

        boolean completedAbruptly = true;

        try {

            //getTask方法从队列当中取任务

            while (task != null || (task = getTask()) != null) {

                //正在跑任务加锁防止被中断

                w.lock();

                //判断线程池状态和中断信号

                if ((runStateAtLeast(ctl.get(), STOP) ||

                        (Thread.interrupted() &&

                                runStateAtLeast(ctl.get(), STOP))) &&

                        !wt.isInterrupted())

                    wt.interrupt();

                try {

                    //执行之前执行的方法,目前没有实现,需要继承线程池之后实现

                    beforeExecute(wt, task);

                    Throwable thrown = null;

                    try {

                        //核心方法跑任务

                        task.run();

                    } catch (RuntimeException x) {

                        thrown = x; throw x;

                    } catch (Error x) {

                        thrown = x; throw x;

                    } catch (Throwable x) {

                        thrown = x; throw new Error(x);

                    } finally {

                        //任务接受到异常之后执行的方法,目前没有实现,需要继承线程池之后实现

                        afterExecute(task, thrown);

                    }

                } finally {

                    task = null;

                    w.completedTasks++;

                    w.unlock();

                }

            }

            completedAbruptly = false;

        } finally {

            //走到这里就是跑的任务接受到异常退出了,进行的逻辑,

            processWorkerExit(w, completedAbruptly);

        }

gettask方法重要的逻辑是判断一些状态,然后从任务队列中拉去任务,这段代码很重要,复杂的涉及到两个逻辑,一个是任务队列当中没有任务之后,当前线程去留问题,另一个是

private Runnable getTask() {

        boolean timedOut = false; // 上一个任务再拉去的时候有没有超时

        for (;;) {

            int c = ctl.get();

            int rs = runStateOf(c);

            // 检查线程池状态为shutdown或者队列为空

            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

                //线程数减1操作

                decrementWorkerCount();

                return null;

            }

            int wc = workerCountOf(c);

            // 判断这个参数是否允许在核心线程有超时机制,或者当前线程数大于核心线程数

            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            // 判断当前线程数是否大于最大线程数 或者 这个timeOut标记是上一个任务有没有超时,如果有这个任务就很可能不跑了直接返回了

            //这个取决于这个核心线程有没有超时机制或者当前线程数大于核心线程数,都有,那就返回退出该线程了。

            // 并且当前线程确实为空并且当前线程池还有活跃的线程

            if ((wc > maximumPoolSize || (timed && timedOut))

                    && (wc > 1 || workQueue.isEmpty())) {

                //进行cas操作,如果失败,就继续跑任务

                if (compareAndDecrementWorkerCount(c))

                    return null;

                continue;

            }

            try {

                //线程池的超时机制其实是等待任务从任务队列中提交过来的时间,没有计算线程的执行时间

                Runnable r = timed ?

                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                        workQueue.take();

                if (r != null)

                    return r;

                timedOut = true;

            } catch (InterruptedException retry) {

                timedOut = false;

            }

        }

    }

processWorkerExit方法是处理线程中断之后的逻辑,要不就是移除引用之后就结束了,要不就是再添加新的线程补充上来

//worker 响应异常之后的处理,如果worker的死亡是用户抛出来的异常这个值就是true

    private void processWorkerExit(Worker w, boolean completedAbruptly) {

        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted

            decrementWorkerCount();

        //从线程池中移除该队列

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            completedTaskCount += w.completedTasks;

            workers.remove(w);

        } finally {

            mainLock.unlock();

        }

        tryTerminate();

        int c = ctl.get();

        if (runStateLessThan(c, STOP)) {

            //如果是用户线程抛出来的异常,线程池会立即补上这个数量,相当于立刻替换这个工人

            if (!completedAbruptly) {

                //如果不是用户抛出的异常线程池会判断允许超时机制存在么,存在,并且确实任务队列没有任务返回不再加新的线程了

                // 如果是线程数大于核心线程数,也是返回不加新的线程了

                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

                if (min == 0 && ! workQueue.isEmpty())

                    min = 1;

                if (workerCountOf(c) >= min)

                    return; // replacement not needed

            }

            //加新的线程补充之前死掉的线程

            addWorker(null, false);

        }

    }

后记

线程池的核心方法差不多都介绍完了,jdk中的代码确实非常经典,每一段代码都值得细细的去品味。线程池一个批量任务的并发框架,包含线程管理,任务管理,超时机制,在并发的场景,还会学到一些并发场景下的对象处理手段。看源代码首先就是先要明白这个段代码它想完成一个什么样的功能,再不看源代码的情景下,去猜内部实现逻辑,然后再去看代码,反复印证。这样你不仅是懂了这个代码实现,也会懂这个的设计原理,设计模式。这类的通用类型的设计思想。而且还要比较低版本和高版本之间的对比,知道这个升级是为什么问题,会有优化,重点是思想。这些都是非常通用,非常有价值的。

相关文章

网友评论

    本文标题:ThreadPoolExecutor 源代码解析(base jd

    本文链接:https://www.haomeiwen.com/subject/qzdrfftx.html