ThreadPoolExecutor源码分析

作者: 蓝梅 | 来源:发表于2021-06-15 00:01 被阅读0次

    记得最开始接触并发编程是,看的第一块的源码就是ThreadPoolExecutor,但是之前没有做任何的笔记,今天再来复习一下

    一、线程池主要属性

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    

    线程池使用ctl代表线程状态和真正运行的线程数量,包括两个部分;第一个部分,就是高3位,代表线程池的状态;然后是低29位代表正在执行有效的线程数量;COUNT_BITS 等于 29;CAPACITY 表示二进制 29 个 1;这两个值是为了方便计算线程状态,以及线程数量的;

    线程池状态主要有:
    RUNNING:高三位为111,线程池正常运行状态
    SHUTDOWN:高三位为000,该状态不接受新任务,处理已接收的任务
    STOP:高三位为001,该状态不接受新任务,也不处理已接收的任务,并且会中断正在处理的任务,调用shutdownNow()方法时,可以从RUNNING或者是SHUTDOWN状态变为STOP状态
    TIDYING:高三位为010,当所有线程都终止,ctl记录的线程数量变为0,则会变为TIDYING状态;
    TERMINATED:高三位为011,当线程池彻底终止,就会变成TERMINATED状态

    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
    

    corePoolSize:核心线程数
    maximumPoolSize:最大线程数
    keepAliveTime:线程最大空闲时间
    unit:空闲时间单位
    workQueue:阻塞队列
    threadFactory:生成线程的工厂类,默认使用Executors.defaultThreadFactory() 来创建线程
    handler:拒绝策略,拒绝策略有一下几种
    1、AbortPolicy:直接抛出异常,默认策略;
    2、CallerRunsPolicy:用调用者所在的线程来执行任务;
    3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    4、DiscardPolicy:直接丢弃任务;

    二、线程池执行

    当线程执行时,先看是否达到核心线程数,如果还没达到核心线程数则,直接起线程;当达到核心线程数,则把线程放入阻塞队列;当阻塞队列放满,则开始起最大线程数,当线程数量起到最大线程后;后续线程进入拒绝策略;当起了超过核心线程数的线程,去阻塞队列获取任务时,超过最大空闲时间则直接返回,然后关掉该线程。

    三、源码分析

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //当线程数小于核心线程数,则直接新起任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //优先判断线程池运行状态,然后再去把线程放入阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次判断运行状态,如果线程池状态关闭后,然后执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如果当前线程池没有线程在跑,则启动一个非核心的线程,去执行
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //入队失败,则起非核心的线程去执行(最大线程数),如果失败,则执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }
    

    从上述代码就能看出,整个线程池执行的核心流程;我们再来分析一下细节,先看下内部类Worker

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            private static final long serialVersionUID = 6138294804551838833L;
            //当前执行的线程
            final Thread thread;
            //创建该woker时,传入的任务
            Runnable firstTask;
            //当前woker执行完成的任务数
            volatile long completedTasks;
    
            Worker(Runnable firstTask) {
                //设置锁的初始状态
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                //创建线程
                this.thread = getThreadFactory().newThread(this);
            }
    
            public void run() {
                //该线程调用的,是ThreadPoolExecutor runWorker方法
                runWorker(this);
            }
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            //使用CAS去获取锁
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            //释放锁
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            //调用该方法,去中断线程
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    

    线程池执行时,是先把每个任务封装成Worker,来执行的;我们来看看addWorker的代码分析

    //参数为第一个执行的任务(有可能为空),是否创建的是核心线程
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
    
            // 判断线程池运行状态
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
    
            for (;;) {
                /*如果当线程达到CAPACITY最大数量
                 *或者启用的是核心线程数,超过了核心线程数
                 *或者超过了最大线程数,则添加woker失败
                 */
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //使用cas对执行的线程数加一,如果成功则跳出循环,去添加woker
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                //如果执行到这里,线程池状态有变化,则重新判断
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
    
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //新建一个Worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //对当前线程池加锁,为了保证并发时,workers添加的安全性
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    //判断线程池的状态
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //添加到线程池维护的列表中引用
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    //释放锁
                    mainLock.unlock();
                }
                //添加成功,则运行当前woker线程
                if (workerAdded) {
                    //woker线程执行,调用woker的run方法,又调用ThreadPoolExecutor的runWorker方法
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    

    接下来,我们再看看runWorker方法

    final void runWorker(Worker w) {
        //wt目前只是判断了是否打断,beforeExecute,该方法为空,没有实现,如果我们自己实现线程池,可以做日志等其他的输出
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        //先释放锁,是为了清除interrupt,任务还没开始,不允许打断
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果当前任务为空,则去阻塞队列获取任务
            while (task != null || (task = getTask()) != null) {
                //对当前woker加锁,这个加锁时为了防止在执行期间,被其他线程中断,主要是调用interruptIdleWorkers()这个方法
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                //判断线程池运行状态,和中断状态
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //直接执行我们传入的task的run方法,所以,线程池每次执行时,都是跑的自己woker线程,我们传入进去的Runnable对象,并没有调用start方法,只是跑了run方法
                        task.run();
                    } catch (RuntimeException x) {
                        //如果有异常,则直接抛出,但是会在processWorkerExit方法中,新起一个woker
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    //执行完成则完成任务数加一,释放锁
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //处理线程跑完后的逻辑,主要是对是否是抛出异常做处理
            processWorkerExit(w, completedAbruptly);
        }
    }
    

    从runWorker方法中我们看到了,线程执行时,并不是去start,这样避免了线程上下文的切换,只是在当前线程执行了我们传入task的run方法,提高了系统的整体性能;接下来我们再看看getTask怎么来获取线程的;

    private Runnable getTask() {
        boolean timedOut = false; 
    
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //如果当前线程池状态已经停止,则去把线程池清空
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
    
            //获取线程数量
            int wc = workerCountOf(c);
            /*
             *判断获取线程是否需要超时
             *jdk1.5之后,allowCoreThreadTimeOut可以设置是否允许核心线程数超时
             *一般是不会设置,则只有当线程数量超过了核心线程数,则从阻塞队列获取线程会超时
             */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            //如果该线程超时,或者超过了最大线程数,或者队列为空了,则去对当前运行的线程数减一
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }
    
            try {
                //如果线程为核心线程数,则使用take方法阻塞获取线程,不会超时
                //如果不为核心线程数,则使用poll获取task,如果超时则获取为空,会进入上面一步,进行销毁,线程数量会减一
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                //如果获取到了,则跳出该方法,返回该task
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }
    

    从上面可以看出,当去从阻塞队列中获取任务时,是靠阻塞队列的特性去阻塞获取线程,如果获取不到,就直接销毁当前woker了;并且,woker中没有核心线程的标记,是根据数量去判断的,如果小于或者等于了我们设置的核心线程数,则会阻塞去获取task;

    四、Executors

    Executors工具类提供了几种线程池模型,这里分析一下,但是不建议大家用,最好还是根据自己的业务来新建线程池,因为这几种线程池可能会引起内存溢出

    1.newFixedThreadPool

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    

    FixedThreadPool核心线程数,最大线程数相同,队列是无界的阻塞队列,如果线程消费能力不够时,就有可能出现内存溢出的问题

    2.newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    

    SingleThreadExecutor核心线程数和最大线程数都是1,和newFixedThreadPool一样,如果线程消费能力不够时,就有可能出现内存溢出的问题

    3.newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    

    CachedThreadPool,核心线程数为0,最大线程数为MAX_VALUE,传入的阻塞队列长度只有1。如果线程消费能力不够时,可能会启用很多线程,所以也可能会出现内存溢出的问题;

    相关文章

      网友评论

        本文标题:ThreadPoolExecutor源码分析

        本文链接:https://www.haomeiwen.com/subject/wduoeltx.html