美文网首页
线程池ThreadPoolExecutor

线程池ThreadPoolExecutor

作者: sunpy | 来源:发表于2018-09-17 21:33 被阅读32次

题外话

昨天下午和晚上的两场面试,都提到了线程池,第二家直接就问的源码;因为都是平时自己直接使用Executors直接创建一些缓存线程池或者指定数量的线程池玩玩,但是对于线程池的本质ThreadPoolExecutor本身没有深入理解。写这篇博客打算完善下这块知识点的不足。


风景.jpg

线程池执行流程

  1. 线程池判断核心线程是否都处于运行状态,如果不是,就创建一个新线程来执行任务。如果是,执行2。
  2. 判断线程池中的工作队列是否已经满,如果未满,那么直接添加到工作队列,如果满了就执行3。
  3. 线程池判断池中的线程是否处于工作状态。如果没有就创建一个新工作线程执行任务。如果已经满了,则执行饱和策略的任务。
    饱和策略:
    AbortPolicy:直接抛出异常。
    CallerRunsPolicy:只用调用者所在线程来运行任务。
    DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
    DiscardPolicy:不处理,丢弃掉。


    线程池-构造方法.png

例子1:当线程池已经达到了最大线程数,执行饱和策略。

public static void main(String[] args) {
        ThreadPoolExecutor tp = new ThreadPoolExecutor(10, 
                20, 
                20, 
                TimeUnit.SECONDS, 
                new ArrayBlockingQueue<Runnable>(10),
                new ThreadPoolExecutor.AbortPolicy());
        
        for (int i = 0 ; i < 20000 ; i++) {
            tp.execute(new Runnable() {
                
                public void run() {
                    System.out.println(Thread.currentThread().getName() + ":" + new Date());
                }
            });
        }
        
        tp.shutdown();
    }

结果:


1.jpg

说明:可以发现当线程池和队列都已经满了,就直接抛出RejectedExecutionException异常。

阿里规范中线程池

1.png

说明:ThreadPoolExecutor使用的重要性

ThreadPoolExecutor源码分析

源码流程图:


ThreadPoolExecutor线程池源码流程图.png

关键属性:

    // 该变量ctl作为线程池中的重要属性,属性一分为2;
    // 高位(前三位)用来保存各个线程的状态,低位(后三位)保存有效线程的数量。
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // 线程数量占用的位数
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // 最大线程容量为2的29次幂减1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 接受新任务并处理排队任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    // 不接受新任务,而是处理队列任务
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // 不接受新任务,不处理排队任务和中断进程中的任务
    private static final int STOP       =  1 << COUNT_BITS;
    // 终止所有的任务,有效的线程数量设置为0,TIDYING状态的线程执行回调方法terminated()
    private static final int TIDYING    =  2 << COUNT_BITS;
    // terminated()方法已经执行完毕
    private static final int TERMINATED =  3 << COUNT_BITS;

    // 持有工作线程队列
    private final BlockingQueue<Runnable> workQueue;

    // 工作线程所在的集合上的锁
    private final ReentrantLock mainLock = new ReentrantLock();

    // 包含所有工作线程的集合,仅在获取到锁时才可以访问
    private final HashSet<Worker> workers = new HashSet<Worker>();

    // 使用等待队列
    private final Condition termination = mainLock.newCondition();

    // 最大池的大小
    private int largestPoolSize;

    // 完成任务时的计数值,只能在终止工作线程时更新,在获取到锁时才可以访问
    private long completedTaskCount;

    // 创建新线程的工厂
    private volatile ThreadFactory threadFactory;

    // 关闭执行的线程池时调用的Handler
    private volatile RejectedExecutionHandler handler;

    // 等待工作的线程的超时时间,单位纳秒
    private volatile long keepAliveTime;

    // 是否允许核心线程等待,
    // 默认false,就是空闲也可以生存。
    // true的话,在keepAliveTime时间内生存。
    private volatile boolean allowCoreThreadTimeOut;

    // 核心运行的线程数
    private volatile int corePoolSize;

    // 最大线程数,超出这个数量将执行丢弃策略
    private volatile int maximumPoolSize;

    // 默认拒绝执行策略Handler
    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();

关键方法:

private static int runStateOf(int c)     { return c & ~CAPACITY; }

说明:作用就是获取线程池的状态;过程:~CAPACITY :按位取反的值为11100000000000000000000000000000。c & ~CAPACITY : 执行按位与操作,那么结果的低29位肯定为0。

private static int workerCountOf(int c)  { return c & CAPACITY; }

说明:获取线程池工作线程的数量;过程:CAPACITY : 00011111111111111111111111111111。&操作将参数的高3位设置0。

 private static int ctlOf(int rs, int wc) { return rs | wc; }

说明:将runState和workerCount存到同一个int中。使用或运算,将两个值合并。

    // 使用cas方式将线程的数量加1
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }

    // 使用cas方式将线程的数量减1
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }

    // 减少ctl的计数值
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }

Worker详解

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable

Worker实现了Runnable接口,封装了线程的实现和执行。并且继承了AQS队列同步器。简化了获取锁和释放锁的过程,交给AQS去实现了。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable {
        
        private static final long serialVersionUID = 6138294804551838833L;

        // 运行中的线程,由ThreadFactory工厂创建
        final Thread thread;
        // 第一个执行的任务,可能为null
        Runnable firstTask;
        // 线程计数器,记住完成的任务
        volatile long completedTasks;

        // 使用线程工厂ThreadFactory为第一个任务firstTask创建线程
        Worker(Runnable firstTask) {
            // 在执行runWorker之前,禁止中断操作
            setState(-1); // inhibit interrupts until runWorker
            // 初始化第一个执行的任务
            this.firstTask = firstTask;
            // 获取线程工厂,为当前对象创建一个线程来初始化
            this.thread = getThreadFactory().newThread(this);
        }

        // 运行线程run方法委托给外部的runWorker来执行
        public void run() {
            runWorker(this);
        }

        // 判断当前线程是否独占(使用state状态值判断,如果获取锁就加1,如果释放锁就减1)
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        // 当前线程尝试获取锁
        protected boolean tryAcquire(int unused) {
            // 使用cas的方式来设置当前线程获取锁
            if (compareAndSetState(0, 1)) {
                // 设置当前线程为独占线程
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        
        // 当前线程尝试释放锁
        protected boolean tryRelease(int unused) {
            // 将独占线程设置为null
            setExclusiveOwnerThread(null);
            // 使用cas方式更新当前同步状态值为0
            setState(0);
            return true;
        }
        
        // 独占式获取锁
        public void lock()        { acquire(1); }
        // 尝试以独占式获取锁
        public boolean tryLock()  { return tryAcquire(1); }
        // 独占式释放锁
        public void unlock()      { release(1); }
        // 判断当前线程是否独占着锁
        public boolean isLocked() { return isHeldExclusively(); }
        // 线程启动后中断
        void interruptIfStarted() {
            Thread t;
            // 标识线程中断
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

说明:里面大多实现的方法,获取锁,释放锁,判断当前线程是否中断都是AQS的cas操作同步状态值state的方式。
AbstractQueuedSynchronizer同步队列源码


详解execute方法

    public void execute(Runnable command) {
        // 如果执行的线程为null,那么就抛出空指针异常。
        if (command == null)
            throw new NullPointerException();
        
        // 采用cas的方式获取ctl
        int c = ctl.get();
        // 如果工作线程的数量小于当前核心线程数量
        if (workerCountOf(c) < corePoolSize) {
            // 添加工作线程成功,那么直接返回
            if (addWorker(command, true))
                return;
            // 重新获取ctl值
            c = ctl.get();
        }
        
        // 如果线程池可以接受任务正常工作并且元素成功插入到工作队列
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次获取ctl值,检查
            int recheck = ctl.get();
            // 如果ctl值不可以接受任务正常运行并且从工作队列移除线程节点
            if (! isRunning(recheck) && remove(command))
                // 抛出线程异常
                reject(command);
            // 如果线程在运行阶段,线程池工作线程的数量如果为0
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 如果不能入队,且不能创建Worker
        else if (!addWorker(command, false))
            // 抛出线程异常
            reject(command);
    }

说明:
① 如果工作线程的数量小于核心线程数,那么创建核心线程,将任务作为核心线程的第一个任务。
② 如果当前线程池处于可以接受线程任务正常工作的状态,并且可以将任务添加到队列中,那么然后需要在此检查是否需要新建一个线程。如果不在RUNNING状态并且能够成功移除任务的话,那么调用reject方法,否则就调用addWorker(null,false)方法。
③ 如果任务不能添加到工作队列,那么创建一个非核心线程,如果创建非核心线程失败,那么就抛出RejectedExecutionException异常。


addWorker方法:将封装线程的Worker添加到Worker集合中,然后启动线程

// 添加任务
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    // 死循环遍历
    for (;;) {
        // 获取当前的线程池属性值
        int c = ctl.get();
        // 获取线程池的状态
        int rs = runStateOf(c);

        // 如果线程池不可以接受任务
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        // 死循环遍历
        for (;;) {
            // 获取工作线程的数量
            int wc = workerCountOf(c);
            // 如果工作线程的数量超出线程池最大线程容量
            // 或者工作线程的数量超出核心线程池数量或者最大线程池的数量
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                // 返回添加失败
                return false;
            // 采用cas方法更新线程状态值,如果成功,那么worker数量加1
            if (compareAndIncrementWorkerCount(c))
                break retry;
            // 重新读取线程池属性值
            c = ctl.get();  // Re-read ctl
            // 如果线程池的状态不等于之前的线程状态,重新执行retry代码块
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 获取线程池的锁
        final ReentrantLock mainLock = this.mainLock;
        // 将firstTask封装成worker
        w = new Worker(firstTask);
        // 获取worker的工作线程
        final Thread t = w.thread;
        // 如果工作线程不为空
        if (t != null) {
            // 给线程池上锁
            mainLock.lock();
            try {
                // 获取当前的线程池的属性值
                int c = ctl.get();
                // 获取当前线程池的状态
                int rs = runStateOf(c);
                // 如果线程池的状态可以接受任务正常工作
                // 或者如果线程池不接受任务并且当前任务为空
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 判断工作线程是否可以启动
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将当前的工作线程添加到workers集合中
                    workers.add(w);
                    // 获取Worker工作线程的数量
                    int s = workers.size();
                    // 如果Worker工作线程的数量超出最大池长度
                    if (s > largestPoolSize)
                        // 更新最大池长度
                        largestPoolSize = s;
                    // 设置工作线程添加成功
                    workerAdded = true;
                }
            } finally {
                // 线程池释放锁
                mainLock.unlock();
            }
            // 如果工作线程添加成功
            if (workerAdded) {
                // 启动工作线程
                t.start();
                // 设置工作线程启动成功
                workerStarted = true;
            }
        }
    } finally {
        // 如果工作线程启动失败!
        if (! workerStarted)
            // 回滚Worker线程的创建
            addWorkerFailed(w);
    }
    // 工作线程启动成功标识
    return workerStarted;
}

说明:可以发现代码结构是两个死循环,外循环的作用主要是检查当前线程池的状态,如果线程池不可以接受任务,那么直接退出。内循环的作用主要是检查工作线程的数量。
① 如果线程池不可以接受任务,那么直接退出。
② 如果工作线程的数量大于最大线程容量或者工作线程的数量大于核心线程池数量(最大线程池的数量),那么直接退出。
③ 使用cas更新线程属性值ctl(将ctl值加1),因为ctl的低位用来存储工作线程的数量,所以就是cas方式更新工作线程加1,跳出循环。如果线程池的状态和之前的不一致,说明cas方式失败了,那么重新执行retry。
④ 真正添加Worker,首先获取线程池的锁,将当前的任务封装成Worker。判断工作线程不为空,然后给线程池上锁,如果线程可以接受任务正常工作或者线程池不接受任务并且当前任务为空,将当前的工作线程添加到Workers集合中。如果Wokers工作线程集合超出池最大长度,将池最大长度更新为Wokers集合的大小。然后释放锁。
⑤ 如果前面工作线程添加成功,那么直接启动工作线程来完成firstTask作为其第一个任务。如果启动线程失败了,那么就回滚Worker线程的创建。
addWorkerFailed方法:线程添加失败策略

    // 回滚Worker线程的创建;删除线程集合中添加的线程
    private void addWorkerFailed(Worker w) {
        // 获取线程集合上的锁
        final ReentrantLock mainLock = this.mainLock;
        // 线程集合上锁
        mainLock.lock();
        try {
            // 如果工作线程不为空
            if (w != null)
                // 工作线程集合移除工作线程
                workers.remove(w);
            // 使用cas方式更新为工作线程worker的数量减1
            decrementWorkerCount();
            // 尝试将线程池状态转移到TERMINATED
            tryTerminate();
        } finally {
            // 线程集合解锁
            mainLock.unlock();
        }
    }

说明:
获取线程集合的锁,并给线程集合上锁。如果工作线程不为空,那么从Workers集合中移除添加的Worker。使用cas方式更新为工作线程数量减1。尝试将线程池状态转移到TERMINATED。释放锁。


Worker委托的方法

runWorker方法:Worker的run线程执行的方法;就是执行线程任务

    // 执行线程任务
    final void runWorker(Worker w) {
        // 获取当前线程
        Thread wt = Thread.currentThread();
        // 获取当前任务
        Runnable task = w.firstTask;
        // 将当前任务设置为null
        w.firstTask = null;
        // 允许当前的Worker出现中断情况
        w.unlock();
        boolean completedAbruptly = true;
        try {
            // 如果当前任务不为空或者从工作队列中获取的任务也不为空
            while (task != null || (task = getTask()) != null) {
                // 上锁
                w.lock();
                /**
                 * 判断线程池停止了或者给当前线程加上中断标志
                 * 如果当前线程没有中断,那么将当前线程中断
                 */
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行线程任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    // 将对象设置为null便于回收
                    task = null;
                    // 完成任务的数量加1
                    w.completedTasks++;
                    // 释放锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 线程池关闭,线程退出
            processWorkerExit(w, completedAbruptly);
        }
    }

说明:
线程启动执行firstTask任务,如果firstTask任务为空,那么从工作队列中取出任务。执行getTask方法。
getTask方法:从等待队列获取工作线程。

    /**
     * 执行阻塞或超时等待的任务,以下情况返回null,退出:
     * 1. 如果当前工作线程数量大于线程池最大数量,就返回null
     * 2. 线程池为STOP状态:不接受新任务,不处理排队任务和中断进程中的任务
     * 3. 线程池为shutdown状态并且队列为空:不接受新任务,无法处理队列中的任务
     * 4. 工作线程Worker超时等待一个任务,超时的工作线程将终止
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            // 获取ctl值
            int c = ctl.get();
            // 获取线程池运行状态
            int rs = runStateOf(c);

            // 如果线程池的状态为SHUTDOWN,那么线程池不再接受新任务,但是会执行队列中的任务。
            // 如果线程池的状态为STOP,那么不仅线程不再接受新任务,而且队列中的任务也不会执行。
            // 如果线程池的状态等于SHUTDOWN并且队列为空,工作线程的数量减1。
            // 如果线程池的状态等于STOP,不管队列是否为空,工作线程的数量减1。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 工作线程的数量减1
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) {
                // 获取工作线程的数量
                int wc = workerCountOf(c);
                // allowCoreThreadTimeOut:默认false,就是空闲也可以生存;true的话,在keepAliveTime时间内生存
                // 或者当前工作线程数大于核心线程数
                timed = allowCoreThreadTimeOut || wc > corePoolSize;
                
                // 当前线程数小于线程池最大线程数并且当前工作线程小于核心线程
                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                    
                // workerCount递减,结束当前thread
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                
                // 需要重新检查线程池状态
                if (runStateOf(c) != rs)
                    continue retry;
            }

            try {
                // 如果线程淘汰标志为true,那么工作队列将在指定时间超时内获取任务
                // 否则直接从工作队列中取出任务,如果队列为空,那么线程将一直阻塞。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                // 超时标志设置为true
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

processWorkerExit方法:线程池关闭,回收线程

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            // 工作线程数量减1
            decrementWorkerCount();
        // 获取重入锁
        final ReentrantLock mainLock = this.mainLock;
        // 上锁
        mainLock.lock();
        try {
            // 完成任务的数量增加
            completedTaskCount += w.completedTasks;
            // 工作线程集合移除工作线程
            workers.remove(w);
        } finally {
            // 释放锁
            mainLock.unlock();
        }
        
        // 终止线程池
        tryTerminate();
        
        // cas方式获取ctl值
        int c = ctl.get();
        // 如果当前线程池接受新任务且处理排队任务
        // 或者线程池虽然不接受新任务但是还处理排队任务
        if (runStateLessThan(c, STOP)) {
            
            if (!completedAbruptly) {
                // min 线程池最小空闲数
                // allowCoreThreadTimeOut允许核心线程在keepAliveTime时间等待之后是否允许生存
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果min == 0但是队列不为空要保证有1个线程来执行队列中的任务
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 如果线程池中工作线程不为空,就直接返回了
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 线程池将不处理队列中的任务并且等待队列不为空,直接返回false
            addWorker(null, false);
        }
 }

tryTerminate方法:将线程池的状态转换为终止状态

final void tryTerminate() {
        for (;;) {
            // cas方式获取ctl值
            int c = ctl.get();
            // 线程池接受新任务并处理排队任务
            // 或者线程池终止任务了
            // 或者线程池已经终止任务并且等待队列中不为空
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // 如果工作线程不为空
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // 将线程集合中线程获取锁,然后将其中断,最后释放锁
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            // 获取重入锁
            final ReentrantLock mainLock = this.mainLock;
            // 上锁
            mainLock.lock();
            try {
                // 如果当前的值c为期望值ctl,那么将其值更新为ctlOf(TIDYING, 0)
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 执行停止方法,子类重写
                        terminated();
                    } finally {
                        // 将当前ctl值更新为TERMINATED状态
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 唤醒等待线程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                // 释放锁
                mainLock.unlock();
            }
        }
}

shutdown方法:将线程池的状态转换为SHUTDOWN并且终止所有线程

// 将线程池的状态转换为SHUTDOWN并且终止所有线程
    public void shutdown() {
        // 获取重入锁
        final ReentrantLock mainLock = this.mainLock;
        // 上锁
        mainLock.lock();
        try {
            // 检查线程池关闭的权限
            checkShutdownAccess();
            // 将当前线程池的状态转换到目标状态targetState
            advanceRunState(SHUTDOWN);
            // 中断等待队列中的空闲线程
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            // 释放锁
            mainLock.unlock();
        }
        // 将线程池的状态转换为终止状态
        tryTerminate();
    }

    // 将当前线程池的状态转换到目标状态targetState
    private void advanceRunState(int targetState) {
        for (;;) {
            int c = ctl.get();
            if (runStateAtLeast(c, targetState) ||
                ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
                break;
        }
    }

线程回收

问题:如果我们不设置线程池回收线程,那么线程池中的线程会怎么样。

public static void main(String[] args) {
        ThreadPoolExecutor tp = new ThreadPoolExecutor(3, 
                5, 
                1, 
                TimeUnit.SECONDS, 
                new ArrayBlockingQueue<Runnable>(10),
                new ThreadPoolExecutor.CallerRunsPolicy());
        
        for (int i = 0 ; i < 5; i++) {
            tp.execute(new Runnable() {
                
                public void run() {
                    System.out.println(Thread.currentThread().getName());
                }
            });
        }
    }
线程池不回收线程.png
说明:线程池中的线程执行完任务后,当从等待队列中获取任务时,因为任务已经执行完了,没有任务了,阻塞的工作队列已经空了,那么线程只能处于阻塞状态了(这段答案在上面getTask方法的源码)。
allowCoreThreadTimeOut方法:设置线程超时回收(包括核心线程和非核心线程)
    // 设置线程超时回收(包括核心线程和非核心线程)
    public void allowCoreThreadTimeOut(boolean value) {
        // 如果keepAliveTime设置不大于0,那么报出非法参数异常
        if (value && keepAliveTime <= 0)
            throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
        // value为true时向下执行,allowCoreThreadTimeOut默认值为false
        if (value != allowCoreThreadTimeOut) {
            // allowCoreThreadTimeOut设置为true
            allowCoreThreadTimeOut = value;
            if (value)
                // 中断等待任务的线程
                interruptIdleWorkers();
        }
    }
    
    // 中断等待任务的线程
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
    
    // 中断等待任务的线程
    private void interruptIdleWorkers(boolean onlyOne) {
        // 获取重入锁
        final ReentrantLock mainLock = this.mainLock;
        // 上锁
        mainLock.lock();
        try {
            // 遍历线程集合
            for (Worker w : workers) {
                Thread t = w.thread;
                // 允许尝试获取锁且非中断
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        // 将线程中断
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        // 释放锁
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            // 释放锁
            mainLock.unlock();
        }
    }

说明:通过设置allowCoreThreadTimeOut为true,线程池中的线程如果阻塞的时间超过了keepAliveTime,那么将被标记为中断标志。
还可以通过前面提到的关闭线程池shutdown方法:检查线程池关闭的权限、将当前线程池状态转为为SHUTDOWN、中断等待(阻塞)的线程。

总结

1. 线程池采用ThreadFactory中默认的DefaultThreadFactory实现类来创建的,使用工厂方法模式来设计的线程工厂创建线程。(而工厂方法模式对于我的理解:一个产物对应一个工厂)
2. 线程池的任务饱和策略:
AbortPolicy:直接抛出RejectedExecutionException异常。
CallerRunsPolicy:只用调用者所在线程来运行任务。
DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
DiscardPolicy:不处理,丢弃掉。

3. 队列满的策略:
① 如果当前核心线程出现空闲,那么直接使用核心线程。
② 如果没有空闲的核心线程,队空,就插入到队尾。
③ 如果没有空闲的核心线程,队满,就使用ThreadFactory创建非核心线程来执行。
④ 如果线程池中工作线程数大于了最大线程数,就执行任务饱和策略。

4. 线程回收策略:
①第一种策略:使用allowCoreThreadTimeOut方法对核心线程和非核心线程进行回收。
②第二种策略:使用shutdown方法关闭线程池来对核心线程和非核心线程进行回收。

5. 线程池什么时候启动?
答案:在new ThreadPoolExecutor类时并没有启动线程池,只是设置了参数,而线程池的启动是在执行execute方法(源码中是addWorker方法)。

相关文章

网友评论

      本文标题:线程池ThreadPoolExecutor

      本文链接:https://www.haomeiwen.com/subject/hzbtgftx.html