美文网首页
jdk提供的线程池ThreadPoolExecutor详解

jdk提供的线程池ThreadPoolExecutor详解

作者: remax1 | 来源:发表于2020-05-21 14:07 被阅读0次

    前言

    线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。为解决资源分配问题,线程池采用了“池化思想”,即将所有线程统一管理。

    构造方法

     public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler)
    

    · corePoolSize:线程池中核心线程的个数。
    · maximumPoolSize: 线程池中允许创建的最大的线程个数。
    · keepAliveTime:保活时间。
    · workQueue:阻塞队列,用来储存任务。
    · RejectedExecutionHandler :拒绝策略,jdk提供四种,可以自定义拒绝策略。

    任务调度

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
           
            int c = ctl.get();
            //注释①
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))//启动新线程
                    return;
                c = ctl.get();
            }
        //注释② 此时不满足小于corePoolSize条件
         if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)//注释③
                    addWorker(null, false);//false代表的是与maximumPoolSize比较,true则与corePoolSize
            }
          //注释④ 此时,workQueue.offer(command)返回值为false
            else if (!addWorker(command, false)) 
                reject(command);//拒绝策略
        }
    
    

    ·注释①:如果workerCount(runable个数) < corePoolSize,则创建并启动一个线程来执行新提交的任务。
    注释②:跳出if循环,此时corePoolSize>=workerCount ,且阻塞队列元素不满,添加任务到该阻塞队列中。
    注释③:corePoolSize>=workerCount ,且阻塞队列元素满了,且corePoolSize<maximumPoolSize,新建线程并启动。
    注释④:阻塞队列满了,并且workerCount > maximumPoolSize,此时根据拒绝策略来处理该任务,默认的是AbortPolicy(),直接抛异常。

    任务缓冲

    在上面execute()方法中,通过BlockQueue来完成任务的缓冲,这里的任务实际是个runable接口的对象。线程池中是以生产者消费者模式,通过阻塞队列来完成的。生产者则是往队列里添加元素的线程,消费者是从队列里拿元素的线程,即线程池中的线程。这与阻塞队列的特性有关,在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。

    任务申请

    在上面的excute()方法中,我们可以得知,任务执行有两种可能:
    1.由新创建的线程来执行,即addWork(Runable,true);
    2.线程从任务队列中获取任务来执行,即addWorker(null, false),占大多数情况;
    接下来去看看源码,任务申请主要通过getTask()来完成对runable的取出。

     private Runnable getTask() {
            boolean timedOut = false; //最后一个任务是否超时
    
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 判断是否运行状态或者阻塞队列为空
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // 判断是否所有的工作线程被回收
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
                
                if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    Runnable r = timed ?
                      //开始取出任务并返回
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
        }
    

    任务拒绝

    再看excute()中, reject(command)则是我们的拒绝策略,,当线程池的任务缓存队列已满,并且线程池中的线程数目达到maximumPoolSize时,就需要拒绝掉该任务,采取任务拒绝策略,保护线程池。
    拒绝策略实质上是一个接口:

    public interface RejectedExecutionHandler {
        void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
    }
    

    可以通过实现这个接口来自定义拒绝策略,也可以通过JDK提供的四种策略:
    1.ThreadPoolExecutor.AbortPolicy():丢弃任务并抛出java.util.concurrent.RejectedExecutionException异常。
    2.ThreadPoolExecutor.CallerRunsPolicy:由提交任务的线程取处理。这种情况是需要让所有任务都执行完毕。
    3.ThreadPoolExecutor.DiscardPolicy:丢弃任务,不报出异常。
    4.ThreadPoolExecutor.DiscardOldPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务。

    Worker线程管理

    先看看Worker类:

    private final class Worker
            extends AbstractQueuedSynchronizer
            implements Runnable
        {
            /**
             * This class will never be serialized, but we provide a
             * serialVersionUID to suppress a javac warning.
             */
            private static final long serialVersionUID = 6138294804551838833L;
    
            /** Thread this worker is running in.  Null if factory fails. */
            final Thread thread;
            /*初始化任务,可以为空 */
            Runnable firstTask;
            /** Per-thread task counter */
            volatile long completedTasks;
    
            /**
             * 注释①
             */
            Worker(Runnable firstTask) {
                setState(-1); // inhibit interrupts until runWorker
                this.firstTask = firstTask;
                this.thread = getThreadFactory().newThread(this);
            }
    
           
            public void run() {
                runWorker(this);
            }
    
            // Lock methods
            //注释②
            // The value 0 represents the unlocked state.
            // The value 1 represents the locked state.
    
            protected boolean isHeldExclusively() {
                return getState() != 0;
            }
    
            protected boolean tryAcquire(int unused) {
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }
    
            protected boolean tryRelease(int unused) {
                setExclusiveOwnerThread(null);
                setState(0);
                return true;
            }
    
            public void lock()        { acquire(1); }
            public boolean tryLock()  { return tryAcquire(1); }
            public void unlock()      { release(1); }
            public boolean isLocked() { return isHeldExclusively(); }
    
            void interruptIfStarted() {
                Thread t;
                if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    }
                }
            }
        }
    

    注释①:Worker这个工作线程,实现了runable接口,并持有一个线程thread,一个初始化任务
    firstTask,即第一个任务,可以为空。如果不为空,线程启动时则会执行这个任务,应对线程池初期的情况。
    注释②:Worker通过继承AQS,使用AQS来实现独占锁的功能。AQS不可重入。
    1.lock()方法一旦获取了锁,表示当前线程正在执行任务。
    2.如果正在执行任务,则不应该中断线程。
    3.如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断。

    线程增加

    再前面的excute()方法中出现的addWorker()则是对线程的增加。去看看里面做了啥事

     private boolean addWorker(Runnable firstTask, boolean core) {
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 一些判断
                if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                       firstTask == null &&
                       ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    int wc = workerCountOf(c);
                    if (wc >= CAPACITY ||
                      //注释①
                        wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    //注释②
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // Recheck while holding lock.
                        // Back out on ThreadFactory failure or if
                        // shut down before lock acquired.
                        int rs = runStateOf(ctl.get());
    
                        if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            workers.add(w);
                            int s = workers.size();
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
        }
    

    注释①:core参数为true表示在新增线程时会判断当前活动线程数是否少于corePoolSize,false表示新增线程前需要判断当前活动线程数是否少于maximumPoolSize。
    注释②:设置可重入锁,完成worker线程的增加,添加成功后会启动这个工作线程。

    Worker线程回收

     private void processWorkerExit(Worker w, boolean completedAbruptly) {
            if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
              //记录一下完成任务的个数
                completedTaskCount += w.completedTasks;
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
    }
    

    线程池中线程的销毁依赖JVM自动的回收,线程池自己决定那些线程需要回收,消除一下引用即可。

    Worker线程执行任务

    在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:
    1.while()循环不断取出runable任务,通过getTask()取出。
    2.执行任务
    3.如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。

    实际应用

    在应用场景中主要分为IO密集型和CPU密集型。最主要还是设计核心线程个数和最大线程个数
    的值。

    相关文章

      网友评论

          本文标题:jdk提供的线程池ThreadPoolExecutor详解

          本文链接:https://www.haomeiwen.com/subject/ztnaohtx.html