美文网首页Java 杂谈互联网科技Java
十分钟深入理解Java线程池源码及设计原理

十分钟深入理解Java线程池源码及设计原理

作者: Java_苏先生 | 来源:发表于2019-07-14 21:07 被阅读15次

    一、说明

    下面如果有贴出源码,对应的源码是JDK8

    主要的源码类

    java.util.concurrent.ThreadPoolExecutor、
    java.util.concurrent.ThreadPoolExecutor.Worker
    java.util.concurrent.AbstractExecutorService

    1. 类继承图

    二、线程池的状态

    三、源码分析

    1. 完整的线程池构造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
    

    2. ctl

    内部有重要的成员变量ctl,类型是AtomicInteger,低29位表示线程池中线程数,通过高3位表示线程池的运行状态

    COUNT_BITS的值是29

    • RUNNING:-1 << COUNT_BITS,即高3位为111,该状态的线程池会接收新任务;
    • SHUTDOWN: 0 << COUNT_BITS,即高3位为000,该状态的线程池不会接收新任务;
    • STOP : 1 << COUNT_BITS,即高3位为001;
    • TIDYING : 2 << COUNT_BITS,即高3位为010, 所有的任务都已经终止;
    • TERMINATED: 3 << COUNT_BITS,即高3位为011, terminated()方法已经执行完成

    3. 任务的执行

    execute --> addWorker --> Thread.start --> (Thread.run) --> runTask --> getTask

    3.1execute(Runnable command)

    大致分三个步骤

    • 当前运行的线程数量是否小于corePoolSize,直接尝试addWorker()
    • 往阻塞队列里面放入Runnable任务
    • 如果队列已经满了,直接尝试addWorker()

    3.2 addWorker(Runnable firstTask, boolean core)

    • 前置判断线程池的状态
    • 通过CAS操作让ctl加1,表示运行线程数增加1个
    • 构造一个Worker w,这里要特别注意构造方法里面的这行代码,this.thread = getThreadFactory().newThread(this),可以看到构造方法内,有一个Thread对象,其使用了ThreadFactory构造了一个新的线程,并且线程的runable是worker本身。
    • 执行w.thread.start(),也就是说,当该线程被运行时,Worker中的run方法会被执行

    3.3 runWorker(Worker w)

    通过循环调用getTask()获取要执行的任务task
    beforeExecute
    task.run()
    afterExecute

    3.4 getTask()

    源码如下:

    private Runnable getTask() {
        Boolean timedOut = false;
        // 是否最后的 poll() 超时了?
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            Boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // worker是否需要被淘汰
            if ((wc > maximumPoolSize || (timed && timedOut))
                        && (wc > 1 || workQueue.isEmpty())) {
                // 这里会让线程的数量记录减,后面的return null,会导致runWorker没有获取到数据而让run()方法走到尽头,最终当前线程结束
                if (compareAndDecrementWorkerCount(c))
                                return null;
                continue;
            }
            try {
                // 如果需要回收一部分线程,那么超时时间keepAliveTime后拿不到就数据就继续循环调用,就可以在下一次循环的时候进行线程结束回收了;否则一直阻塞下去
                Runnable r = timed ?
                                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                                workQueue.take();
                if (r != null)
                                return r;
                timedOut = true;
            }
            catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    

    四、任务执行,带返回值的

    源码如下:

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }
    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    

    代码比较简单,把任务封装成一个既实现Runnable, 也实现Future<v style="margin: 0px; padding: 0px;">的接口,这个时候就可以调用execute()进行实现了</v>

    写在最后

    • 第一:点赞;
    • ...
    • 第二:转发;
    • ...
    • 第三:关注!!!
    • ...

    相关文章

      网友评论

        本文标题:十分钟深入理解Java线程池源码及设计原理

        本文链接:https://www.haomeiwen.com/subject/ngwikctx.html