美文网首页技术杂谈
Java线程池分析

Java线程池分析

作者: 香芋牛奶面包 | 来源:发表于2019-02-16 16:27 被阅读1次

引言

在并发编程中,我们经常会使用到线程池,当然我们也可以手动一个一个创建线程,那么为何我们还是推崇大家使用线程池进行并发编程呢?借用《Java并发编程的艺术》提到的来说一下使用线程池的优点有3个

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

ThreadPoolExecutor类

在日常使用中,大多数情况我们都会使用JDK提供的Executors去创建线程池,Executors利用工厂模式向我们提供了4种线程池实现方式,但是最新的阿里Java开发手册中明确说明了不建议使用Executors去创建线程池,原因是使用Executors创建线程池会使用很多默认值,默认使用的参数有时候是不合理,但是开发者往往会忽略。所以我们应该尽量使用ThreadPoolExecutor类来显示的创建线程池。

下面我们先来看下ThreadPoolExecutor类的继承结构

image.png
  • Executor接口只含有一个execute(Runnable command)方法,加入一个新的线程
  • ExecutorService接口相比Executor多个几个方法
public interface ExecutorService extends Executor {
   void shutdown();
   boolean isShutdown();
   boolean isTerminated();
   boolean awaitTermination(long timeout, TimeUnit unit)
       throws InterruptedException;
   <T> Future<T> submit(Callable<T> task);
   <T> Future<T> submit(Runnable task, T result);
   Future<?> submit(Runnable task);
   <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
       throws InterruptedException;
   <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                 long timeout, TimeUnit unit)
       throws InterruptedException;

   <T> T invokeAny(Collection<? extends Callable<T>> tasks)
       throws InterruptedException, ExecutionException;
   <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                   long timeout, TimeUnit unit)
       throws InterruptedException, ExecutionException, TimeoutException;
}

我们直接看ThreadPoolExecutor所提供的构造方法

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

现在我们来解释一下各个参数的含义

  • corePoolSize 核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系
  • maximumPoolSize 线程池最大线程数
  • keepAliveTime 空闲线程的存活时间,默认只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用
  • unit 参数keepAliveTime的时间单位
  • workQueue 一个阻塞队列,用来存储等待执行的任务
  • threadFactory 线程工厂,主要用来创建线程
  • handler 表示当拒绝处理任务时的策略,有以下四种取值
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

下面再简单再介绍一下ThreadPoolExecutor中几个重要的方法

  • execute 这个方法其实就是对Executor接口的实现,也是最核心的方法,用于向线程池中提交一个任务执行
  • submit 该方法也是用于向线程池里提交任务,区别在于submit提交可以有返回值,会返回一个Futrue类型来异步获取执行结果
  • shutdown 用于关闭线程池
  • shutdownNow 也是关闭线程池,与shutdown区别在于shutdownNow会尝试关闭正在执行的线程任务

通过源码分析 ThreadPoolExecutor

上文我们简单的分析了ThreadPoolExecutor,下面我们将根据源码来进一步分析ThreadPoolExecutor核心功能的实现

先从其核心方法execute开始解读

public void execute(Runnable command) {
      // 如果传入空对象 则抛出空指针异常
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    
    /**
     * 如果当前正在执行任务的线程数小于 corePoolSize
     */
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    /**
     * 如果当前正在执行任务的线程数大于corePoolSize,且workQueue未满
     */
    if (isRunning(c) && workQueue.offer(command)) {
            // 重复检查一次,防止正好此时线程池被shutdown
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    /**
     * 如果当前正在执行的任务线程数大于corePoolSize,且workQueue已满,则使用        * maximumPoolSize参数创建线程,如果已经到了maximumPoolSize最大值,则启用拒绝      * 策略
     */
    else if (!addWorker(command, false))
        reject(command);
}

上面的代码简单总结一下就是

  1. 当前正在执行任务的线程数小于corePoolSize时,正常加入执行
  2. 当前正在执行任务的线程数大于corePoolSize,且workQueue未满时,则将任务加入等待队列workQueue
  3. 如果当前正在执行的任务线程数大于corePoolSize,且workQueue已满,则会临时扩充线程数,根据maximumPoolSize最大线程数值
  4. 如果当前正在执行的任务线程数大于maximumPoolSize 这时已经超出最大可承受的线程数值了,会启用拒绝策略,也就是上文所配置的四种策略之一,默认是抛出异常,加入任务失败

workQueue 任务队列

上文有提到任务队列,也就是在等待执行的任务队列。workQueue的本质是一种阻塞队列BlockingQueue<Runnable>,列举三种常用类型

  1. 基于数组的先进先出队列,此队列创建时必须指定大小
  2. 基于链表的先进先出队列
  3. 同步队列 该队列不存储元素,每个插入操作必须等待另一个线程调用移除操作,否则插入操作会一直阻塞

核心功能分析

上文中我们通过查看源码知道了新任务加入线程池的策略,下面我们继续往下看,再分析之前,我们可以先思考几个问题

  1. 任务等待队列是在何时被执行?
  2. 线程是如何实现重复利用的?,毕竟这是线程池最重要的功能了
  3. 空闲线程根据keepAliveTime参数是在哪里被回收的?

接下来我们继续通过源码来解读这三个问题
根据上文execute方法源码,可以看到调用了addWorker方法

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 这里判断当前线程数是否大于最大值,大于了则返回false
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
            // 新建了一个 Worker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            // 加锁
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 把当前这个工作线程加入到 workers 集合中
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                  // 开始执行此worker
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
  • 根据参数core 判断当前线程数是否已经大于corePoolSizemaximumPoolSize,大于则直接返回false
  • 初始化了一个Worker对象,并且加入到了workers集合中,workers是一个Set类型的集合
  • 启动当前这个Worker线程任务

经过上面的分析,可能大概会有疑惑,Worker对象是做啥的?下面我们就来看下Worker类的实现

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    /**
     * Creates with given first task and thread from ThreadFactory.
     * @param firstTask the first task (null if none)
     */
    Worker(Runnable firstTask) {
        setState(-1); // inhibit interrupts until runWorker
        this.firstTask = firstTask;
        this.thread = getThreadFactory().newThread(this);
    }

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
    
    ...
}
  • Worker实现了Runnable接口
  • 构造方法中通过ThreadFactory初始化了一个新的线程对象
  • run()方法调用了ThreadPoolExecutor的runWorker()

重点在runWorker方法中

线程池重复利用机制

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 获取第一个任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
            // 从任务队列中循环获取任务执行 getTask方法有可能会阻塞
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                  // 空实现,可以通过继承`ThreadPoolExecutor`重写此方法
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                        // 执行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

代码解析到这里,基本可以回答上文所提出的三个问题了

  • 加入到任务队列的任务会在这里被执行,因为Worker对象执行完一个任务后,并不会立刻结束,而是会通过循环调用getTask()方法从任务队列中获取最新任务来执行,这样子就实现了线程的重复利用,而不必每次都重新创建一个Worker对象
  • getTask()方法无法获取到最新的任务后,该Worker线程才会被回收。所以第三个问题keepAliveTime的机制必然是在getTask()中实现的

getTask 超时策略

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // workers大于corePoolSize,或则允许corePoolSize设置空闲超时时间
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            // 当前线程数已经大于maximumPoolSize或则已经超时过一次,则直接返回null
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
                 // 获取任务,如果timed为true,则等待一定时间(keepAliveTime)未返回的话,会返回null,如果timed未设置为true,则会一直阻塞,直到有数据
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
  • 先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null
  • 判断当前线程池的线程数是否已经大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用workQueue.poll(time,timeUnit)来取任务,这个方法会阻塞等待一定的时间,如果取不到任务就返回null,否则则会调用workQueue.take()来取任务,这个方法会wait释放CPU一直阻塞。

总结一下,只有当我们设置allowCoreThreadTimeOut(核心池线程空闲超时时间)或则当前线程数大于corePoolSize时,keepAliveTime机制才会生效

整个流程到这里大概就分析完了,下图基本绘制了线程池提交任务,执行任务的整个流程

image.png

Executors 中几个常用的线程池

虽然阿里Java开发规约中不建议我们使用Executors类直接创建线程池,但还是有必要简单介绍几个其中常用的方法

  • Executors.newCachedThreadPool() 创建一个缓冲池,缓冲池容量大小为Integer.MAX_VALUE. 将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUEworkQueue使用的SynchronousQueue,也就是说来了任务就创建线程运行,当线程空闲超过60秒,就销毁线程
  • Executors.newSingleThreadExecutor() 创建容量为1的缓冲池,将corePoolSizemaximumPoolSize都设置为1,workQueue使用的LinkedBlockingQueue
  • Executors.newFixedThreadPool(int) 创建固定容量大小的缓冲池,空闲线程不会销毁。corePoolSizemaximumPoolSize值是相等的,workQueue使用的是LinkedBlockingQueue

总结

本篇文章主要从对线程池的配置使用,以及源码实现做了分析,总体上比较全面的分析了Java线程池的实现。

博客原文地址戳这里

相关文章

  • 分析jdk-1.8-ForkJoinPool实现原理(上)

    Java并发编程源码分析系列: 分析Java线程池的创建 分析Java线程池执行原理 分析Java线程池Calla...

  • 分析jdk-1.8-ForkJoinPool实现原理(下)

    Java并发编程源码分析系列: 分析Java线程池的创建 分析Java线程池执行原理 分析Java线程池Calla...

  • 分析ReentrantLock的实现原理

    Java并发编程源码分析系列: 分析Java线程池的创建 分析Java线程池执行原理 分析Java线程池Calla...

  • 分析CountDownLatch的实现原理

    Java并发编程源码分析系列: 分析Java线程池的创建 分析Java线程池执行原理 分析Java线程池Calla...

  • 分析Java线程池Callable任务执行原理

    Java并发编程源码分析系列: 分析Java线程池的创建 分析Java线程池执行原理 上一篇分析了线程池的执行原理...

  • Java线程池

    Java线程池分析。 创建线程池 构造函数: 参数分析: corePoolSize : 核心线程数 maximum...

  • 分析Java线程池执行原理

    Java并发编程源码分析系列: 分析Java线程池的创建 上一篇已经对线程池的创建进行了分析,了解线程池既有预设的...

  • 面试题2019年7月

    线程池原理 参考:Java 线程池原理分析 线程池工作原理:1、线程数量小于 corePoolSize,直接创建新...

  • 线程池

    线程池的文章:JDK线程池(一):体系结构JDK线程池(二):ThreadPoolExecutor深入分析java...

  • Java线程池总结

    本篇文章讲述Java中的线程池问题,同样适用于Android中的线程池使用。本篇文章参考:Java线程池分析,Ja...

网友评论

    本文标题:Java线程池分析

    本文链接:https://www.haomeiwen.com/subject/abqieqtx.html