美文网首页Java Concurrency
JAVA并发(13)— ThreadPoolExecutor的实

JAVA并发(13)— ThreadPoolExecutor的实

作者: 小胖学编程 | 来源:发表于2020-02-27 12:02 被阅读0次

    测试代码

    /**
     * @program: springbootclient2
     * @description: 自定义线程池,扩展任务
     * @create: 2020-02-27 10:46
     */
    @Slf4j
    public class MyThreadPoolExecutor extends ThreadPoolExecutor {
    
        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
    
        //装饰线程池中任务
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            //打印线程信息(当前线程是什么?)
            log.info("执行前的方法..." + Thread.currentThread());
        }
    
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            log.info("执行后的方法..." + Thread.currentThread());
        }
    }
    
    /**
     * @program: springbootclient2
     * @description: 探索Java线程池的实现原理
     * @create: 2020-02-26 10:33
     */
    @Slf4j
    public class TestThreadPoolExecutor {
    
        public static void main(String[] args) {
            //创建了一个线程池
            ThreadPoolExecutor pool =
                    new MyThreadPoolExecutor(1, 3,
                            200, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
            //打印日志(核心线程执行)
            pool.execute(() -> {
                try {
                    Thread.sleep(1000 * 200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                log.info("日志打印...哈哈哈");
            });
            //再次打印日志(阻塞队列执行)
            pool.execute(() -> {
                log.info("日志再打印...");
            });
            //再次打印日志(最大线程执行)
            pool.execute(() -> {
                log.info("日志再打印...");
            });
        }
    }
    

    实现原理

    用户向线程池提交一个任务(实现Runnable接口)后

    1. 若小于核心线程数,那么直接开启一个线程执行;
    2. 若大于核心线程数,则将任务放入阻塞队列中;
    3. 若阻塞队列满了,则会使用最大线程数,继续开启线程执行任务;
    4. 若最大线程数满了,那么采取拒绝策略去拒绝任务;

    而执行任务的流程:创建Worker线程,将传入的任务交给Worker线程去执行,而Worker线程是ThreadFactory产生的,若传入的任务为null,则会通过getTask()去workQueue中获取任务去执行。

    源码中的细节:

    • 维护了一个AtomicInteger的ctl,前3位标识线程池的状态,后29位表示线程池中线程的数量。使用CAS来修改参数;
    • Worker线程使用装饰器的模式增强传入的任务,并预留两个钩子方法扩展。
    • 使用HashSet来保存Worker线程;
    • 核心线程数存活时间的实现:使用CAS+自旋,首先去队列中获取元素,若超时后,修改标识位。再次循环,若线程池中存在线程且任务队列中没有任务,那么通过CAS修改ctl中当前线程的数量,跳出方法去销毁线程。

    源码分析

    ThreadPoolExecutor关键属性

    //存放当前运行的worker数量以及线程池状态
    //int是32位的,这里把int的高3位拿来充当线程池状态标识位,后29位拿来充当当前运行的worker数量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //存放任务的阻塞队列
    private final BlockingQueue<Runnable> workQueue;
    //存放工作线程(worker)的集合,用set来存放
    private final HashSet<Worker> workers = new HashSet<Worker>();
    //历史达到的worker数最大值
    private int largestPoolSize;
    //当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
    private volatile RejectedExecutionHandler handler;
    //超出coreSize的worker的生存时间
    private volatile long keepAliveTime;
    //核心线程数量
    private volatile int corePoolSize;
    //最大线程的数量,一般当workQueue满了才会用到这个参数
    private volatile int maximumPoolSize;
    
    public void execute(Runnable command) {  
        if (command == null)  
            throw new NullPointerException();  
        int c = ctl.get();  
       //workerCountOf(c)会获取当前正在运行的worker数量。
        if (workerCountOf(c) < corePoolSize) {  
          //如果小于核心线程数,创建一个worker然后直接执行该任务。
            if (addWorker(command, true))  
                return;  
            c = ctl.get();  
        }  
        //当核心线程数满的时候,会执行该判断,判断线程池的状态为运行态时
        //会将任务放入到queue中(若queue满了,会返回false)
        if (isRunning(c) && workQueue.offer(command)) {  
            int recheck = ctl.get();  
            if (! isRunning(recheck) && remove(command))  
                reject(command);  
            else if (workerCountOf(recheck) == 0)  
                addWorker(null, false);  
        }  
        //使用最大线程去执行,若执行失败,返回false,将执行拒绝策略
        else if (!addWorker(command, false))  
            reject(command);  
    }  
    

    根据传入的任务,创建worker工作线程,并运行:

    //firstTask是传入线程池的任务,core是使用核心线程去执行,还是最大线程去执行
    private boolean addWorker(Runnable firstTask, boolean core) {  
        retry:  
        for (;;) {  
            //先获取线程池的状态
            int c = ctl.get();  
            //rs即线程池的状态(ctl的前3位表示)
            int rs = runStateOf(c);  
      
            // 如果线程池是关闭的(SHUTDOWN=0),或者workQueue队列非空,直接返回false,跳出方法
            if (rs >= SHUTDOWN &&  
                ! (rs == SHUTDOWN &&  
                   firstTask == null &&  
                   ! workQueue.isEmpty()))  
                return false;  
      
            for (;;) {  
                //获取worker的数量
                int wc = workerCountOf(c);
               //第一个判断是:wc>=536870911  
               //根据入参core,判断当前线程与核心线程数/最大线程数去比较。
                if (wc >= CAPACITY ||  
                    wc >= (core ? corePoolSize : maximumPoolSize))  
                    return false; 
               // 尝试修改ctl的workerCount的值(+1),这里使用的是CAS,如果失败,继续下一次重试,直到获取成功为止。
                if (compareAndIncrementWorkerCount(c))  
                  //如果设置成功就跳出外层的for循环
                    break retry;  
               //重读一次ctl,判断如果线程池的状态改变,会重新循环一次。
                c = ctl.get();  // Re-read ctl  
                if (runStateOf(c) != rs)  
                    continue retry;  
            }  
        }  
        //执行到此处时,线程的数量+1,但实际上未开启新线程,下面是创建新worker线程。
        boolean workerStarted = false;  
        boolean workerAdded = false;  
        Worker w = null;  
        try {  
            //创建一个worker,将提交上来的任务直接交给worker。
            w = new Worker(firstTask); 
            //获取Worker线程中的thread对象。 
            final Thread t = w.thread;  
            if (t != null) {  
                final ReentrantLock mainLock = this.mainLock;  
                //新增Worker线程并存入HashSet中(加锁,该动作是串行画的)
                mainLock.lock();  
                try {  
                   //获取线程池的状态
                    int rs = runStateOf(ctl.get());  
                   //线程池没有中断或者线程池已经中断,但是线程还持有任务均往下执行(否则该方法不会做操作)。
                    if (rs < SHUTDOWN ||  
                        (rs == SHUTDOWN && firstTask == null)) {  
                        if (t.isAlive()) //如果worker的线程线程已经启动,抛异常
                            throw new IllegalThreadStateException();  
                       //添加新建的worker到HashSet<Worker>中
                        workers.add(w);  
                        int s = workers.size();  
                        //更新历史worker数量的最大值(和Worker Set容量进行对比)
                        if (s > largestPoolSize)  
                            largestPoolSize = s;  
                        //设置新增标志位
                        workerAdded = true;  
                    }  
                } finally {  
                    mainLock.unlock();  
                }  
               //如果worker是新增的,就启动该线程。
                if (workerAdded) {  
                    t.start();  
                    //成功启动线程,设置对应的标志位
                    workerStarted = true;  
                }  
            }  
        } finally {  
            //启动失败,就会触发相应的方法。
            if (! workerStarted)  
                addWorkerFailed(w);  
        }  
        return workerStarted;  
    }  
    

    2. Worker结构

    Worker是ThreadPoolExecutor内部定义的一个内部类

    //实现了Runnable接口,所以可以作为线程使用
    private final class Worker  
        extends AbstractQueuedSynchronizer  
        implements Runnable  
    {  
    
        private static final long serialVersionUID = 6138294804551838833L;  
       //运行的线程,前面addWorker方法中就是直接通过启动这个线程来启动这个worker。
        final Thread thread;  
        //当一个worker刚创建时,就会尝试执行这个任务
        Runnable firstTask;  
        //记录完成任务的数量
        volatile long completedTasks;  
      
        Worker(Runnable firstTask) {  
            setState(-1); // inhibit interrupts until runWorker  
            this.firstTask = firstTask;  
             //创建一个Thread,将自己设置给他,后面这个thread启动的时候,也就是执行这个worker。
            this.thread = getThreadFactory().newThread(this);  
        }  
        //使用装饰器的模式去扩展了run()方法。
        public void run() {  
           //调用了ThreadPoolExecutor的runWorker方法;
            runWorker(this);  
        }  
      ...
    }  
    

    当worker工作线程执行run方法时,实际上会执行该方法:
    worker会判读自己是否持有任务,若未持有任务,会通过getTask()方法去workQueue中获取任务(Runnable任务)

    final void runWorker(Worker w) {  
        //获取到当前线程
        Thread wt = Thread.currentThread();  
        //获取到worker中的任务
        Runnable task = w.firstTask;  
        w.firstTask = null;
        //执行unlock方法,允许其他线程来中断自己  
        w.unlock(); // allow interrupts  
        boolean completedAbruptly = true;  
        try {  
            //如果前面firstTask有值,那么直接执行这个任务;
            //如果没有具体的任务,就执行getTask()方法从队列中获取任务;
            while (task != null || (task = getTask()) != null) {  
            //执行任务前先锁住,这里的作用就是给shutdown()判断是否有worker在执行中。
            //shutdown方法会尝试给线程加锁,如果线程在执行,就不会中断它。
                w.lock();  
               //详见下面分析
                if ((runStateAtLeast(ctl.get(), STOP) ||  
                     (Thread.interrupted() &&  
                      runStateAtLeast(ctl.get(), STOP))) &&  
                    !wt.isInterrupted())  
                    wt.interrupt();  
                try {  
                   //执行任务前被调用,Spring预留的方法,可扩展
                    beforeExecute(wt, task);  
                    Throwable thrown = null;  
                    try {  
                        //正常调用run()方法。
                        task.run();  
                    } catch (RuntimeException x) {  
                        thrown = x; throw x;  
                    } catch (Error x) {  
                        thrown = x; throw x;  
                    } catch (Throwable x) {  
                        thrown = x; throw new Error(x);  
                    } finally {  
                        //执行任务后调用,预留的方法,可扩展
                        afterExecute(task, thrown);  
                    }  
                } finally {  
                    task = null;  
                    //记录完成的任务数量
                    w.completedTasks++;  
                    w.unlock();  
                }  
            }  
            completedAbruptly = false;  
        } finally {  
            processWorkerExit(w, completedAbruptly);  
        }  
    }  
    

    线程的中断:

    • stop方法:是一个过时的方法,不应该在使用这种方法去中断线程;
    • isInterrupted方法:是Thread类的普通方法,会返回调用方法的线程类状态。
    • interrupted方法:是Thread类的静态方法,返回的是调用方法的线程的状态,interrupted方法会清除线程的中断状态。
    • interrupt方法:用来中断线程,也就是将中断标志设置为true的方法;

    所谓的中断,只是将中断标识设置了下,并没有真正的中断线程的运行,一般我们需要自己检查线程的中断状态,并设计如何中断。

    方法sleep()wait以及join会对中断标识有所处理,当线程中断标识为true时,会抛出异常。

    关闭线程池的方法:

    • shutdown方法:告诉线程池拒绝接受新的任务,但是已经开始执行的以及进入队列中的任务将会完成执行;
    • shutdownNow方法:也是告诉线程池拒绝新的任务,但是它会试图将已经开始执行的任务以及队列中的任务取消。这种取消是通过中断线程来实现的。也就是说我们的任务中没有针对线程中断做处理的情况下,在实际的使用体验上,shutdownNow与shutdown效果是相同的。

    当调用shutdownNow()方法时,线程池会变为stop状态。

    1. runStateAtLeast(ctl.get(), STOP)为true(即线程池被shutdownNow),那么如果线程没有中断,确保线程被中断

    2. runStateAtLeast(ctl.get(), STOP)为false(线程池没有被shutdownNow)。在线程没有被中断的情况下,不去中断线程。当然这种情况需要重新检查shutdownNow的风险,当清理中断线程时。

    代码分析:

    if ((runStateAtLeast(ctl.get(), STOP) ||
        (Thread.interrupted() &&
         runStateAtLeast(ctl.get(), STOP))) &&
        !wt.isInterrupted())
         //中断worker线程。
        wt.interrupt();
    

    在阻塞队列中获取任务

    在上面的java.util.concurrent.ThreadPoolExecutor#runWorker方法中task的获取是Worker中的firstTask属性或者getTask()方法来完成获取的。

    private Runnable getTask() {  
        boolean timedOut = false; // Did the last poll() time out?  
      
        for (;;) {  
            int c = ctl.get();  
           //获取到线程池的状态
            int rs = runStateOf(c);  
      
            //如果返回null,那么上面runWorker()方法会跳出while循环,然后执行销毁worker线程。
            //SHUTDOWN:表示执行了shutdown()方法;
            //STOP:表示执行了shutdownNow()方法;
            //如果执行了shutdown方法且workQueue为空,那么ctl线程数量-1;
            //如果执行了shutdownNow方法,那么ctl线程数量也-1;
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {  
                decrementWorkerCount();  
                return null;  
            }  
            //获取当前正在运行中的worker数量
            int wc = workerCountOf(c);  
      
            // 是否允许核心线程超时或者当前运行的线程数超过了核心线程数。
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  
            //timeOut默认false。若配置keepTimeOut并未获取到任务时,会置为true。此时,若存在工作线程,且队列为null,那么就销毁该线程。
            if ((wc > maximumPoolSize || (timed && timedOut))  
                && (wc > 1 || workQueue.isEmpty())) {  
                //通过CAS来设置workerCount,如果存在多个线程竞争,只有一个可以设置成功。
               //如果没有设置后才能给,就进入下一次循环。
                if (compareAndDecrementWorkerCount(c))  
                    return null;  
                continue;  
            }  
      
            try {  
                //在workQueue中取任务,poll方法存在等待的超时时间keepAliveTime,但是在规定时间内没有在阻塞队列中获取任务,那么timedOut会被置为true。
               //take()方法会一直等待。
                Runnable r = timed ?  
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :  
                    workQueue.take();  
                if (r != null)  
                    return r;  
                //如果r为null,就设置timedOut为true(注意,方法并未跳出,开始自旋);
                timedOut = true;  
            } catch (InterruptedException retry) {  
                timedOut = false;  
            }  
        }  
    }  
    

    推荐阅读

    Java线程池实现原理详解

    历史文章

    JAVA并发(1)—java对象布局
    JAVA并发(2)—PV机制与monitor(管程)机制
    JAVA并发(3)—线程运行时发生GC,会回收ThreadLocal弱引用的key吗?
    JAVA并发(4)— ThreadLocal源码角度分析是否真正能造成内存溢出!
    JAVA并发(5)— 多线程顺序的打印出A,B,C(线程间的协作)
    JAVA并发(6)— AQS源码解析(独占锁-加锁过程)
    JAVA并发(7)—AQS源码解析(独占锁-解锁过程)
    JAVA并发(8)—AQS公平锁为什么会比非公平锁效率低(源码分析)
    JAVA并发(9)— 共享锁的获取与释放
    JAVA并发(10)—interrupt唤醒挂起线程
    JAVA并发(11)—AQS源码Condition阻塞和唤醒
    JAVA并发(12)— Lock实现生产者消费者

    相关文章

      网友评论

        本文标题:JAVA并发(13)— ThreadPoolExecutor的实

        本文链接:https://www.haomeiwen.com/subject/ehvdoctx.html