美文网首页
谈谈ThreadPoolExecutor的实现

谈谈ThreadPoolExecutor的实现

作者: 宸小朔 | 来源:发表于2019-07-19 19:59 被阅读0次

    概述

    ​ 线程作为系统稀缺资源,如果在应用中进行频繁的创建和销毁,会为我们的应用带来灾难性的体验,增大系统负荷,降低效率。池化技术为该问题的解决提供了一种有效的思路,通过建立一个线程池,每次线程的时候从池中取出一个空闲的线程,这样就省去了线程创建和销毁。java的线程池实现是在jdk1.5开始引入的,本文将对其中最常用的ThreadPoolExecutor的实现进行详细的介绍,系统可以通过本文了解到如何去实现一个线程池,并向Doug Lea大神致敬。

    使用

    ​ 我们先看下面的线程池使用的例子,在该例子中我声明一个核心线程数是2,最大线程数是5,非核心线程线程存活时间1s,阻塞队列大小为1,拒绝策略为AbortPolicy,我们会输出程序执行过程中的线程池达到的最大线程数以及在所有任务执行结束后线程池中线程的数量。代码如下:

    /**
     * Created by yuanqiongqiong on 2019/4/10.
     */
    public class ThreadPoolExecutorTest {
    
        private static Logger LOGGER = LoggerFactory.getLogger(ThreadPoolExecutorTest.class);
    
        //声明一个线程池
        private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 1,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(1),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy());
    
        public static void main(String []args) {
            for (int i = 0; i< 7;i++) {
                String runnableName = "test" + i;
                PersonRunnable personRunnable = new PersonRunnable(runnableName);
                try {
                    threadPoolExecutor.execute(personRunnable);
                } catch (Exception e) {
                    LOGGER.error("执行{}任务异常", runnableName, e);
                }
            }
            try {
                Thread.sleep(500);
                LOGGER.info("线程池当前线程数目 = {}", threadPoolExecutor.getPoolSize());
                Thread.sleep(2000);
            } catch (Exception e) {
                LOGGER.error(e.getMessage());
            }
            LOGGER.info("线程池中达到的最大线程数目 = {}", threadPoolExecutor.getLargestPoolSize());
            LOGGER.info("线程池当前线程数目 = {}", threadPoolExecutor.getPoolSize());
            LOGGER.info("线程池已经完成的任务数量 = {}", threadPoolExecutor.getCompletedTaskCount());
        }
        static class PersonRunnable implements Runnable {
            private String name;
    
            public PersonRunnable(String name) {
                this.name = name;
            }
    
            @Override
            public void run() {
                LOGGER.info("我是" + name + "我在线程" + Thread.currentThread().getName());
                try {
                    Thread.sleep(100);
                } catch (Exception e) {
                    LOGGER.error("任务{}执行异常", Thread.currentThread().getName(), e);
                }
            }
        }
    }
    

    ​ 输出结果如下:

    20:22:26.571 [pool-1-thread-3] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test3我在线程pool-1-thread-3
    20:22:26.571 [pool-1-thread-2] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test1我在线程pool-1-thread-2
    20:22:26.571 [pool-1-thread-4] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test4我在线程pool-1-thread-4
    20:22:26.571 [pool-1-thread-5] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test5我在线程pool-1-thread-5
    20:22:26.571 [pool-1-thread-1] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test0我在线程pool-1-thread-1
    20:22:26.576 [main] ERROR com.meituan.campaign.ThreadPoolExecutorTest - 执行test6任务异常
    java.util.concurrent.RejectedExecutionException: Task com.meituan.campaign.ThreadPoolExecutorTest$PersonRunnable@46f7f36a rejected from java.util.concurrent.ThreadPoolExecutor@421faab1[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
        at com.meituan.campaign.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:30)
    20:22:26.681 [pool-1-thread-5] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test2我在线程pool-1-thread-5
    20:22:27.082 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 线程池当前线程数目 = 5
    20:22:29.084 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 线程池中达到的最大线程数目 = 5
    20:22:29.084 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 线程池当前线程数目 = 2
    20:22:29.085 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 线程池已经完成的任务数量 = 6
    
    

    ​ 由于我们代码设置了最大线程数是5个,并且阻塞队列大小是1,所以同一时间最多会有6个任务被执行,其中1个任务放在阻塞队列中。线程池达到的最大线程数目是5个,因为线程池设置了maximumPoolSize=5。非核心线程会在1s空闲后被回收,因此最终线程池线程数目还是2个。

    实现分析

    ​ 抛开ThreadPoolExecutor,我们先想下实现一个线程池需要哪些成员变量,个人感觉以下变量是必不可少的:(1) 一个存放线程的容器或数组;(2) 一个队列用来在线程池线程不足是存放排队的任务;(3) 一个状态字段表示线程池的状态,用来表示线程池不同生命周期状态。下面,我们看下ThreadPoolExecutor的成员变量:

    //表示线程状态和线程数,高三位代表线程状态,低29位代表线程数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //值为29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //线程池最大线程数,大概为5亿,可以肯定不会达到这么多线程
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
    //线程池处于运行状态可以接收新任务并执行任务队列中的任务
    private static final int RUNNING    = -1 << COUNT_BITS;
    //该状态下线程池不再接收新任务,但是会把任务队列中的任务执行完成,调用shutDown()会进入该状态
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    //该状态下线程池不接受新任务并抛弃任务队列中的任务中断所有正在执行的线程,调用shutDownNoW()会进入该状态
    private static final int STOP       =  1 << COUNT_BITS;
    //已经没有任务可以执行,会从SHUTDOWN和STOP状态变换为该状态
    private static final int TIDYING    =  2 << COUNT_BITS;
    //在执行完terminated()操作后会进入该状态
    private static final int TERMINATED =  3 << COUNT_BITS;
    //任务阻塞队列,存放排队任务
    private final BlockingQueue<Runnable> workQueue;
    //存放线程的hashset
    private final HashSet<Worker> workers = new HashSet<Worker>();
    //线程工厂,生成新线程
    private volatile ThreadFactory threadFactory;
    //拒绝策略
    private volatile RejectedExecutionHandler handler;
    //线程池核心线程数
    private volatile int corePoolSize;
    //线程池最大线程数
    private volatile int maximumPoolSize;
    

    ​ 上述代码的注释给出了线程池各个状态的含义,我们看下各个状态之间的状态转换关系,具体如下:

    (1) RUNNING -> SHUTDOWN:调用了shutdown()函数;

    (2) (RUNNING or SHUTDOWN) -> STOP:调用了shutdownNow();

    (3)SHUTDOWN -> TIDYING:当线程池线程为空并者任务队列为空;

    (4)STOP -> TIDYING:当线程池线程为空;

    (5)TIDYING -> TERMINATED:当调用了terminated()方法;

    ​ 如上示例,我们把一个任务放入线程池的execute()函数中,线程池会为我们选择一个线程来执行我们提交的任务。在这个选择线程的过程中,如果线程池中线程数量小于corePoolSize,那么将创建新线程执行任务;当线程池数量大于等于corePoolSize并且小于maximumPoolSize,线程池会把任务放到阻塞队列workQueue中直到workQueue满了去创建新线程;当线程池线程数量等于maximumPoolSize并且workQueue满时会执行拒绝策略。下面我们通过execute()函数的逻辑来理解上述过程:

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            //如果线程数小于核心线程数,那么创建一个新的线程来执行任务command
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            //如果线程数大于等于核心线程数,线程数处于RUNNING状态(可以将任务加入阻塞队列)并且加入阻塞队列成功(即阻塞队列未满),那么任务就被加入阻塞队列等待空闲线程。
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                //再次检查线程池状态,如果不是RUNNING状态,从阻塞队列中移除任务,执行拒绝策略
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                //线程处RUNNING状态并且线程数是0,则创建个空闲新线程
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            //如果线程数大于核心线程并且阻塞队列已满,则以maximumPoolSize为线程数最大值进行处理
            else if (!addWorker(command, false))
                //线程池中线程达到最大线程数并且阻塞队列已经满执行拒绝策略
                reject(command);
        }
    

    ​ 看到上面的代码逻辑,我们会发现主要的逻辑还是在addWorker里,这个函数主要功能就是为任务分配线程并执行,我们在看这块逻辑之前需要取看一个重要的Worker类。该类封装了线程及任务,可以在内部执行任务,具体定义如下:

        private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            private static final long serialVersionUID = 6138294804551838833L;
            //具体线程
            final Thread thread;
            //线程要执行的任务
            Runnable firstTask;
            //线程完成的任务数
            volatile long completedTasks;
    
            /**
             * Creates with given first task and thread from ThreadFactory.
             * @param firstTask the first task (null if none)
             */
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                //调用线程工厂创建新的线程,threadFactory由我们的线程池构造函数传入,没有指定则使用默认的,这块会创建一个新的线程
                this.thread = getThreadFactory().newThread(this);
            }
    
            /** Delegates main run loop to outer runWorker  */
            public void run() {
                runWorker(this);
            }
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
            //可以看出Worker实现了AQS,其本身也是不可重入锁
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    

    ​ 思考为什么Worker要继承AQS实现一个独占锁?这个问题我们后面分析。

    ​ 了解了worker的构成,我们就可以具体看下addWorker函数的执行逻辑了,具体如下:

    //core为true,那么创建线程是以corePoolSize作为线程数最大值,否则以maximumPoolSize作为线程数最大值
    private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
              
                // 线程状态是非RUNNING状态不再进行任务提交处理,其中SHUTDOWN状态下已经提交进行任务和阻塞队列         中的任务要继续处理
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    //线程池中线程大于最大线程数或者大于要求的阈值,返回失败
                    if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    //符合要求后,CAS增大线程数,跳出自旋,走下面的线程创建逻辑
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;//标记线程是否启动
            boolean workerAdded = false;//标记线程是否添加成功
            Worker w = null;
            try {
                //创建新的线程并封装为一个Work对象
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        int rs = runStateOf(ctl.get());
                        //对线程池创建的线程状态进行检查
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            //如果新线程检查成功,将新线程加入workers中
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                //更新全局变量,线程池达到的最大线程数,该值可以输出作为线程池参数设定的指标
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        //这块重要了,线程创建成功后,开始执行任务
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    ​ 我们继续跟着上述代码思路走,看下任务如何执行,t.start()的会调用Worker类run()方法,而该方法会调用runWorker来从任务队列中获取任务,执行任务,具体看下:

        final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                //如果Worker中创建时存在任务,则执行;否则,调用getTask从阻塞队列中获取任务,当阻塞队列中没有任务并且线程不应该被回收时,线程会一直阻塞等待获取任务,具体在getTask方法中分析
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // If pool is stopping, ensure thread is interrupted;
                    // if not, ensure thread is not interrupted.  This
                    // requires a recheck in second case to deal with
                    // shutdownNow race while clearing interrupt
                    // 上面的英文注释很清楚了,这块为了处理调用shutdownNow时需要停止所有的线程
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                         (Thread.interrupted() &&
                          runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        //这个函数可以自己实现,默认为空
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            //执行具体任务的业务逻辑
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                           //这个函数可以自己实现,默认为空
                            afterExecute(task, thrown);
                        }
                    } finally {
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                //执行线程销毁过程
                processWorkerExit(w, completedAbruptly);
            }
        }
    

    ​ 在getTask方法中, Worker线程会一直循环的从阻塞队列中获取任务,直到遇到以下情况会返回null,进而执行上面的线程销毁过程processWorkerExit:

    (1) 线程池状态为SHUTDOWN并且任务队列为空;

    (2) 线程数状态变大于SHUTDOWN (STOP TIDYING TERMINATED);

    (3) 线程池线程数大于最大线程数或者线程超时未获取任务的情况下,任务队列为空或者工作线程数大于1;

    这块逻辑具体代码如下:

    private Runnable getTask() {
            boolean timedOut = false; // Did the last poll() time out?
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 线程池状态为STOP或者(状态为SHUTDOWN&&任务队列为空),这个时候无需在执行任务
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
                //设置了允许核心线程超过keepAliveTime空闲后销毁线程 或者 线程数大于核心线程数
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                   //从阻塞队列中获取任务,如果进行超时控制,则调用poll方法,否则调用take一直阻塞到队列中有任务
                    Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    ​ 以上就是线程池中任务执行的大致过程,接下来我们对线程池结束及其中实现的一些细节进行分析。

    原文

    袁琼琼的技术博客,欢迎指针
    http://yuanqiongqiong.cn/2019/04/10/%E8%B0%88%E8%B0%88ThreadPoolExecutor%E7%9A%84%E5%AE%9E%E7%8E%B0/

    相关文章

      网友评论

          本文标题:谈谈ThreadPoolExecutor的实现

          本文链接:https://www.haomeiwen.com/subject/pnfelctx.html