美文网首页
线程池ThreadPoolExecutor源码解析

线程池ThreadPoolExecutor源码解析

作者: codingBoyJack | 来源:发表于2019-02-16 14:03 被阅读0次

    内部状态

    线程有五种状态:新建,就绪,运行,阻塞,死亡,线程池同样有五种状态:Running, SHUTDOWN, STOP, TIDYING, TERMINATED。

    变量ctl定义为AtomicInteger ,其功能非常强大,记录了“线程池中的任务数量”和“线程池的状态”两个信息。共32位,其中高3位表示"线程池状态",低29位表示"线程池中的任务数量"。

    RUNNING            -- 对应的高3位值是111。
    SHUTDOWN       -- 对应的高3位值是000。
    STOP                   -- 对应的高3位值是001。
    TIDYING              -- 对应的高3位值是010。
    TERMINATED     -- 对应的高3位值是011。
    
    image.png

    其中:
    RUNNING:表示正在运行中的状态,能够接受新的任务,处理现有的任务
    SHUTDOWN: 不再接收新的任务,但是可以继续处理现有的任务
    STOP:不在接受新任务,现有的任务也会被中断
    TIDYING:线程池中没有正在进行的任务,此时会回掉钩子函数terminated(),可以通过重写这个函数在线程池TIDYING状态时做一些自定义操作,比如记录日志
    TERMINATED:线程池终止

    线程池的创建

    ThreadPoolExecutor构造函数如下,一共七个参数

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    

    corePoolSize

    核心线程池大小,每提交一个新任务线程池会创建一个新线程处理任务,直到等于核心线程池大小。如果调用线程池的prestartAllCoreThreads()方法,线程池会提前创建好所有核心线程数量。

    maxPoolSize

    线程池中允许的最大线程数,当核心线程池和阻塞队列都满了之后,如果还有新任务提交,则会创建新的线程来执行任务,前提是线程数小于该参数指定的大小。如果使用无界队列的话,该参数就没什么作用。

    keepAliveTime

    线程在被回收前存活的时间,以便在被回收前可以直接用以执行新提交的任务。默认情况下,该参数仅在线程数大于corePoolSize时才有效。

    TimeUnit

    keepAliveTime的时间单位

    workQueue

    用来储存等待任务的阻塞队列

    • ArrayBlockingQueue 基于数组的有界队列,FIFO
    • LinkedBlockingQueue 基于链表的的有界队列,FIFO
    • SynchronousQueue 不储存元素的阻塞队列,每次入队列都伴随着一个出队列,反之亦然
    • PriorityBlockingQueue 具有优先级别的阻塞队列

    threadFactory

    创建线程的工厂,可以通过Executors.defaultThreadFactory()创建,如下:

    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }
    

    这边应用的是工厂设计模式,DefaultThreadFactory实现如下

     private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
    
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    

    可以看到默认情况下,线程池中线程的name都是pool-xx-thread-xx,符合我们平时使用看到的结果。并且线程池创建的线程都是非守护线程,且优先级都是NORM_PRIORITY。

    handler

    线程池拒绝策略,当线程数达到maxPoolSize后再提交新的任务,线程池会执行一种拒绝策略

    1. AbortPolicy 默认策略,直接抛异常
    2. CallerRunsPolicy 用调用者线程执行这个任务
    3. DiscardOldestPolicy 丢弃阻塞队列靠前的任务,并执行这个任务
    4. DiscardPolicy 直接丢弃任务

    Executor创建线程池

    前面我们介绍了线程池的直接构建方式,JDK也提供了利用Executors工具类创建3种线程池的工具方法,具体如下:

    • FixedThreadPool
      创建固定数量的线程池,其定义如下:
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    

    corePoolSize和maxPoolSIze都设置为nThreads,意味着如果线程池满且阻塞队列也满的情况下,如果继续提交任务,则会直接执行拒绝策略,不会创建新的线程直至数量达到maxPoolSize。
    但是这里使用的是个无界队列,意味着maxPoolSize这个参数是无效的,如果任务提交速度大于任务执行速度的话,KeepAliveTime这个参数也无效。另外,这种情况因为会无限往阻塞队列添加任务,会有OOM风险。

    • SingleThreadExecutor
      单个线程的线程池,其定义如下:
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    
    

    情况和FixedThreadPool一样,只不过是将corePoolSize和maxPoolSize都设置为1

    • CachedThreadPool
      CachedThreadPool是一个会根据需要创建新线程的线程池 ,他定义如下:
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    

    核心线程池大小为0,意味着新任务会直接进入阻塞队列,keepAliveTime这是为60L,unit设置为TimeUnit.SECONDS,意味着空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。阻塞队列采用的SynchronousQueue,SynchronousQueue是一个没有元素的阻塞队列,加上corePool = 0 ,maximumPoolSize = Integer.MAX_VALUE,这样就会存在一个问题,如果主线程提交任务的速度远远大于CachedThreadPool的处理速度,则CachedThreadPool会不断地创建新线程来执行任务,这样有可能会导致系统耗尽CPU和内存资源,所以在使用该线程池是,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题

    任务提交

    线程池可以接受两种任务提交方式:Executor.execute()、ExecutorService.submit()。其中ExecutorService.submit()可以获取该任务执行的Future。 我们以Executor.execute()为例,来看看线程池的任务提交经历了那些过程。
    定义:

    public interface Executor {
    
        void execute(Runnable command);
    }
    

    execute(Runnable command) 实现:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
    

    流程如下:

    1. 如果当前线程数小于核心线程数,调用addWorker添加任务,如果失败执行步骤2
    2. 检查线程池状态,如果是RUNNING状态,则尝试加入阻塞队列,如果加入成功,会进行一次doublecheck,如果线程池不是RUNNING状态,将任务移除队列,执行拒绝策略。如果加入失败,执行步骤3
    3. 如果线程池不是RUNNING状态或者加入阻塞队列失败,则尝试创建新线程直到maxPoolSize,如果失败,则调用reject()方法运行相应的拒绝策略。

    addWorker(Runnable firstTask, boolean core)方法用于创建线程执行任务,源码如下:

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
    
            // 获取当前线程状态
            int rs = runStateOf(c);
    
    
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                return false;
    
            // 内层循环,worker + 1
            for (;;) {
                // 线程数量
                int wc = workerCountOf(c);
                // 如果当前线程数大于线程最大上限CAPACITY  return false
                // 若core == true,则与corePoolSize 比较,否则与maximumPoolSize ,大于 return false
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // worker + 1,成功跳出retry循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
    
                // CAS add worker 失败,再次读取ctl
                c = ctl.get();
    
                // 如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
    
            // 新建线程:Worker
            w = new Worker(firstTask);
            // 当前线程
            final Thread t = w.thread;
            if (t != null) {
                // 获取主锁:mainLock
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
    
                    // 线程状态
                    int rs = runStateOf(ctl.get());
    
                    // rs < SHUTDOWN ==> 线程处于RUNNING状态
                    // 或者线程处于SHUTDOWN状态,且firstTask == null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)
                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
    
                        // 当前线程已经启动,抛出异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
    
                        // workers是一个HashSet<Worker>
                        workers.add(w);
    
                        // 设置最大的池大小largestPoolSize,workerAdded设置为true
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 释放锁
                    mainLock.unlock();
                }
                // 启动线程
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
    
            // 线程启动失败
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    

    这个方法代码有点多,我们一点点看:

    1. 首先判断当前线程是否可以添加任务,如果可以就进行下一步,否则直接返回false, 这个if主要校验一下几种情况:
    • rs >= SHUTDOWN ,表示当前线程处于SHUTDOWN ,STOP、TIDYING、TERMINATED状态
    • rs == SHUTDOWN , firstTask != null时不允许添加线程,因为线程处于SHUTDOWN 状态,不允许添加任务
    • rs == SHUTDOWN , firstTask == null,但workQueue.isEmpty() == true,不允许添加线程,因为firstTask == null是为了添加一个没有任务的线程然后再从workQueue中获取任务的,如果workQueue == null,则说明添加的任务没有任何意义。
    1. 内嵌循环,cas自旋 worker+1
    2. 获取主锁mailLock,如果线程池处于RUNNING状态获取处于SHUTDOWN状态且 firstTask == null,则将任务添加到workers Queue中,然后释放主锁mainLock,然后启动线程,然后return true,如果中途失败导致workerStarted= false,则调用addWorkerFailed()方法进行处理。

    说清楚了这部分,现在我们回到execute()方法中,有三处调用了addWorker方法。 第一次:workerCountOf(c) < corePoolSize ==> addWorker(command, true),这个很好理解,当然线程池的线程数量小于 corePoolSize ,则新建线程执行任务即可。第二次Double check时(workerCountOf(recheck) == 0) ==>addWorker(null, false)。如果线程池中数量为0,理应新建work处理,但是因为任务已经在队列中,所以添加一个任务为null的线程,然后直接存workQueue获取任务。第三次else if (!addWorker(command, false)),当线程池不是RUNNING状态,或者阻塞队列加入失败(比如满了),继续添加worker,这里core == false,则意味着是与maximumPoolSize比较。

    新建的线程都会被包装成一个worker,worker实现如下:

    private final class Worker extends AbstractQueuedSynchronizer
            implements Runnable {
        private static final long serialVersionUID = 6138294804551838833L;
    
        // task 的thread
        final Thread thread;
    
        // 运行的任务task
        Runnable firstTask;
    
        volatile long completedTasks;
    
        Worker(Runnable firstTask) {
    
            //设置AQS的同步状态private volatile int state,是一个计数器,大于0代表锁已经被获取
            setState(-1);
            this.firstTask = firstTask;
    
            // 利用ThreadFactory和 Worker这个Runnable创建的线程对象
            this.thread = getThreadFactory().newThread(this);
        }
    
        // 任务执行
        public void run() {
            runWorker(this);
        }
    
    }
    
    

    这里将线程包装成一个work,并继承了AQS,主要目的是为了方便现成的中断处理,然后把当前work构造为Thread,在start的时候就会执行runWorker(Worker worker)方法。

    runworker(Worker worker)定义如下:

    final void runWorker(Worker w) {
    
        // 当前线程
        Thread wt = Thread.currentThread();
    
        // 要执行的任务
        Runnable task = w.firstTask;
    
        w.firstTask = null;
    
        // 释放锁,运行中断
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                // worker 获取锁
                w.lock();
    
                // 确保只有当线程是stoping时,才会被设置为中断,否则清楚中断标示
                // 如果线程池状态 >= STOP ,且当前线程没有设置中断状态,则wt.interrupt()
                // 如果线程池状态 < STOP,但是线程已经中断了,再次判断线程池是否 >= STOP,如果是 wt.interrupt()
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() &&
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    wt.interrupt();
                try {
                    // 自定义方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 完成任务数 + 1
                    w.completedTasks++;
                    // 释放锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    执行流程如下:

    1. 调用worker.unlock()释放锁,把状态state至为0,因为interruptIfStarted只有在state>=0是才会执行
    void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
    
    1. 获取task,如果firstTask等于空,就掉用getTask()获取
    2. 调用worker.lock()获取锁,任务执行完成后unlock()释放锁
    3. 在任务执行前后调用钩子beforeExecute()和afterExecute(),可以重写添加自定义行为
    4. task == null或者抛出异常(beforeExecute()、task.run()、afterExecute()均有可能)导致worker线程终止,则调用processWorkerExit()方法处理worker退出流程。

    getTask()定义

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
    
        for (;;) {
    
            // 线程池状态
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // 线程池中状态 >= STOP 或者 线程池状态 == SHUTDOWN且阻塞队列为空,则worker - 1,return null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            int wc = workerCountOf(c);
    
            // 判断是否需要超时控制
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
            if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
    
                // 从阻塞队列中获取task
                // 如果需要超时控制,则调用poll(),否则调用take()
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    

    runWorker()的时候是个循环操作,并且在执行完firstTask后把firstTask设置为null,下一次循环获取getTask就会常是从阻塞队列中获取,如果timed为true,调用poll方法,尝试在keepAliveTime时间窗口内获取task,如果这时候没有新任务提交至阻塞队列,getTask()返回null, 那这个worker线程就会被回收。
    如果timed为false,调用take方法阻塞该线程,直到获取到任务为止。

    回收worker方法processWorkerExit()定义:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
    
        // true:用户线程运行异常,需要扣减
        // false:getTask方法中扣减线程数量
        if (completedAbruptly)
            decrementWorkerCount();
    
        // 获取主锁
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 从HashSet中移出worker
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
    
        // 有worker线程移除,可能是最后一个线程退出需要尝试终止线程池
        tryTerminate();
    
        int c = ctl.get();
        // 如果线程为running或shutdown状态,即tryTerminate()没有成功终止线程池,则判断是否有必要一个worker
        if (runStateLessThan(c, STOP)) {
            // 正常退出,计算min:需要维护的最小线程数量
            if (!completedAbruptly) {
                // allowCoreThreadTimeOut 默认false:是否需要维持核心线程的数量
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                // 如果min ==0 或者workerQueue为空,min = 1
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
    
                // 如果线程数量大于最少数量min,直接返回,不需要新增线程
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 添加一个没有firstTask的worker
            addWorker(null, false);
        }
    }
    
    
    1. completedAbruptly表示线程是否异常终端,如果是的话,需要将线程池中线程数量减1,如果正常结束就不需要了,因为getTask()方法中已经做了处理。
    2. 加锁从wokers set中移除这个worker,调用tryTerminate()尝试terminate线程池,因为可能是最后一个worker
    3. 如果没能终止线程池,线程池中需要保证有一定数量的线程,如果没有min数量的县城的话,调用addWorker()添加一个空任务的线程。

    每次要移除worker的时候都会尝试terminate线程池,tryTerminate()定义如下:

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 线程池处于Running状态
            // 线程池已经终止了
            // 线程池处于ShutDown状态,但是阻塞队列不为空
            if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
    
            // 执行到这里,就意味着线程池要么处于STOP状态,要么处于SHUTDOWN且阻塞队列为空
            // 这时如果线程池中还存在线程,则会尝试中断线程
            if (workerCountOf(c) != 0) {
                // /线程池还有线程,但是队列没有任务了,需要中断唤醒等待任务的线程
                // (runwoker的时候首先就通过w.unlock设置线程可中断,getTask最后面的catch处理中断)
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 尝试终止线程池
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        // 线程池状态转为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }
    
    

    在关闭线程池的过程中,如果线程池处于STOP状态或者处于SHUDOWN状态且阻塞队列为null,则线程池会调用interruptIdleWorkers()方法中断所有线程,注意ONLY_ONE== true,表示仅中断一个线程。

    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
    

    线程终止

    shutdown

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            // 推进线程状态
            advanceRunState(SHUTDOWN);
            // 中断空闲的线程
            interruptIdleWorkers();
            // 交给子类实现
            onShutdown();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }
    

    shutdownNow

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            // 中断所有线程
            interruptWorkers();
            // 返回等待执行的任务列表
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }
    

    与shutdown不同,shutdownNow会调用interruptWorkers()方法中断所有线程。

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    

    同时会调用drainQueue()方法返回等待执行到任务列表。

    private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        ArrayList<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList);
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }
    

    相关文章

      网友评论

          本文标题:线程池ThreadPoolExecutor源码解析

          本文链接:https://www.haomeiwen.com/subject/xaajeqtx.html