美文网首页
源码修炼笔记之线程池ThreadPoolExecutor详解

源码修炼笔记之线程池ThreadPoolExecutor详解

作者: 花醉霜寒 | 来源:发表于2020-04-06 17:09 被阅读0次

线程池

\color{green}{为什么需要线程池}

多线程编程过程中,用new Thread()的方式需要频繁的创建和销毁线程,线程的创建存在系统调用clone(),线程的创建和销毁会带来一定的系统开销,如果频繁的创建和销毁线程会造成性能问题。

\color{green}{线程池的作用}

线程池顾名思义就是装有线程的池子,线程池的作用包括提供提供线程资源和线程资源的管理。

线程池任务处理流程

\color{green}{ThreadPoolExecutor}

ThreadPoolExecutor是Java中线程池的核心类,首先了解一下ThreadPoolExecutor的构造函数

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • corePoolSize: 核心线程数大小
  • maximumPoolSize: 最大线程数
  • keepAliveTime: 非核心线程最大空闲时间
  • TimeUnit unit: 时间单位,默认秒
  • BlockingQueue<Runnable> workQueue: 存储任务的阻塞队列
  • ThreadFactory threadFactory: 线程工厂,默认实用默认线程工厂,可以自定义
  • RejectedExecutionHandler handler: 拒绝策略,当线程池达到最大任务处理数时,对于新来的任务的处理策略

\color{green}{线程池基本用法}

ThreadPoolExecutor executor = new ThreadPoolExecutor(4,8,30, TimeUnit.SECONDS, new 
                ArrayBlockingQueue<>(100));
executor.execute(() -> System.out.println("do something!"));

定义了一个核心线程数为4,最大线程数为8,线程最大空闲时间为30秒,阻塞队列选择ArrayBlockingQueue容量设置为100,没有指定线程工厂和拒绝策略,会采用默认的线程工厂(Executors类提供的DefaultThreadFactory)和默认的拒绝策略抛出异常。线程池创建完毕之后通过execute方法将任务交给线程池处理。

\color{green}{线程池工作流程}

上面提到通过execute方法将任务交给线程池处理,我们通过execute方法来了解线程池的工作流程,execute方法的源码如下所示:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //获取当前线程数
        int c = ctl.get();
        //若当前线程数小于核心线程数,则创建新的线程来处理任务
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //若当前线程数大于核心线程数,则讲任务放入指定的阻塞队列中
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果队列塞满之后,开启非核心线程处理新任务
        else if (!addWorker(command, false))
            reject(command);
    }

线程池的工作流程:

  • 新的任务进来首先比较当前线程数与核心线程数的大小,如果小于核心线程数则创建线程 处理任务;
  • 如果当前线程数不小于核心线程数,则将任务放入阻塞队列中排队;
  • 如果阻塞队列中的任务达到最大任务个数,则创建非核心线程来处理任务;
  • 如果达到最大线程数,则通过拒绝策略来拒绝任务。

线程池源码解析

\color{green}{核心源码解析}

前面提到线程池的作用是提供线程资源以及线程的管理,那么ThreadPoolExecutor是如何来提供线程资源和管理这些线程的呢?
答案就是ThreadPoolExecutor的一个重要的内部类Worker类,Worker类继承了AQS实现了Runnable接口,它有三个成员变量:

//用于执行任务的线程,通过ThreadFactory创建
final Thread thread;
//初始任务
Runnable firstTask;
//已完成的任务个数
volatile long completedTasks;
//worker构造函数
Worker(Runnable firstTask) {
    setState(-1); 
    this.firstTask = firstTask;
    //通过线程工厂获取线程
    this.thread = getThreadFactory().newThread(this);
}

run方法实际是调用ThreadPoolExecutor类的runWorker方法,ThreadPoolExecutor通过HashSet<Worker>来维护这些worker,其实线程池中的线程就是worker中的thread,有多少个worker就意味着线程池有多少个线程。线程池通过addWorker方法添加worker,核心代码如下所示,其大致流程是先创建worker,worker包含一个线程,然后将新建的worker交给ThreadPoolExecutor的workers,然后启动线程。

        try {
            //创建worker,这里相当于新开了一个线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //将创建的线程交给ThreadPoolExecutor
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //线程启动
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }

Worker类实现了Runnable接口,run方法中调用ThreadPoolExecutor类的runWorker方法,源码如下所示:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果firstTask为空,则通过getTask()获取任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //执行任务
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

逻辑比较简单,在while循环中不断获取任务并执行,注意两个方法:beforeExecute和afterExecute,这两个方法在ThreadPoolExecutor中没有实现,是两个空方法,可用通过继承ThreadPoolExecutor类来重写这两个方法来实现一些任务处理之前和任务处理之后的业务逻辑,当无法获取任务时,跳出while循环,最终执行processWorkerExit方法,这是个比较重要的方法,线程池中线程的释放就是通过这个方法完成的,其源码如下所示:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

其处理的大致流程是,首先尝试获取mainLock,获取锁之后从workers中将当前的worker移除,此时相当于线程池中的线程数减少1个,然后判断allowCoreThreadTimeOut,该参数表示是否允许核心线程超过最大空闲时间被销毁,从代码中可以看出不管该参数的值是true还是false,线程池中线程销毁之后,至少都存在一个线程。

\color{green}{线程池的拒绝策略}

当线程数和队列中的任务数都到达设置的最大值的时候,线程池会对新的任务执行拒绝策略,ThreadPoolExecutor提供了四种拒绝策略可供我们选择:

  • CallerRunsPolicy: 将任务交给主线程来执行;
  • AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常;
  • DiscardPolicy: 默默的丢弃任务不做任何处理;
  • DiscardOldestPolicy: 丢弃尚未执行的最老的任务,执行新的任务。
    如果这些拒绝策略无法满足平时的业务需求,我们可以实现RejectedExecutionHandler接口,按照具体需求实现自己的拒绝策略。

\color{green}{线程池中的线程安全问题}
ThreadPoolExecutor中存在很多成员变量,例如核心线程数,最大线程数以及存放Worker的workers等,多线程情况如何保证这些共享变量操作的线程安全性,线程池采用了如下三种方法:

  • ThreadPoolExecutor类中的mainLock,对于mainLock锁的竞争主要来自与worker的添加和worker的删除,即线程的创建和销毁,向workers中添加新的worker或者向队列中添加新的任务的时候会持有该锁,道理很简单,就是当有新的任务来的时候,显然和删除线程的操作是互斥的,此时线程不能被销毁,如果没有这把锁的保护,线程池在某些情况下会出现频繁的创建和销毁线程的情况;
  • volatile: ThreadPoolExecutor中的很多参数都是用volatile来修饰的,volatile是用来保证共享变量的可见性的,ThreadPoolExecutor提供了对于核心参数的set方法,就是说在线程池工作的过程中可以改变线程池的核心参数,这些核心参数的修改如何保证可见性,就要通过volatile了;
  • Worker类继承了AQS,通过继承AQS来保证worker相关操作的线程安全性。

源码修炼系列
源码修炼笔记之线程池ThreadPoolExecutor详解
源码修炼笔记之HashMap源码解析

相关文章

网友评论

      本文标题:源码修炼笔记之线程池ThreadPoolExecutor详解

      本文链接:https://www.haomeiwen.com/subject/vuhbphtx.html