美文网首页
深入理解线程池源码

深入理解线程池源码

作者: 天还下着毛毛雨 | 来源:发表于2022-03-24 23:34 被阅读0次
    image

    核心属性

    1. corePoolSize :核心线程数,一般情况下,该数量的核心线程创建好之后,会常驻在线程池中,不会应空闲而关闭,可以设置allowCoreThreadTimeOut=true使核心线程空闲关闭
    2. maximumPoolSize :最大线程数,>核心线程数。
    3. keepAliveTime : 空闲时间,当线程获取任务时,超过keepAliveTime仍然获取不到任务,那么线程执行完所有逻辑后,自动消亡,workerSet也会移除该worker对象
    4. unit : 空闲事件keepAliveTime 的单位
    5. BlockingQueue<Runnable> workQueue : 任务的阻塞队列,当前提交任务时,工作线程已经>= 核心线程数, 则会将任务 推入阻塞队列中。如果阻塞队列达到最大长度,则会在工作线程数 不超过最大线程数maximumPoolSize的情况下,继续创建空闲线程来处理任务。
    6. ThreadFactory threadFactory: 创建线程的工厂
    7. RejectedExecutionHandler handler :任务的拒绝策略。当线程数任务阻塞队列满了,且工作线程数 大于等于 最大线程数了, 则 线程池无法调度线程则处理任务,调用构造方法传入的RejectedExecutionHandler实例的rejectedExecution()方法来拒绝任务。
    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    线程池状态

    ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

    状态 value 说明
    RUNNING(当线程池创建出来的初始状态) 111 能接受任务,能执行阻塞任务
    SHUTDOWN(调用shutdown方法) 000 不接受新任务,能执行阻塞任务 肯定可以 執行正在執行的任務
    STOP(调用shutDownNow) 001 不接受新任务,打断正在执行的任务,丢弃阻塞任务
    TIDYING(中间状态) 010 任务全部执行完,活动线程也没了
    TERMINATED(终结状态) 011 线程池终结

    常用api

    execute(Runable)

    执行任务,无返回值

    public void execute(Runnable command);
    

    submit(Runable)

    会返回Future对象,调用Future对象的get(),会阻塞,直到拿到返回值返回

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    

    doInvokeAny

    返回最快执行完的任务的结果,集合中其他正在执行的线程会被关闭

    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
    

    invokeAll

    执行所有任务,返回List<Future<T>>

    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    

    shutdown()

    不会接收新的任务,但是已经运行和在队列中的任务会执行完,然后在关闭线程

    线程状态变成SHUTDOWN

     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    

    awaitTermination(long timeout, TimeUnit unit)

    等待线程池关闭,会提前也会超时

    public boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }
    

    shutdownNow

    打断所有所有正在执行的任务,返回队列中的任务

    线程状态变成STOP

        /**
         * Attempts to stop all actively executing tasks, halts the
         * processing of waiting tasks, and returns a list of the tasks
         * that were awaiting execution. These tasks are drained (removed)
         * from the task queue upon return from this method.
         *
         * <p>This method does not wait for actively executing tasks to
         * terminate.  Use {@link #awaitTermination awaitTermination} to
         * do that.
         *
         * <p>There are no guarantees beyond best-effort attempts to stop
         * processing actively executing tasks.  This implementation
         * cancels tasks via {@link Thread#interrupt}, so any task that
         * fails to respond to interrupts may never terminate.
         *
         * @throws SecurityException {@inheritDoc}
         */
        public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                checkShutdownAccess();
                advanceRunState(STOP);
                // 打断正在工作的线程
                interruptWorkers();
                // 从队列中取出等待的任务
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
            // 返回队列中等待的任务
            return tasks;
        }
    

    源码解析

    流程图

    image

    一 、任务的执行以及线程的创建 : execute(Runnable task):

    传入Runnable 对象作为要执行的任务。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 1110000000000000000000000 | 0 =  111000000000000000000
        int c = ctl.get();
        // 第一种情况 :计算当前工作线程数是否小于 所配置的核心线程数
        //  workerCountOf(c) : c & CAPACITY
        //  CAPACITY : (1 << COUNT_BITS)-1 : 先左移29位 0010 0000 0000.....   然后-1 0001 1111 1111 1111 1111....后面29位全是1
        //  这时候&c,那么 c的 低29位 与 000 11111...(29个1)  做 & 运算,只有 为1的才保留下来, 结果为高三位 为0,标识线程数量的29位,可以很好的保留下来。
        if (workerCountOf(c) < corePoolSize) {
            // 则创建新增核心线程,并执行task
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 第二种情况 :当前工作线程数已经 大于等于核心线程数了,尝试往阻塞队列中添加task
        // 如果 阻塞队列还没有满,则是添加成功的
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 如果线程非运行状态,并且移除任务成功,执行线程池决拒绝任务的策略
            // 啥时候移除不成功?可能并发的时候,被别的线程 取出来了 执行了。
            if (! isRunning(recheck) && remove(command))
                // 拒绝任务
                reject(command);
            // // 判断工作线程是否 = 0,这种情况是 核心线程数为0的时候,需要创建空闲线程来处理队列中的任务,比如CachedThreadPool,第一次进来
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 第三种情况 :如果阻塞队列满了,阻塞队列添加task失败,就会尝试创建空闲线程
        // 会判断 当前工作线程数是否 < 最大线程数 maximumPoolSize, 如果小于就可以创建空闲线程
        else if (!addWorker(command, false))
            // 第四 : 种情况如果 >= maximumPoolSize,执行拒绝策略
            reject(command);
    }
    

    1. 当前工作线程小于核心线程数,则创建Worker对象,加入到workerSet中

    image
    addWorker(command, true)

    创建Worker对象(持有线程),并调用持有线程的start方法,在run方法中执行runnable

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                // 获取当前工作线程数
                int wc = workerCountOf(c);
                //  如果当前工作线程数 大于最大线程数 2 ^ 29 -1 ,或者大于 (根据当前添加工作线程的类型) 核心线程数 还是 线程池最大线程数
                // 核心线程 判断是否 > corePoolSize,空闲线程,判断 maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // cas,工作线程数+1,退出循环,因为并发问题只有对工作线程数量加成功了,才能在下面开启线程。
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // 重新校验线程池 运行状态
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 下面走创建线程的逻辑
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 线程一个worker, Worker 是Thread的子类,传入runnable对象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 加锁
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // 不是关闭状态
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 加到工作线程集合中
                        workers.add(w);
                        // 当前工作线程集合大小
                        int s = workers.size();
                        // 更新 线程池 线程数量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 执行task,启动线程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    Worker.Run方法
    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;
    
        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;
    
        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
    
        // 执行任务
        public void run() {
            runWorker(this);
        }
    }
    

    在run方法中调用runWorker方法,传入当前对象

    1. 该worker对象第一次执行任务时,w.firstTask是!= null的,所以可以进入while的循环体, 执行Runnable的run方法
    2. 第二次进来则从阻塞队列中拿任务。
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // task第一次被创建时,构造方法传入了Runnable对象,所以现在是!= null的
        Runnable task = w.firstTask;
        // 之后清空,第二次进来是null
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 无限循环,当前有任务未执行 或者 阻塞队列中有任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行runnable的run方法执行业务逻辑
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 允许核心线程超时关闭 或者 当前工作线程数 > 核心线程数
            // 线程关闭前,从worderSet中移除worker对象
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    2. 当前工作线程数已达到核心线程数了,但是阻塞队列还没满

    则会往workQueue 存入Runnable对象

    如果 队列长度还没达到上限,则offer方法会成功存入Runnable对象,返回true

    如果 队列长度已达到上限,则返回false,说明当前工作线程从队列中拿task,处理task的速度还不够,会创建非工作线程。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        // 1110000000000000000000000 | 0 =  111000000000000000000
        int c = ctl.get();
        // 第一种情况 :计算当前工作线程数是否小于 所配置的核心线程数
        //  workerCountOf(c) : c & CAPACITY
        //  CAPACITY : (1 << COUNT_BITS)-1 : 先左移29位 0010 0000 0000.....   然后-1 0001 1111 1111 1111 1111....后面29位全是1
        //  这时候&c,那么 c的 低29位 与 000 11111...(29个1)  做 & 运算,只有 为1的才保留下来, 结果为高三位 为0,标识线程数量的29位,可以很好的保留下来。
        if (workerCountOf(c) < corePoolSize) {
            // 则创建新增核心线程,并执行task
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 第二种情况 :当前工作线程数已经 大于等于核心线程数了,尝试往阻塞队列中添加task
        // 如果 阻塞队列还没有满,则是添加成功的
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            // 如果线程非运行状态,并且移除任务成功,执行线程池决拒绝任务的策略
            // 啥时候移除不成功?可能并发的时候,被别的线程 取出来了 执行了。
            if (! isRunning(recheck) && remove(command))
                // 拒绝任务
                reject(command);
            // // 判断工作线程是否 = 0,这种情况是 核心线程数为0的时候,workQueue.offer(command)返回false,就需要创建空闲线程来处理队列中的任务,比如CachedThreadPool,第一次进来
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 第三种情况 :如果阻塞队列满了,阻塞队列添加task失败,就会尝试创建空闲线程
        // 会判断 当前工作线程数是否 < 最大线程数 maximumPoolSize, 如果小于就可以创建空闲线程
        else if (!addWorker(command, false))
            // 第四 : 种情况如果 >= maximumPoolSize,执行拒绝策略
            reject(command);
    }
    

    3. 阻塞队列已满,添加task失败,就会尝试创建空闲线程

    创建空闲线程的方法和创建核心线程的方法都是addWorker(runnable,boolean core),只不过传入的core参数是false,表示是空闲线程

    如果是空闲线程创建,则会判断当前工作线程数是否 > 最大线程数maximumPoolSize, 如果是创建核心线程则判断的是 核心线程数

    如果大于 最大线程数maximumPoolSize 就会创建线程失败

     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                // 获取当前工作线程,
                int wc = workerCountOf(c);
                // 如果当前工作线程数 大于最大线程数 2 ^ 29 -1 ,或者大于 (根据当前添加工作线程的类型) 核心线程数 还是 线程池最大线程数
                // 核心线程 判断是否 > corePoolSize,空闲线程,判断 maximumPoolSize
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 后面创建线程Worker对象和创建核心线程是一模一样的
     }
    

    4. 任务拒绝:阻塞队列满了,并且 工作线程数已经达到最大线程数了, 则尝试创建空闲线程会失败,走任务的拒绝策略。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        // -100000000000000000000000000000 | 0 =  -100000000000000000000000000000
        int c = ctl.get();
        // 第一种情况 :计算当前工作线程数是否小于 所配置的核心线程数
        //  workerCountOf(c)  : 11111111111111111111111111111 & -100000000000000000000000000000
        if (workerCountOf(c) < corePoolSize) {
            // 则创建新增核心线程,并执行task
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        // 第二种情况 :当前工作线程数已经 大于等于核心线程数了,尝试往阻塞队列中添加task
        // 如果 阻塞队列还没有满,则是添加成功的
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 第三种情况 :如果满了,阻塞队列添加task失败,就会尝试创建空闲线程
        // 会判断 当前工作线程数是否 < 最大线程数 maximumPoolSize, 如果小于就可以创建空闲线程
        else if (!addWorker(command, false))
            // 如果 >= maximumPoolSize,执行拒绝策略
            reject(command);
    }
    

    jdk提供的拒绝策略类 :

    image
    1. AbortPolicy 抛异常

      /**
       * A handler for rejected tasks that throws a
       * {@code RejectedExecutionException}.
       */
      public static class AbortPolicy implements RejectedExecutionHandler {
          /**
           * Creates an {@code AbortPolicy}.
           */
          public AbortPolicy() { }
      
          /**
           * Always throws RejectedExecutionException.
           *
           * @param r the runnable task requested to be executed
           * @param e the executor attempting to execute this task
           * @throws RejectedExecutionException always
           */
          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
              throw new RejectedExecutionException("Task " + r.toString() +
                                                   " rejected from " +
                                                   e.toString());
          }
      }
      
    2. DiscardPolicy 丢弃 = 啥也不干

      /**
       * A handler for rejected tasks that silently discards the
       * rejected task.
       */
      public static class DiscardPolicy implements RejectedExecutionHandler {
          /**
           * Creates a {@code DiscardPolicy}.
           */
          public DiscardPolicy() { }
      
          /**
           * Does nothing, which has the effect of discarding task r.
           *
           * @param r the runnable task requested to be executed
           * @param e the executor attempting to execute this task
           */
          public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
          }
      }
      
    3. DiscardOldestPolicy 推出并忽略阻塞队列中的第一个任务,尝试执行当前任务

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardOldestPolicy} for the given executor.
         */
        public DiscardOldestPolicy() { }
    
        /**
         * Obtains and ignores the next task that the executor
         * would otherwise execute, if one is immediately available,
         * and then retries execution of task r, unless the executor
         * is shut down, in which case task r is instead discarded.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
             // poll出阻塞队列中的第一个任务,并忽略掉
                e.getQueue().poll();
                // 以执行当前任务
                e.execute(r);
            }
        }
    }
    

    ................................................

    二 、线程的维护

    线程池中的作用维护线程,避免频繁创建,销毁线程而带来系统资源的浪费。

    核心线程默认(可以配置allowCoreThreadTimeOut = true 来设置 注销核心线程 )是不会在执行完某一个任务后被注销的

    空闲线程 在空闲时间达到keepAliveTime 后, 会自动注销(执行完run方法)。

    1. 线程的阻塞 :

    Worker.runWorker(Worker w)

    线程的执行方法中,用while的方式,判断 当前是否有任务(第一次被创建出来) 或者 从阻塞队列中拿任务

    1. 判断 当前是否有任务(第一次被创建出来)
    2. 阻塞队列中有任务 :execute任务时,当工作线程数 > 大于核心线程数时且 阻塞队列没有满时, 会把任务存入阻塞队列
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // task第一次被创建时,构造方法传入了Runnable对象,所以现在是!= null的
        Runnable task = w.firstTask;
        // 之后清空,第二次进来是null
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // 无限循环,当前有任务未执行 或者 阻塞队列中有任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行runnable的run方法执行业务逻辑
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 允许核心线程超时关闭 或者 当前工作线程数 > 核心线程数
            // 线程关闭前,从worderSet中移除worker对象
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    如果可以不停的获取任务,处理任务,这种情况下 所有线程都不会被注销,因为无法退出while循环

    2. 线程的注销

    但是当没有任务提交时,也就是当前任务没有,阻塞队列里也拿不到任务,线程则处于空闲状态,空闲线程 空闲状态下的时间达到keepAliveTime ,则会退出while循环,结束线程。

    而核心线程则会在getTask中(如果没配置allowCoreThreadTimeOut=true) 阻塞住, 不返回结果,直到阻塞队列中可以获取到任务, 再进入while循环体。

    getTask()

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            // Are workers subject to culling? 
            // 是否允许核心线程超时关闭 或者 当 工作线程数 > 核心线程数了
            // 当 线程中 只剩下核心线程的时候, wc > corePoolSize 就不会返回true,则会workQueue.take()阻塞住
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                // 如果允许核心线程超时注销 或者 当前工作线程数 > 核心线程数, 则调阻塞队列的 poll,超时返回null
                // 否则调take()方法,一直拿不到就一直阻塞
                
                // 这就说明,只有允许核心线程超时注销,或者 当 当前工作线程数 > 核心线程数时,才会调 阻塞队列会超时的poll方法,
                // runWorker方法才会退出while循环体, 结束线程
                
                // 如果allowCoreThreadTimeOut被设置为true,则所有线程从队列中拿任务调用的都是workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法,所有线程在poll超时之后,仍然没获取到任务,则会返回 null ,退出循环体, 结束线程
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    

    从workers移除线程

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();
    
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 从workers移除线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
    
        tryTerminate();
    
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                // 如果允许核心线程超时关闭,则为0,否则为corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 如果当前工作线程数 > 最小的线程数量
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 小于最小的线程数量,添加worker
            addWorker(null, false);
        }
    }
    

    常用的线程池配置

    jdk的Executors类提供了4个创建线程池的配置方法, 通过之前的原理,我们来分析下这些线程池的不同

    1. newFixedThreadPool

        /**
         * Creates a thread pool that reuses a fixed number of threads
         * operating off a shared unbounded queue.  At any point, at most
         * {@code nThreads} threads will be active processing tasks.
         * If additional tasks are submitted when all threads are active,
         * they will wait in the queue until a thread is available.
         * If any thread terminates due to a failure during execution
         * prior to shutdown, a new one will take its place if needed to
         * execute subsequent tasks.  The threads in the pool will exist
         * until it is explicitly {@link ExecutorService#shutdown shutdown}.
         *
         * @param nThreads the number of threads in the pool
         * @return the newly created thread pool
         * @throws IllegalArgumentException if {@code nThreads <= 0}
         */
        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    

    创建一个 通过操作一个共享的无界队列来复用固定数量的线程的线程池。

    首先看构造方法,核心线程数和最大线程数是一样的 ,说明不存在线程池扩容的情况

    空闲有效时间为0 毫秒, 由于只存在核心线程,所以不存在 线程被注销的情况

    LinkedBlockingQueue 是一个无界队列,默认大小为int的最大值,所以不会出现 队列长度不够而导致 创建空闲线程的情况,也就不会出现 拒绝策略。

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    

    总结 :

    1. 线程数固定,
    2. 没有多余线程线程回收,
    3. 不会出现因线程不够,队列装不下而拒绝任务的情况

    2.newSingleThreadExecutor

    /**
     * Creates an Executor that uses a single worker thread operating
     * off an unbounded queue. (Note however that if this single
     * thread terminates due to a failure during execution prior to
     * shutdown, a new one will take its place if needed to execute
     * subsequent tasks.)  Tasks are guaranteed to execute
     * sequentially, and no more than one task will be active at any
     * given time. Unlike the otherwise equivalent
     * {@code newFixedThreadPool(1)} the returned executor is
     * guaranteed not to be reconfigurable to use additional threads.
     *
     * @return the newly created single-threaded Executor
     */
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    

    只有一个线程,无界队列

    3.newCachedThreadPool

    /**
     * Creates a thread pool that creates new threads as needed, but
     * will reuse previously constructed threads when they are
     * available.  These pools will typically improve the performance
     * of programs that execute many short-lived asynchronous tasks.
     * Calls to {@code execute} will reuse previously constructed
     * threads if available. If no existing thread is available, a new
     * thread will be created and added to the pool. Threads that have
     * not been used for sixty seconds are terminated and removed from
     * the cache. Thus, a pool that remains idle for long enough will
     * not consume any resources. Note that pools with similar
     * properties but different details (for example, timeout parameters)
     * may be created using {@link ThreadPoolExecutor} constructors.
     *
     * @return the newly created thread pool
     */
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    

    典型的缓存策略。

    说明适用于 吞吐量优先,会优先开启尽可能多的线程来保证数据处理效率,适合高并发场。如果任务不那么繁忙了,线程空闲下来,超过一分钟线程就被注销,可灵活回收空闲线程。

    1. 如何灵活回收空闲线程?

    从构造方法可以看到, 核心线程是0,是没有常驻线程的,所有的线程都是非核心线程。并且超时时间设置为60s,超过60没有从队列中获取到任务就会被回收。

    2. 如果保证吞吐量优先?

    同样从构造方法得知,非核心线程数量 最大可以达到Integer.MAX_VALUE个,也就是整形的最大数2^31-1个, 如果并发量高,会启用大量的线程来更快的处理任务,几乎不会有任务在排队的现象。

    那么是有一个任务来就创建一个线程吗?

    答案肯定不是的, 关键点在于 SynchronousQueue这个队列,和一般的BlockingQueue不同,这个队列的offer() 只有在 有线程阻塞在poll()方法的时候才会返回true,否则返回false,从ThreadPoolExecutor的execute(Runnable r) 可以得知, 当 workQueue.offer(command)返回false时,会调用下面那个else if里的addWorker去创建线程, 由于最大线程数 是Integer.MAX_VALUE,那么是肯定 可以新创建线程的。

    image

    那么 当没有任何其他线程阻塞在队列的poll()方法时,有两种情况

    1. 要么 是第一次进来,还没有产生任何线程在作业
    2. 要么 就是其他线程在正在处理任务, 整个线程池的所有线程都处于繁忙的状态,就没有任何空闲下来的线程。

    所以,基于第2种情况,可以看出 为了吞吐量优先,就会新创建一个线程来处理 当前要提交的任务,保证任务的及时处理。

    如果有线程空闲下来,在60s之内还未被回收,那么此时提交任务,调用workQueue.offer(command)这处代码就会返回true,将任务提交至队列,让其他线程poll到 处理即可,就不会创建新的线程, 达到线程的复用,节省了线程创建,回收的资源开销。

    4.newScheduledThreadPool

        /**
         * Creates a thread pool that can schedule commands to run after a
         * given delay, or to execute periodically.
         * @param corePoolSize the number of threads to keep in the pool,
         * even if they are idle
         * @return a newly created scheduled thread pool
         * @throws IllegalArgumentException if {@code corePoolSize < 0}
         */
        public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
            return new ScheduledThreadPoolExecutor(corePoolSize);
        }
    
    

    定时任务线程池

    /**
     * Creates a thread pool that can schedule commands to run after a
     * given delay, or to execute periodically.
     * @param corePoolSize the number of threads to keep in the pool,
     * even if they are idle
     * @return a newly created scheduled thread pool
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    
    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }
    
    延迟执行
    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     */
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }
    
    重复执行

    自定义

    schedule(){
    
            // dosomething
    // 递归
            schedule();
    
    }
    

    api

    scheduleWithFixedDelay

    执行完任务再计算延迟时间

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }
    
    scheduleAtFixedRate

    从任务开始执行就计算延迟时间

        /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         * @throws IllegalArgumentException   {@inheritDoc}
         */
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                      long initialDelay,
                                                      long period,
                                                      TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            if (period <= 0)
                throw new IllegalArgumentException();
            ScheduledFutureTask<Void> sft =
                new ScheduledFutureTask<Void>(command,
                                              null,
                                              triggerTime(initialDelay, unit),
                                              unit.toNanos(period));
            RunnableScheduledFuture<Void> t = decorateTask(command, sft);
            sft.outerTask = t;
            delayedExecute(t);
            return t;
        }
    
    案例 每周三22点执行
    public static void main(String[] args) {
        ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(1);
        // 计算当前时间距离目标时间还有多久
        //  初始化延迟时间 = 目标时间 - 当前时间
        // 周期  = 7天
        int period = 7;
        scheduled.scheduleAtFixedRate(() -> {
            //doSomething
        }, 初始化延迟时间, period, TimeUnit.DAYS)
    }
    

    相关文章

      网友评论

          本文标题:深入理解线程池源码

          本文链接:https://www.haomeiwen.com/subject/necrjrtx.html