美文网首页
线程池(一)ThreadPoolExecutor

线程池(一)ThreadPoolExecutor

作者: Tomy_Jx_Li | 来源:发表于2019-03-24 21:13 被阅读0次

    本文分析是基于jdk1.8的基础上。

    1.ThreadPoolExecutor是什么?

    ThreadPoolExecutor是一个线程池管理类,线程池操作都是操作这个类。

    2.什么是线程池?

    跟很多池化技术一样,线程池也是用来改善程序性能的。因为每个线程的创建和销毁都是有代价的。而一旦线程的具体执行周期,小于线程的创建和销毁周期。那么就会造成很多浪费。因为每次线程具体的执行时间才用了1s,但是创建和销毁却用了3s。那么我们的实际工作周期才是25%。浪费很多。那么我们就进行线程的池化,将线程的创建和销毁交给线程池管理,我们只需要提交任务即可。而线程池一般不会真正的进行线程的创建和销毁,只是让空闲线程进行等待即可。

    就如下图: 传统的线程执行路线
    线程池线程执行流程

    3.什么情况下使用线程池

    如果只是偶尔的线程执行,那么完全没必要创建线程池,增加额外的线程管理成本和多余线程空闲等待等问题。所以在需要频繁的创建和销毁线程的时候可以使用线程池。

    4.线程池的创建

    4.1线程池的创建

    在juc包下,有一个类Executors里有jdk提供的静态方法创建线程池。这里有很多文章,不进行过多赘述了。可以参考这里

    4.2创建的入参解析

    ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,
    long keepAliveTime,
    TimeUnit unit,
    BlockingQueue<Runnable> workQueue,
    ThreadFactory threadFactory,
    RejectedExecutionHandler handler)
    以最全的一个构造器进行说明。
    corePoolSize:核心线程数
    maximumPoolSize:最大线程数
    keepAliveTime:线程存活最大空闲时间(如果设置核心线程也可回收,同样对核心线程也有用)
    unit:keepAliveTime的时间单位
    workQueue:工作队列,当线程池达到核心线程数时,需要将新加入的任务放入队列。
    threadFactory:线程工厂,每次创建工作线程都是由线程工厂创建的
    handler:拒绝策略,当线程数达到最大线程数的时候,并且队列已满,那么就执行拒绝策略。

    4.3线程池数量的控制

    因为现在java的规范(国内的alibaba出的java规范)要求不能用jdk自带的静态方法创建线程池,需要自己选择。因为,
    1.jdk创建的固定线程池和单线程线程池都采用的是无界阻塞队列,可能任务过多导致内存溢出
    2.缓存线程池,由于可能会创建过多的线程而导致系统卡死
    3.其他两个一个为任务调度线程池,一个为forkJoin线程池。(暂时不说这两个,还需要在分析下)
    选择:一般计算密集行的就是cpu+1,IO密集型的会更大一些。这些都是需要在实际的环境中查看cpu的使用率来进行调优的,这里只是一个经验值而已。

    5.线程池的线程管理

    既然是线程池,那么就需要对线程进行管理。线程管理就包括线程的新增、销毁、线程异常的管理。

    5.1新增

    客户端调用execute或者submit方法。submit内部也是调用execute方法,所以这里主要查看下execute方法。jdk源码如下:

    public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            int c = ctl.get();
            // 判断下工作线程数是否超出核心线程数
            if (workerCountOf(c) < corePoolSize) {
                // 核心线程数未达到允许的最大值,创建核心线程并将任务作为第一个任务,让工作线程执行
                if (addWorker(command, true))
                    return;
                // 加入失败的话要重新获取ctl,因为这个时候可能线程池的状态会发生改变
                c = ctl.get();
            }
            // 判断线程池是否在运行状态,是的话需要将任务成功加入到队列中
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                // 这个时候还需要进行二次判断,因为可能线程状态变更了
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                // 这里处理的是什么情况呢,比如初始化线程池的时候,核心线程数为0
                // 同时设置队列是无界阻塞队列的话,就没有工作线程执行任务了
                // 所以这段代码的意思就是保证至少有一个工作线程
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            // 线程不在运行状态或者加入队列失败执行拒绝策略
            else if (!addWorker(command, false))
                reject(command);
    }
    

    通过注解我们可以看出,这里最主要的方法就是addworker方法
    再次分析addworker

    private boolean addWorker(Runnable firstTask, boolean core) {
            // 这一部分判断是否需要进行线程的创建
            retry:
            for (;;) {
                int c = ctl.get();
                int rs = runStateOf(c);
    
                // 判断当前线程池的状态,如果状态大于SHUTDOWN或者
                // 当前线程池的状态为SHUTDOWN,首任务为空,任务队列为空的时候不需要进行线程的创建
                if (rs >= SHUTDOWN &&
                        ! (rs == SHUTDOWN &&
                                firstTask == null &&
                                ! workQueue.isEmpty()))
                    return false;
    
                for (;;) {
                    // 获取工作线程数
                    int wc = workerCountOf(c);
                    // 如果工作线程数大于最大容忍线程数,这里的CAPACITY是2的29次方。也就是536870912
                    // 或者当创建的是核心线程的时候工作线程数大于核心线程,当创建的是其他线程的时候,不能大于最大线程数。
                    if (wc >= CAPACITY ||
                            wc >= (core ? corePoolSize : maximumPoolSize))
                        return false;
                    // 增加工作线程数,结束循环,这里可以看到如果多个线程同时向创建核心线程或者超出最大线程数的线程的话,
                    // 在这里也不会成功的,最接着走下去不会退出该循环
                    if (compareAndIncrementWorkerCount(c))
                        break retry;
                    // 如果线程池运行状态改变了,需要重新进行外部循环,否则进行内部循环
                    c = ctl.get();  // Re-read ctl
                    if (runStateOf(c) != rs)
                        continue retry;
                    // else CAS failed due to workerCount change; retry inner loop
                }
            }
    
            // 到达这里,也就是可以进行线程的创建了
    
            // 线程是否启动和线程是否已经添加到工作线程队列中了
            boolean workerStarted = false;
            boolean workerAdded = false;
            Worker w = null;
            try {
                // 创建工作线程
                w = new Worker(firstTask);
                final Thread t = w.thread;
                if (t != null) {
                    final ReentrantLock mainLock = this.mainLock;
                    mainLock.lock();
                    try {
                        // 需要在吃校验线程池的状态
                        int rs = runStateOf(ctl.get());
    
                        // 运行状态下,
                        // 或者是SHUTDOWN状态并且首个运行的任务为空
                        // 第一种情况好理解,第二种情况是因为上面也有个类似的校验,但是多一个条件! workQueue.isEmpty()
                        // 那么能到这里证明队列不为空,也就是需要创建线程的
                        if (rs < SHUTDOWN ||
                                (rs == SHUTDOWN && firstTask == null)) {
                            if (t.isAlive()) // precheck that t is startable
                                throw new IllegalThreadStateException();
                            // 将w加入到工作线程队列中去
                            workers.add(w);
                            int s = workers.size();
                            // 设置最大线程数
                            if (s > largestPoolSize)
                                largestPoolSize = s;
                            workerAdded = true;
                        }
                    } finally {
                        mainLock.unlock();
                    }
                    if (workerAdded) {
                        // 启动线程,这里启动的是worker下的属性thread,但是为什么能启动Worker的呢,看下一段代码
                        t.start();
                        workerStarted = true;
                    }
                }
            } finally {
                // 如果启动不成功,需要做的事情
                if (! workerStarted)
                    addWorkerFailed(w);
            }
            return workerStarted;
    }
    

    在代码中会出现t.start();这里明明启动的是worker下的thread属性,为何会启动worker本身呢。
    主要是在new Worker的时候做的事情。代码如下

    Worker(Runnable firstTask) {
            // 设置线程状态,知道线程启动之前,禁止中断
            setState(-1);
            this.firstTask = firstTask;
            // 这个就是thread属性的设置值。而这里在创建thread的时候
            // 传入了一个this参数,及worker本身。而一般的thread都是new一个Thread并将worker传入。
            // 而Thread的run方法,在运行是时候就会执行传入的这个Runnable实例的run方法了
            this.thread = getThreadFactory().newThread(this);
    }
    

    而上一步中如果worker启动失败的话,需要进行一个后处理的,处理的内容,就是从工作线程队列中移除当前worker,同时将工作线程数减一。

    5.2执行

    启动了工作者之后,就需要执行任务了。那么任务是如何执行的呢。通过查看worker的run方法,发现执行的是runWorker方法,分析如下:

    final void runWorker(Worker w) {
            Thread wt = Thread.currentThread();
            Runnable task = w.firstTask;
            w.firstTask = null;
            w.unlock(); // allow interrupts
            boolean completedAbruptly = true;
            try {
                // 获取执行的任务
                while (task != null || (task = getTask()) != null) {
                    w.lock();
                    // 刚开始看这段代码的时候,感觉只需要一个runStateAtLeast(ctl.get(), STOP)就可以了
                    // 没有仔细看注解,所以以后还是要好好看注解的
                    // 如果线程是已经处于stop状态,或者线程需要是终端状态同时线程是已经处于stop状态
                    // 第二个种情况是因为调用了方法shutdownNow的情况下需要做一个二次检查
                    if ((runStateAtLeast(ctl.get(), STOP) ||
                            (Thread.interrupted() &&
                                    runStateAtLeast(ctl.get(), STOP))) &&
                            !wt.isInterrupted())
                        wt.interrupt();
                    try {
                        // 这个方法是留给子类重写,在执行任务之前做一个处理
                        beforeExecute(wt, task);
                        Throwable thrown = null;
                        try {
                            // 这里只是调用了下任务的run方法,是一个简单的调用,并不是启动线程
                            task.run();
                        } catch (RuntimeException x) {
                            thrown = x; throw x;
                        } catch (Error x) {
                            thrown = x; throw x;
                        } catch (Throwable x) {
                            thrown = x; throw new Error(x);
                        } finally {
                            // 任务执行之后的处理,不论线程是否执行成功
                            afterExecute(task, thrown);
                        }
                    } finally {
                        // 执行成功后的处理
                        task = null;
                        w.completedTasks++;
                        w.unlock();
                    }
                }
                completedAbruptly = false;
            } finally {
                // 当工作线程停止运行之后的处理,会区分是异常退出还是正常退出的
                processWorkerExit(w, completedAbruptly);
            }
    }
    

    再看下getTask方法干了啥。

    private Runnable getTask() {
            // 标记是否已经获取的任务是否为空
            boolean timedOut = false;
    
            for (;;) {
                int c = ctl.get();
                // 获取运行状态
                int rs = runStateOf(c);
    
                // 线程池已经被暂停了,并且线程池处于STOP状态或者队列为空的时候,
                // 返回null值,结束worker的run方法,并且这里进行工作线程数的减一
                if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                    decrementWorkerCount();
                    return null;
                }
    
                int wc = workerCountOf(c);
    
                // 是否可以删除worker
                boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
    
                // 这里也是判断是否可以删除当前工作的线程
                if ((wc > maximumPoolSize || (timed && timedOut))
                        && (wc > 1 || workQueue.isEmpty())) {
                    if (compareAndDecrementWorkerCount(c))
                        return null;
                    continue;
                }
    
                try {
                    // 获取任务,指定超时时间获取或者是阻塞获取。
                    // 这里使用keepAliveTime进行获取,如果在超时时间之内没有获取到,
                    // 就会将timedOut设置为true,然后根据当前工作线程是否为核心线程,
                    // 同时查看下核心线程是否可以超时剔除而进行工作线程的清理工作
                    Runnable r = timed ?
                            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                            workQueue.take();
                    if (r != null)
                        return r;
                    timedOut = true;
                } catch (InterruptedException retry) {
                    timedOut = false;
                }
            }
    }
    

    5.3销毁

    工作线程的销毁,其实已经在上面的方法中体现了。runWorker方法和getTask方法配合进行worker是否需要进行回收的判断。
    1.worker一直会获取task,知道task为空就会推出循环了。所以worker是否会退出的控制在task中呢。
    2.task在什么情况下回返回null呢。
    其一:当线程池的状态大于SHUTDOWN,并且线程已经到了STOP状态或者任务队列为空的时候。
    其二:当线程池的工作线程数大于最大线程数量,或者已经获取过一次任务,并且获取为空,同时可删除当前工作线程。当上面的条件满足之后,还需要满足条件工作线程数大于1,或者队列为空了,体现在代码就是(wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())这个判断。
    还有工作线程销毁之前需要处理的事情:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
            // 如果是异常退出,
            if (completedAbruptly)
                decrementWorkerCount();
    
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 将最终的任务完成数进行叠加
                completedTaskCount += w.completedTasks;
                workers.remove(w);
            } finally {
                mainLock.unlock();
            }
    
            tryTerminate();
    
            int c = ctl.get();
            // 判断当前状态是否是小于STOP状态的,也就是RUNNING和STUTDOWN状态
            if (runStateLessThan(c, STOP)) {
                // 如果是正产完成的
                if (!completedAbruptly) {
                    int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                    // 这种情况是当不存在工作线程但是呢,任务队列不为空的时候,就需要设置最小线程数为1
                    if (min == 0 && ! workQueue.isEmpty())
                        min = 1;
                    // 如果工作线程大于线程池当前可执行的最小线程数min就退出,不需要在进行工作线程的创建了
                    if (workerCountOf(c) >= min)
                        return; 
                }
                // 如果是非正常退出,或者最小工作线程数不够,需要重新创建线程的
                addWorker(null, false);
            }
    }
    

    还有线程池的销毁工作如何进行呢,这里主要是使用shutdown和shutdownNow两个方法。shutdown方法只是不接受新的任务,同时将原有队列中的任务执行完成。只有在线程方法响应中断才能结束线程。这里的响应中断的意思是线程中需要对Thread.interrupted()和Thread.currentThread().isInterrupted()的判断结果进行响应线程退出,否则就算调用shutdown应用程序也不会退出。而shutdownNow相比shutdown方法,多了的只是对队列中的线程的处理。

    public void shutdown() {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 校验是否有暂停权限
                checkShutdownAccess();
                // 将线程池的状态变为SHUTDOWN
                advanceRunState(SHUTDOWN);
                // 这里就是吧所有的工作线程调用了interrupt方法。
                interruptIdleWorkers();
                // 需要子类重写的一个方法,在进行shutDown工作最后需要做的动作
                onShutdown(); // hook for ScheduledThreadPoolExecutor
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
    }
    public List<Runnable> shutdownNow() {
            List<Runnable> tasks;
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 同样校验权限
                checkShutdownAccess();
                // 将线程池的状态直接设置为STOP
                advanceRunState(STOP);
                // 同样终端所有的工作线程
                interruptWorkers();
                // 将任务队列中的任务导出给用户
                tasks = drainQueue();
            } finally {
                mainLock.unlock();
            }
            tryTerminate();
            return tasks;
    }
    

    5.4异常

    线程池的工作线程异常退出之后,由于工作线程可能不够了,所以需要重新创建一些线程,以替补线程池的线程。注意线程本身是没有主工作线程和非主之分的,只是根据核心线程数控制核心线程,所以可能任何线程都有可能成为主工作线程。

    总结

    线程池的工作原理如下:
    1、如果当前运行的线程,少于corePoolSize,则创建一个新的线程来执行任务。
    2、如果运行的线程等于或多于 corePoolSize,将任务加入 BlockingQueue。
    3、如果加入 BlockingQueue 成功,需要二次检查线程池的状态如果线程池没有处于 Running,则从 BlockingQueue 移除任务,启动拒绝策略。
    4、如果线程池处于 Running状态,则检查工作线程(worker)是否为0。如果为0,则创建新的线程来处理任务。如果启动线程数大于maximumPoolSize,任务将被拒绝策略拒绝。
    5、如果加入 BlockingQueue 。失败,则创建新的线程来处理任务。
    6、如果启动线程数大于maximumPoolSize,任务将被拒绝策略拒绝。
    具体流程图可参考如下图:


    image.png

    相关文章

      网友评论

          本文标题:线程池(一)ThreadPoolExecutor

          本文链接:https://www.haomeiwen.com/subject/iuqlvqtx.html