美文网首页
线程学习_03线程池

线程学习_03线程池

作者: 冉桓彬 | 来源:发表于2018-01-29 22:13 被阅读14次

线程池结构图:

线程池结构图.png

一、关于Executor:

1.1 线程池的顶层接口Executor:
public interface Executor {
    void execute(Runnable command);
}
1.2 线程池的第二层接口ExecutorService :
public interface ExecutorService extends Executor {

    void shutdown();

    List<Runnable> shutdownNow();

    boolean isShutdown();

    boolean isTerminated();

    <T> Future<T> submit(Callable<T> task);

    <T> Future<T> submit(Runnable task, T result);

    Future<?> submit(Runnable task);
}

  1、我们经常用到的几个线程池都是实现于ExecutorService接口, ExecutorService对Executor进行了扩展, 支持Callable和Runnable;
  2、先不考虑ExecutorService每个方法到底什么作用, 待后边分析到线程池管理线程流程时自然明了;

1.3 线程池的提供者Executors:
public class Executors {
    
    public static ExecutorService newFixedThreadPool(int nThreads) {...}

    public static ExecutorService newWorkStealingPool() {...}

    public static ExecutorService newSingleThreadExecutor() {...}

    public static ExecutorService newCachedThreadPool() {...}

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {...}

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {...}

    private Executors() {}
}

  1、先忽略这些方法的内部实现, 以及传入的参数, 现在分析还为时过早, 属于强行记忆;
  2、这些方法主要返回两大类线程池ExecutorService和ScheduledExecutorService, 而ScheduledExecutorService又是继承于ExecutorService, 所以后续会重点关注这两个类;
  3、接下来开始分析ScheduledExecutorService和ExecutorService是如何管理内部维护的线程池;
  4、对Executors 方法展开, 发现ExecutorService和ScheduledExecutorService分别指向ThreadPoolExecutor和ScheduledThreadPoolExecutor, 而这两个类又都间接实现ExecutorService接口, 所以都支持execute(RunnableImpl)和submit(CallableImpl)方式;
先分析execute(RunnableImple)这种方式;
  5、关于四个方法的具体分析在模块三

在进行分析之前, 先列出线程池中线程的几个状态以及构建线程池时所需要的参数, 后续分析源码时遇到即回头来进行补充说明:
线程的几个状态值 :

private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

构建线程池时所需参数 :

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
        ...
}
  • corePoolSize : 线程池核心线程数, 当调用execute(RunnableImpl)时会先判断当前工作线程数量是否小于核心线程数, 如果是, 则尝试新建工作线程, 如果否, 则尝试复用线程或新建线程;
  • workQueue :Worker队列, 当调用execute(RunnableImpl)时如果当前线程数量 ≥ 核心工作线程时新到的task会首先被添加至Worker队列(workQueue)中, 然后Worker从workQueue中取出task执行, worker从队列中调用task的方式由我们此时传入的BlockingQueue决定;

二、任务提交:

2.1 ThreadPoolExecutor.execute:
  • 注意Thread与Worker的关系;
public class ThreadPoolExecutor extends AbstractExecutorService {
    public void execute(Runnable command) {

        int c = ctl.get();
<<1>>
        //**
         * 1. 首先获取线程池中Worker的数量, 然后与核心线程数量进行比较, 如果小于核心线程数量, 进入<<2>>
         * 2. 如果 当前线程池中线程数量 ≥ 核心线程数, 则进入到<<3>>
         */
        if (workerCountOf(c) < corePoolSize) {
<<2>>
            //**
             * 1. 后续还会根据情况不同, 连续多次调用addWorker方法, 但是传入的参数不同, 这里需要注意;
             * 2. 这里传入的是command, true;
             * 3. 如果addWorker() = true, 由下文<<3.1>>模块可知, 此时满足以下两个条件:
             *    (1) 线程池处于获取RUNNING状态;
             *    (2) 或者firstTask = null, 而firstTask此时一定 != null; 
             *   所以此时如果线程池处于RUNNGIN状态, 则addWorker一定返回true, 然后直接执行return操作;
             */
            if (addWorker(command, true))    模块<<3.1>>
                return;
            c = ctl.get();
        }
<<3>>
        //**
         * 1. 结合<<1>>可知能执行到这里有以下几种可能:
         *    (1) workerCountOf(c) < corePoolSize, 且addWorker(command, true) = true;
         *    (2) workerCountOf(c) ≥ corePoolSize;
         * 2. 如果忽略线程池的运行状态, 则if = true/false取决于workQueue.offer(command):
         *    (1) 返回true, 则表示当前workQueue未满, 进入<<4>>;
         *    (2) 返回false, 则表示当前workQueue已满, command入队失败, 进入<<7>>;
         * 3. 到这里可以知道, 当向线程池中提交任务时, 首先会判断 workerCountOf(c)是否小于corePoolSize,
         *    如果 < corePoolSize, 则直接创建线程执行该command, 如果 ≥ corePoolSize, 则先尝试将command
         *    添加到workQueue中;
         * 4. 如果workQueue.offer = false, 进入模块<<7>>;
         * 5. 这里的command被添加到workQueue的方式取决于在构造ThreadPoolExecutor时传入的BlockingQueue
         *    的具体实现类;
         */
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
<<4>>
            //**
             * 1. 任务被添加到workQueue成功, 此时如果线程池处于关闭状态, 则从workQueue中移除command;
             * 2. 如果线程池处于运行状态, 进入<<6>>;
             */
            if (! isRunning(recheck) && remove(command))
<<5>>
                reject(command);
            else if (workerCountOf(recheck) == 0)
<<6>>
                //**
                 * 1. 如果当前线程池处于RUNNING状态, 会一路执行到这里执行addWorker(null, false)操作;
                 * 2. 注意此时传入的参数与<<2>>传入的参数的区别, null---false, 分析到模块<三>时可知, 这
                 *    里之所以传入null, 是因为command已经被添加到workQueue中了, 后续创建Thread时直接
                 *    从workQueue中读取command;
                 */
                addWorker(null, false);        模块<3.1>
        }
<<7>>
        //**
         * 1. 能执行到这里有以下几种情况:
         *    (1) workerCountOf(c) ≥ corePoolSize且workQueue已满;
         * 2. 结合模块<<3.1>>的分析可知, addWorker返回false有以下几种可能:
         *    (1) 线程池处于SHUTDOWN状态;
         *    (2) workerCount > CAPACITY(即Worker数量大于当前线程池最大线程数);
         *    (3) wc ≥ maximumPoolSize(即Worker数量大于当前线程池最大线程数);
         * 2. 如果能够返回true, 则说明一定执行了Worker w = new Worker操作;
         * 3. 如果addWorker(...)返回false, 属于异常情况, 进入<<8>>去执行command拒绝策略;
         */
        else if (!addWorker(command, false))
<<8>>
            //**
             * 1. 执行到这里有以下几种情况:
             *    (1) 线程池关闭;
             *    (2) workerCountOf(c) > maximumPoolSize;
             * 2. command拒绝策略取决于在构建ThreadPoolExecutor时传入的拒绝策略的具体实现; 
             */
            reject(command);    <<4.1>>
    }
}
  • 关于execute的流程图:
//1---> ~ //8--->的流程图

三 任务处理:

3.1 ThreadPoolExecutor.addWorker:
public class ThreadPoolExecutor extends AbstractExecutorService {
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
<<1>>
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
<<2>>
            //**
             * 1. 针对 curThread < corePoolSize 这种情况, 如果能进入if内部, 只有rs ≥ SHUTDOWN即当前线程池
             *    处于中断情况, 否则跳过该if语句;
             * 2. 针对curThread ≥ corePoolSize这种情况, 结合模块<<2.1>>
             */
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                return false;
<<3>>
            for (;;) {
                //**
                 * 获取当前线程池中线程的数量;
                 */
                int wc = workerCountOf(c);
<<4>>
                //**
                 * 此时先只考虑wc < CAPACITY的情况;
                 * 1. 结合模块<<2.1>>可知, 只有当 curThread < corePoolSize时, core传入true, 其他情况均
                 *    传入false, 此时 wc < corePoolSize, 跳过if语句;
                 * 2. 结合模块<<2.1>>可知, 如果core = false, 有以下两种情况:
                 *    (1) workerCountOf(c) ≥ corePoolSize, 且workQueue未满, command入队workQueue成功, 
                 *        此时firstTask = null;
                 *    (2) workerCountOf(c) ≥ corePoolSize, 且workQueue已满, command入队失败, 
                 *        此时firstTask = command;
                 */
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
<<5>>
                //**
                 * 结合<<4>>可知, 如果能够执行到这里, 说明两种情况:
                 *  (1) core = true, 且 wc < corePoolSize;
                 *  (2) core = false, 且 wc < maximumPoolSize;
                 */
                if (compareAndIncrementWorkerCount(c))
                    break retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
<<6>>
           //**
            * 1. 结合模块<<2.1>>可知, firstTask有两种值:
            *    (1) command:
            *        (1.1) workerCountOf(c) < corePoolSize, 此时core = true;
            *        (1.2) workerCountOf(c) ≥ corePoolSize, workQueue已满, command入队失败, 此时core = false;
            *    (2) null: 
            *        (2.1) workerCountOf(c) ≥ corePoolSize且workQueue未满, command入队成功, 此时core = false;
            */
            w = new Worker(firstTask);   模块<<3.2>>
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
<<7>>
                    //**
                     * 如果线程池处于RUNNING状态, 进入if内部, 主要操作将workerAdded置为true;
                     */
                    if (rs < SHUTDOWN ||  (rs == SHUTDOWN && firstTask == null)) {
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
<<8>>
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
<<8>>
                    //**
                     * t内部持有Worker的引用, 所以t.start实际会触发Worker.run方法的执行;
                     */
                    t.start();      模块<<5.1>>
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
}
3.2 Worker结构:
//**
  * 1. Worker这里实现了AQS锁, 理所当然具有AQS锁的特性;
  */
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

    //** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    //** Initial task to run.  Possibly null. */
    Runnable firstTask;
    //** Per-thread task counter */
    volatile long completedTasks;
    //**
     * 1. 结合模块**2.1、2.2**可知, 在构建Worker时, 传入的firstTask可能为null:
     *    (1) firstTask = null, 此时corePoolSize < workerCount < maximumPoolSize, command存在于WorkQueue中;
     *    (2) firstTask != null, 此时 corePoolSize > workerCount; 或者corePoolSize < workerCount
     *        且WorkerQueue已满;
     * 2. Worker持有Thread引用, Thread持有Worker的引用, 在**2.2**模块worker.thread.start会触发worker.run
     *    方法的执行;
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }
}
  • 关于模块2.2 ~ 2.3的流程图:
关于模块**2.2 ~ 2.3**的流程图

四、任务拒绝:

4.1 ThreadPoolExecutor.reject:
public class ThreadPoolExecutor {
    final void reject(Runnable command) {
        //**
         * 1. 任务的拒绝完全取决于在构造ThreadPoolExecutor时传入的RejectedExecutionHandler;
         */
        handler.rejectedExecution(command, this);     模块<4.2>
    }
}
4.2 RejectedExecutionHandler具体实现类:
public static class DiscardOldestPolicy implements RejectedExecutionHandler {...}
public static class AbortPolicy implements RejectedExecutionHandler {...}
public static class CallerRunsPolicy implements RejectedExecutionHandler {...}
public static class DiscardPolicy implements RejectedExecutionHandler {...}

五、任务执行:

5.1 Worker.run:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    public void run() {
        runWorker(this);
    }

public class ThreadPoolExecutor extends AbstractExecutorService {
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //**
         * 这里也算是一个思想, 缓存复用, 将Worker.firstTask赋值给一个变量, 然后将Worker.firstWork置为null;
         */
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
<<1>>
            //**
             * 1. 结合模块<<三>>和模块<<四>>可知, task两种取值的情况如下:
             *    (1) task = null:
             *       (1.1) workerCountOf(c) ≥ corePoolSize, 且workQueue未满;
             *    (2) task = command:
             *       (2.1) workerCountOf(c) < corePoolSize;
             *       (2.2) workerCountOf(c) ≥ corePoolSize, 且workQueue已满;
             * 2. 如果task != null, 直接执行该任务;
             * 3. 如果task == null, 则说明command被添加至workQueue中, 进入模块<5.2>, 这里的
             *    getTask与模块<<2.1>>的workQueue.offer使用的是生产者--消费者模式;
             */
            while (task != null || (task = getTask()) != null) {
                w.lock();
<<2>>
                task.run();
                task = null;
                w.completedTasks++;
                w.unlock();
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
}
5.2 ThreadPoolExecutor.getTask:
public class ThreadPoolExecutor extends AbstractExecutorService {
<<1>>
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);
<<2>>
            //**
             * 1. allowCoreThreadTimeOut表示是否允许核心线程超时, 默认为false, 表示不允许核心线程超时, 
             *    通过allowCoreThreadTimeOut显示控制该变量的值, 结合下面代码说明该变量的作用;
             * 2. timed取值有以下两种情况:
             *   (1) timed = true: 
             *      (1.1) allowCoreThreadTimeOut = true, 即允许核心线程超时;
             *      (1.2) allowCoreThreadTimeOut = false(不允许核心线程超时),  且wc > corePoolSize;
             *   (2) timed = false: 
             *      (2.1)allowCoreThreadTimeOut = false(不允许核心线程超时), wc ≤ corePoolSize;
             * 3. 获取timed值之后, 切入到<<5>>
             * 4. 结合对<<5>>的分析知道:
             *    (1) 当allowCoreThreadTimeOut = true, 此时如果command队列为空, 则所有线程(包括核心与普通线程)
             *        在等待keepAliveTime时间后会销毁;
             *    (2) allowCoreThreadTimeOut = false,  wc > corePoolSize, 线程在挂起keepAliveTime时会被销毁, 
             *    (3) allowCoreThreadTimeOut = false,  wc ≤ corePoolSize, 线程会一直处于挂起状态, 直到外部唤醒;
             */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
            try {
<<6>>
                //**
                 * 1. 如果timed = true, 执行r = workQueue.poll操作;
                 * 2. 如果timed = false, 执行r = workQueue.take操作;
                 * 3. 结合BlockingQueue代码分析可知, poll(...)时, 如果元素队列为empty时, 当前线程挂起
                 *    keepAliveTime如果还未获取到元素, 则线程被销毁;
                 * 4. 如果是take操作, 则当元素队列为empty时, 当前线程会进入线程等待队列被挂起, 直到被唤醒,
                 *    关于线程池, 线程被唤醒的时机在模块<<2.1>>的workQueue.offer处;
                 * 5. 关于poll与take时, 元素具体的出入栈方式则取决于在构造ThreadPoolExecutor时传入的
                 *    BlockingQueue的具体实现;
                 */
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
}

四、Executors四个常见线程池:

4.1 Executors.newFixedThreadPool:
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
4.2 Executors.newSingleThreadExecutor:
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
}
4.3 Executors.newCachedThreadPool:
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}
4.4 Executors.newScheduledThreadPool:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

public class ScheduledThreadPoolExecutor  extends ThreadPoolExecutor implements ScheduledExecutorService {
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue());
    }
}
public class ThreadPoolExecutor extends AbstractExecutorService {
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
}
  • 结合上面四个工具类的源码分析, 具体参数对比:
方法 corePoolSize maximumPoolSize keepAliveTime BlockingQueue
newFixedThreadPool nThreads nThreads 0 LinkedBlockingQueue
newSingleThreadExecutor 1 1 0 LinkedBlockingQueue
newCachedThreadPool 0 Integer.MAX_VALUE 60 SynchronousQueue
newScheduledThreadPool corePoolSize Integer.MAX_VALUE 0 DelayedWorkQueue
  • 1、针对newCachedThreadPool()把maximumPoolSize设置成Integer.MAX_VALUE, 来一个command, 创建一个Worker, command执行完以后, keepAliveTime时间之后Worker销毁;

相关文章

  • 线程学习_03线程池

    线程池结构图: 一、关于Executor: 1.1 线程池的顶层接口Executor: 1.2 线程池的第二层接口...

  • [第三篇]深入学习线程池之优雅的关闭线程池

    通过 《深入学习线程池之线程池简介及工作原理》、《深入学习线程池之通过ThreadPoolExecutor创建线程...

  • 线程池详解二:线程池的七大参数及运行流程

    在线程池详解一:线程池概念以及架构[https://www.jianshu.com/p/c03f21033153]...

  • Java线程池详解(一)

    作者: 一字马胡 转载标志 【2017-11-03】 更新日志 一、线程池初探 所谓线程池,就是将多个线程放在一...

  • Java学习Day14

    今日学习内容总结 线程池 Lambda表达式 线程池 线程池: 其实就是一个容纳多个线程的容器,其中的线程可以反复...

  • java线程池

    线程VS线程池 普通线程使用 创建线程池 执行任务 执行完毕,释放线程对象 线程池 创建线程池 拿线程池线程去执行...

  • Java调度线程池ScheduleExecutorService

    作者: 一字马胡 转载标志 【2017-11-03】 更新日志 链接 Java线程池详解(一)Java线程池详解...

  • Java线程池的使用

    线程类型: 固定线程 cached线程 定时线程 固定线程池使用 cache线程池使用 定时调度线程池使用

  • 线程池

    1.为什么要使用线程池?2.线程池的基本原理是什么?3.怎么学习线程池?线程池使用了池化技术。池化技术用于我们常见...

  • Java中的ThreadPoolExecutor线程池

    今天简单来和大家一起学习一下,java中的ThreadPoolExecutor线程池。 线程池简介 背书中,线程池...

网友评论

      本文标题:线程学习_03线程池

      本文链接:https://www.haomeiwen.com/subject/wmltzxtx.html