美文网首页Java技术升华面试精选
从源码层面,深入剖析线程池工作原理

从源码层面,深入剖析线程池工作原理

作者: 天还下着毛毛雨 | 来源:发表于2021-09-10 18:11 被阅读0次

[TOC]

核心属性

  1. corePoolSize :核心线程数,一般情况下,该数量的核心线程创建好之后,会常驻在线程池中,不会应空闲而关闭,可以设置allowCoreThreadTimeOut=true使核心线程空闲关闭
  2. maximumPoolSize :最大线程数,>核心线程数。
  3. keepAliveTime : 空闲时间,当线程获取任务时,超过keepAliveTime仍然获取不到任务,那么线程执行完所有逻辑后,自动消亡,workerSet也会移除该worker对象
  4. unit : 空闲时间keepAliveTime 的单位
  5. BlockingQueue<Runnable> workQueue : 任务的阻塞队列,当前提交任务时,工作线程数已经>= 核心线程数, 则会将任务 推入阻塞队列中。如果阻塞队列达到最大长度,则会在工作线程数 不超过最大线程数maximumPoolSize的情况下,继续创建非核心线程来处理任务。
  6. ThreadFactory threadFactory: 创建线程的工厂
  7. RejectedExecutionHandler handler :任务的拒绝策略。当线程数任务阻塞队列满了,且工作线程数 大于等于 最大线程数了, 则 线程池无法调度线程则处理任务,调用构造方法传入的RejectedExecutionHandler实例的rejectedExecution()方法来拒绝任务。
/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

源码解析

一 、任务的执行以及线程的创建 : execute(Runnable task):

传入Runnable 对象作为要执行的任务。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // -100000000000000000000000000000 | 0 =  -100000000000000000000000000000
    int c = ctl.get();
    // 第一种情况 :计算当前工作线程数是否小于 所配置的核心线程数
    //  workerCountOf(c)  : 11111111111111111111111111111 & -100000000000000000000000000000
    if (workerCountOf(c) < corePoolSize) {
        // 则创建新增核心线程,并执行task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 第二种情况 :当前工作线程数已经 大于等于核心线程数了,尝试往阻塞队列中添加task
    // 如果 阻塞队列还没有满,则是添加成功的
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 第三种情况 :如果满了,阻塞队列添加task失败,就会尝试创建非核心线程
    // 会判断 当前工作线程数是否 < 最大线程数 maximumPoolSize, 如果小于就可以创建非核心线程
    else if (!addWorker(command, false))
        // 第四 : 种情况如果 >= maximumPoolSize,执行拒绝策略
        reject(command);
}
1. 当前工作线程小于核心线程数,则创建Worker对象,加入到workerSet中
image
addWorker(command, true)

创建Worker对象(持有线程),并调用持有线程的start方法,在run方法中执行runnable

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 获取当前工作线程数
            int wc = workerCountOf(c);
            //  如果当前工作线程数 大于最大线程数 2 ^ 29 -1 ,或者大于 (根据当前添加工作线程的类型) 核心线程数 还是 线程池最大线程数
            // 核心线程 判断是否 > corePoolSize,非核心线程,判断 maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // cas,工作线程数+1,退出循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 下面走创建线程的逻辑
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 线程一个worker, Worker 是Thread的子类,传入runnable对象
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 加锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // 不是关闭状态
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 加到工作线程集合中
                    workers.add(w);
                    // 当前工作线程集合大小
                    int s = workers.size();
                    // 更新 线程池 线程数量
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 执行task
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
Worker.Run方法
private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    // 执行任务
    public void run() {
        runWorker(this);
    }
}

在run方法中调用runWorker方法,传入当前对象

  1. 该worker对象第一次执行任务时,w.firstTask是!= null的,所以可以进入while的循环体, 执行Runnable的run方法
  2. 第二次进来则从阻塞队列中拿任务。
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // task第一次被创建时,构造方法传入了Runnable对象,所以现在是!= null的
    Runnable task = w.firstTask;
    // 之后清空,第二次进来是null
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 无限循环,当前有任务未执行 或者 阻塞队列中有任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行runnable的run方法执行业务逻辑
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 允许核心线程超时关闭 或者 当前工作线程数 > 核心线程数
        // 线程关闭前,从worderSet中移除worker对象
        processWorkerExit(w, completedAbruptly);
    }
}
2. 当前工作线程数已达到核心线程数了,但是阻塞队列还没满

则会往workQueue 存入Runnable对象

如果 队列长度还没达到上限,则offer方法会成功存入Runnable对象,返回true

如果 队列长度已达到上限,则返回false,说明当前工作线程从队列中拿task,处理task的速度还不够,会创建非工作线程。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // -100000000000000000000000000000 | 0 =  -100000000000000000000000000000
    int c = ctl.get();
    // 第一种情况 :计算当前工作线程数是否小于 所配置的核心线程数
    //  workerCountOf(c)  : 11111111111111111111111111111 & -100000000000000000000000000000
    if (workerCountOf(c) < corePoolSize) {
        // 则创建新增核心线程,并执行task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 第二种情况 :当前工作线程数已经 大于等于核心线程数了,尝试往阻塞队列中添加task
    // 如果 阻塞队列还没有满,则是添加成功的
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 第三种情况 :如果满了,阻塞队列添加task失败,就会尝试创建非核心线程
    // 会判断 当前工作线程数是否 < 最大线程数 maximumPoolSize, 如果小于就可以创建非核心线程
    else if (!addWorker(command, false))
        // 如果 >= maximumPoolSize,执行拒绝策略
        reject(command);
}
3. 阻塞队列已满,添加task失败,就会尝试创建非核心线程

创建非核心线程的方法和创建核心线程的方法都是addWorker(runnable,boolean core),只不过传入的core参数是false,表示是非核心线程

如果是非核心线程创建,则会判断当前工作线程数是否 > 最大线程数maximumPoolSize, 如果是创建核心线程则判断的是 核心线程数

如果大于 最大线程数maximumPoolSize 就会创建线程失败

 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 获取当前工作线程,
            int wc = workerCountOf(c);
            // 如果当前工作线程数 大于最大线程数 2 ^ 29 -1 ,或者大于 (根据当前添加工作线程的类型) 核心线程数 还是 线程池最大线程数
            // 核心线程 判断是否 > corePoolSize,非核心线程,判断 maximumPoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    // 后面创建线程Worker对象和创建核心线程是一模一样的
 }
4. 任务拒绝:阻塞队列满了,并且 工作线程数已经达到最大线程数了, 则尝试创建非核心线程会失败,走任务的拒绝策略。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    // -100000000000000000000000000000 | 0 =  -100000000000000000000000000000
    int c = ctl.get();
    // 第一种情况 :计算当前工作线程数是否小于 所配置的核心线程数
    //  workerCountOf(c)  : 11111111111111111111111111111 & -100000000000000000000000000000
    if (workerCountOf(c) < corePoolSize) {
        // 则创建新增核心线程,并执行task
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 第二种情况 :当前工作线程数已经 大于等于核心线程数了,尝试往阻塞队列中添加task
    // 如果 阻塞队列还没有满,则是添加成功的
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 第三种情况 :如果满了,阻塞队列添加task失败,就会尝试创建非核心线程
    // 会判断 当前工作线程数是否 < 最大线程数 maximumPoolSize, 如果小于就可以创建非核心线程
    else if (!addWorker(command, false))
        // 如果 >= maximumPoolSize,执行拒绝策略
        reject(command);
}

jdk提供的拒绝策略类 :

image
  1. AbortPolicy 抛异常

    /**
     * A handler for rejected tasks that throws a
     * {@code RejectedExecutionException}.
     */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /**
         * Creates an {@code AbortPolicy}.
         */
        public AbortPolicy() { }
    
        /**
         * Always throws RejectedExecutionException.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         * @throws RejectedExecutionException always
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }
    
  2. DiscardPolicy 丢弃 = 啥也不干

    /**
     * A handler for rejected tasks that silently discards the
     * rejected task.
     */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /**
         * Creates a {@code DiscardPolicy}.
         */
        public DiscardPolicy() { }
    
        /**
         * Does nothing, which has the effect of discarding task r.
         *
         * @param r the runnable task requested to be executed
         * @param e the executor attempting to execute this task
         */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }
    
  3. DiscardOldestPolicy 推出并忽略阻塞队列中的第一个任务,尝试执行当前任务

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code DiscardOldestPolicy} for the given executor.
     */
    public DiscardOldestPolicy() { }

    /**
     * Obtains and ignores the next task that the executor
     * would otherwise execute, if one is immediately available,
     * and then retries execution of task r, unless the executor
     * is shut down, in which case task r is instead discarded.
     *
     * @param r the runnable task requested to be executed
     * @param e the executor attempting to execute this task
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
         // poll出阻塞队列中的第一个任务,并忽略掉
            e.getQueue().poll();
            // 以执行当前任务
            e.execute(r);
        }
    }
}

................................................

二 、线程的维护

线程池中的作用维护线程,避免频繁创建,销毁线程而带来系统资源的浪费。

核心线程默认(可以配置allowCoreThreadTimeOut = true 来设置 注销核心线程 )是不会在执行完某一个任务后被注销的

非核心线程 在空闲时间达到keepAliveTime 后, 会自动注销(执行完run方法)。

1. 线程的阻塞 :

Worker.runWorker(Worker w)

线程的执行方法中,用while的方式,判断 当前是否有任务(第一次被创建出来) 或者 从阻塞队列中拿任务

  1. 判断 当前是否有任务(第一次被创建出来)
  2. 阻塞队列中有任务 :execute任务时,当工作线程数 > 大于核心线程数时且 阻塞队列没有满时, 会把任务存入阻塞队列
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // task第一次被创建时,构造方法传入了Runnable对象,所以现在是!= null的
    Runnable task = w.firstTask;
    // 之后清空,第二次进来是null
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 无限循环,当前有任务未执行 或者 阻塞队列中有任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行runnable的run方法执行业务逻辑
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 允许核心线程超时关闭 或者 当前工作线程数 > 核心线程数
        // 线程关闭前,从worderSet中移除worker对象
        processWorkerExit(w, completedAbruptly);
    }
}

如果可以不停的获取任务,处理任务,这种情况下 所有线程都不会被注销,因为无法退出while循环

2. 线程的注销

但是当没有任务提交时,也就是当前任务没有,阻塞队列里也拿不到任务,线程则处于空闲状态,非核心线程 空闲状态下的时间达到keepAliveTime ,则会退出while循环,结束线程。

而核心线程则会在getTask中(如果没配置allowCoreThreadTimeOut=true) 阻塞住, 不返回结果,直到阻塞队列中可以获取到任务, 再进入while循环体。

getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling? 
        // 是否允许核心线程超时关闭 或者 当 工作线程数 > 核心线程数了
        // 当 线程中 只剩下核心线程的时候, wc > corePoolSize 就不会返回true,则会workQueue.take()阻塞住
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 如果允许核心线程超时注销 或者 当前工作线程数 > 核心线程数, 则调阻塞队列的 poll,超时返回null
            // 否则调take()方法,一直拿不到就一直阻塞
            
            // 这就说明,只有允许核心线程超时注销,或者 当 当前工作线程数 > 核心线程数时,才会调 阻塞队列会超时的poll方法,
            // runWorker方法才会退出while循环体, 结束线程
            
            // 如果allowCoreThreadTimeOut被设置为true,则所有线程从队列中拿任务调用的都是workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)方法,所有线程在poll超时之后,仍然没获取到任务,则会返回 null ,退出循环体, 结束线程
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

从workers移除线程

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        completedTaskCount += w.completedTasks;
        // 从workers移除线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    tryTerminate();

    int c = ctl.get();
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            // 如果允许核心线程超时关闭,则为0,否则为corePoolSize
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            // 如果当前工作线程数 > 最小的线程数量
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        // 小于最小的线程数量,添加worker
        addWorker(null, false);
    }
}

三、常用的线程池

jdk的Executors类提供了4个创建线程池的配置方法, 通过之前的原理,我们来分析下这些线程池的不同

1. newFixedThreadPool
    /**
     * Creates a thread pool that reuses a fixed number of threads
     * operating off a shared unbounded queue.  At any point, at most
     * {@code nThreads} threads will be active processing tasks.
     * If additional tasks are submitted when all threads are active,
     * they will wait in the queue until a thread is available.
     * If any thread terminates due to a failure during execution
     * prior to shutdown, a new one will take its place if needed to
     * execute subsequent tasks.  The threads in the pool will exist
     * until it is explicitly {@link ExecutorService#shutdown shutdown}.
     *
     * @param nThreads the number of threads in the pool
     * @return the newly created thread pool
     * @throws IllegalArgumentException if {@code nThreads <= 0}
     */
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

创建一个 通过操作一个共享的无界队列来复用固定数量的线程的线程池。

首先看构造方法,核心线程数和最大线程数是一样的 ,说明不存在线程池扩容的情况

空闲有效时间为0 毫秒, 由于只存在核心线程,所以不存在 线程被注销的情况

LinkedBlockingQueue 是一个无界队列,默认大小为int的最大值,所以不会出现 队列长度不够而导致 创建非核心线程的情况,也就不会出现 拒绝策略。

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

总结 :

  1. 线程数固定,
  2. 没有多余线程线程回收,
  3. 不会出现因线程不够,队列装不下而拒绝任务的情况
2.newSingleThreadExecutor
/**
 * Creates an Executor that uses a single worker thread operating
 * off an unbounded queue. (Note however that if this single
 * thread terminates due to a failure during execution prior to
 * shutdown, a new one will take its place if needed to execute
 * subsequent tasks.)  Tasks are guaranteed to execute
 * sequentially, and no more than one task will be active at any
 * given time. Unlike the otherwise equivalent
 * {@code newFixedThreadPool(1)} the returned executor is
 * guaranteed not to be reconfigurable to use additional threads.
 *
 * @return the newly created single-threaded Executor
 */
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

只有一个线程,无界队列

3.newCachedThreadPool
/**
 * Creates a thread pool that creates new threads as needed, but
 * will reuse previously constructed threads when they are
 * available.  These pools will typically improve the performance
 * of programs that execute many short-lived asynchronous tasks.
 * Calls to {@code execute} will reuse previously constructed
 * threads if available. If no existing thread is available, a new
 * thread will be created and added to the pool. Threads that have
 * not been used for sixty seconds are terminated and removed from
 * the cache. Thus, a pool that remains idle for long enough will
 * not consume any resources. Note that pools with similar
 * properties but different details (for example, timeout parameters)
 * may be created using {@link ThreadPoolExecutor} constructors.
 *
 * @return the newly created thread pool
 */
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

最大线程数很大,没有核心线程,但是空闲时间 较长有1分钟, 说明适用于 任务周期较短, 很多线程都可以快速处理完任务,并被复用,超过一分钟线程就被注销

4.newSingleThreadScheduledExecutor
/**
 * Creates a single-threaded executor that can schedule commands
 * to run after a given delay, or to execute periodically.
 * (Note however that if this single
 * thread terminates due to a failure during execution prior to
 * shutdown, a new one will take its place if needed to execute
 * subsequent tasks.)  Tasks are guaranteed to execute
 * sequentially, and no more than one task will be active at any
 * given time. Unlike the otherwise equivalent
 * {@code newScheduledThreadPool(1)} the returned executor is
 * guaranteed not to be reconfigurable to use additional threads.
 * @return the newly created scheduled executor
 */
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
    return new DelegatedScheduledExecutorService
        (new ScheduledThreadPoolExecutor(1));
}

单线程,定时的线程池

相关文章

网友评论

    本文标题:从源码层面,深入剖析线程池工作原理

    本文链接:https://www.haomeiwen.com/subject/rnjnwltx.html