美文网首页
通过这六点,了解Java线程池的全面(总结下篇)

通过这六点,了解Java线程池的全面(总结下篇)

作者: java高级架构F六 | 来源:发表于2019-11-20 16:56 被阅读0次

    五 (重要)ThreadPoolExecutor 使用示例

    我们上面讲解了 Executor框架以及 ThreadPoolExecutor 类,下面让我们实战一下,来通过写一个 ThreadPoolExecutor 的小 Demo 来回顾上面的内容。

    5.1 示例代码:Runnable+ThreadPoolExecutor

    首先创建一个 Runnable 接口的实现类(当然也可以是 Callable 接口,我们上面也说了两者的区别。)

    MyRunnable.java

    import java.util.Date;

    /**

    * 这是一个简单的Runnable类,需要大约5秒钟来执行其任务。

    * @author shuang.kou

    */

    public class MyRunnable implements Runnable {

    private String command;

    public MyRunnable(String s) {

    this.command = s;

    }

    @Override

    public void run() {

    System.out.println(Thread.currentThread().getName() + " Start. Time = " + new Date());

    processCommand();

    System.out.println(Thread.currentThread().getName() + " End. Time = " + new Date());

    }

    private void processCommand() {

    try {

    Thread.sleep(5000);

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    }

    @Override

    public String toString() {

    return this.command;

    }

    }

    编写测试程序,我们这里以阿里巴巴推荐的使用 ThreadPoolExecutor 构造函数自定义参数的方式来创建线程池。

    ThreadPoolExecutorDemo.java

    import java.util.concurrent.ArrayBlockingQueue;

    import java.util.concurrent.ThreadPoolExecutor;

    import java.util.concurrent.TimeUnit;

    public class ThreadPoolExecutorDemo {

    private static final int CORE_POOL_SIZE = 5;

    private static final int MAX_POOL_SIZE = 10;

    private static final int QUEUE_CAPACITY = 100;

    private static final Long KEEP_ALIVE_TIME = 1L;

    public static void main(String[] args) {

    //使用阿里巴巴推荐的创建线程池的方式

    //通过ThreadPoolExecutor构造函数自定义参数创建

    ThreadPoolExecutor executor = new ThreadPoolExecutor(

    CORE_POOL_SIZE,

    MAX_POOL_SIZE,

    KEEP_ALIVE_TIME,

    TimeUnit.SECONDS,

    new ArrayBlockingQueue<>(QUEUE_CAPACITY),

    new ThreadPoolExecutor.CallerRunsPolicy());

    for (int i = 0; i < 10; i++) {

    //创建WorkerThread对象(WorkerThread类实现了Runnable 接口)

    Runnable worker = new MyRunnable("" + i);

    //执行Runnable

    executor.execute(worker);

    }

    //终止线程池

    executor.shutdown();

    while (!executor.isTerminated()) {

    }

    System.out.println("Finished all threads");

    }

    }

    可以看到我们上面的代码指定了:

    corePoolSize: 核心线程数为 5。

    maximumPoolSize :最大线程数 10

    keepAliveTime : 等待时间为 1L。

    unit: 等待时间的单位为 TimeUnit.SECONDS。

    workQueue:任务队列为 ArrayBlockingQueue,并且容量为 100;

    handler:饱和策略为 CallerRunsPolicy。

    Output:

    pool-1-thread-2 Start. Time = Tue Nov 12 20:59:44 CST 2019

    pool-1-thread-5 Start. Time = Tue Nov 12 20:59:44 CST 2019

    pool-1-thread-4 Start. Time = Tue Nov 12 20:59:44 CST 2019

    pool-1-thread-1 Start. Time = Tue Nov 12 20:59:44 CST 2019

    pool-1-thread-3 Start. Time = Tue Nov 12 20:59:44 CST 2019

    pool-1-thread-5 End. Time = Tue Nov 12 20:59:49 CST 2019

    pool-1-thread-3 End. Time = Tue Nov 12 20:59:49 CST 2019

    pool-1-thread-2 End. Time = Tue Nov 12 20:59:49 CST 2019

    pool-1-thread-4 End. Time = Tue Nov 12 20:59:49 CST 2019

    pool-1-thread-1 End. Time = Tue Nov 12 20:59:49 CST 2019

    pool-1-thread-2 Start. Time = Tue Nov 12 20:59:49 CST 2019

    pool-1-thread-1 Start. Time = Tue Nov 12 20:59:49 CST 2019

    pool-1-thread-4 Start. Time = Tue Nov 12 20:59:49 CST 2019

    pool-1-thread-3 Start. Time = Tue Nov 12 20:59:49 CST 2019

    pool-1-thread-5 Start. Time = Tue Nov 12 20:59:49 CST 2019

    pool-1-thread-2 End. Time = Tue Nov 12 20:59:54 CST 2019

    pool-1-thread-3 End. Time = Tue Nov 12 20:59:54 CST 2019

    pool-1-thread-4 End. Time = Tue Nov 12 20:59:54 CST 2019

    pool-1-thread-5 End. Time = Tue Nov 12 20:59:54 CST 2019

    pool-1-thread-1 End. Time = Tue Nov 12 20:59:54 CST 2019

    5.2 线程池原理分析

    承接 5.1 节,我们通过代码输出结果可以看出:线程池每次会同时执行 5 个任务,这 5 个任务执行完之后,剩余的 5 个任务才会被执行。 大家可以先通过上面讲解的内容,分析一下到底是咋回事?(自己独立思考一会)

    现在,我们就分析上面的输出内容来简单分析一下线程池原理。

    **为了搞懂线程池的原理,我们需要首先分析一下 execute方法。**在 5.1 节中的 Demo 中我们使用 executor.execute(worker)来提交一个任务到线程池中去,这个方法非常重要,下面我们来看看它的源码:

    // 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static int workerCountOf(int c) {

    return c & CAPACITY;

    }

    private final BlockingQueue<Runnable> workQueue;

    public void execute(Runnable command) {

    // 如果任务为null,则抛出异常。

    if (command == null)

    throw new NullPointerException();

    // ctl 中保存的线程池当前的一些状态信息

    int c = ctl.get();

    // 下面会涉及到 3 步 操作

    // 1.首先判断当前线程池中之行的任务数量是否小于 corePoolSize

    // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。

    if (workerCountOf(c) < corePoolSize) {

    if (addWorker(command, true))

    return;

    c = ctl.get();

    }

    // 2.如果当前之行的任务数量大于等于 corePoolSize 的时候就会走到这里

    // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态才会被并且队列可以加入任务,该任务才会被加入进去

    if (isRunning(c) && workQueue.offer(command)) {

    int recheck = ctl.get();

    // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。

    if (!isRunning(recheck) && remove(command))

    reject(command);

    // 如果当前线程池为空就新创建一个线程并执行。

    else if (workerCountOf(recheck) == 0)

    addWorker(null, false);

    }

    //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。

    //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。

    else if (!addWorker(command, false))

    reject(command);

    }

    通过下图可以更好的对上面这 3 步做一个展示

    ,现在,让我们在回到 5.1 节我们写的 Demo, 现在应该是不是很容易就可以搞懂它的原理了呢?

    没搞懂的话,也没关系,可以看看我的分析:

    我们在代码中模拟了 10 个任务,我们配置的核心线程数为 5 、等待队列容量为 100 ,所以每次只可能存在 5 个任务同时执行,剩下的 5 个任务会被放到等待队列中去。当前的 5 个任务之行完成后,才会之行剩下的 5 个任务。

    5.3 几个常见的对比

    5.3.1 Runnable vs Callable

    Runnable自 Java 1.0 以来一直存在,但Callable仅在 Java 1.5 中引入,目的就是为了来处理Runnable不支持的用例。Runnable 接口不会返回结果或抛出检查异常,但是**Callable 接口**可以。所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口,这样代码看起来会更加简洁。

    工具类 Executors 可以实现 Runnable 对象和 Callable 对象之间的相互转换。(Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule))。

    Runnable.java

    @FunctionalInterface

    public interface Runnable {

    /**

    * 被线程执行,没有返回值也无法抛出异常

    */

    public abstract void run();

    }

    Callable.java

    @FunctionalInterface

    public interface Callable<V> {

    /**

    * 计算结果,或在无法这样做时抛出异常。

    * @return 计算得出的结果

    * @throws 如果无法计算结果,则抛出异常

    */

    V call() throws Exception;

    }

    5.3.2 execute() vs submit()

    execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功与否;

    submit()方法用于提交需要返回值的任务。线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

    我们以**AbstractExecutorService**接口中的一个 submit 方法为例子来看看源代码:

    public Future<?> submit(Runnable task) {

    if (task == null) throw new NullPointerException();

    RunnableFuture<Void> ftask = newTaskFor(task, null);

    execute(ftask);

    return ftask;

    }

    上面方法调用的 newTaskFor 方法返回了一个 FutureTask 对象。

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {

    return new FutureTask<T>(runnable, value);

    }

    我们再来看看execute()方法:

    public void execute(Runnable command) {

    ...

    }

    5.3.3 shutdown()VSshutdownNow()

    shutdown() :关闭线程池,线程池的状态变为 SHUTDOWN。线程池不再接受新任务了,但是队列里的任务得执行完毕。

    shutdownNow() :关闭线程池,线程的状态变为 STOP。线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。

    5.3.2 isTerminated() VS isShutdown()

    isShutDown 当调用 shutdown() 方法后返回为 true。

    isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true

    5.4 加餐:Callable+ThreadPoolExecutor示例代码

    MyCallable.java

    import java.util.concurrent.Callable;

    public class MyCallable implements Callable<String> {

    @Override

    public String call() throws Exception {

    Thread.sleep(1000);

    //返回执行当前 Callable 的线程名字

    return Thread.currentThread().getName();

    }

    }

    CallableDemo.java

    import java.util.ArrayList;

    import java.util.Date;

    import java.util.List;

    import java.util.concurrent.ArrayBlockingQueue;

    import java.util.concurrent.Callable;

    import java.util.concurrent.ExecutionException;

    import java.util.concurrent.Future;

    import java.util.concurrent.ThreadPoolExecutor;

    import java.util.concurrent.TimeUnit;

    public class CallableDemo {

    private static final int CORE_POOL_SIZE = 5;

    private static final int MAX_POOL_SIZE = 10;

    private static final int QUEUE_CAPACITY = 100;

    private static final Long KEEP_ALIVE_TIME = 1L;

    public static void main(String[] args) {

    //使用阿里巴巴推荐的创建线程池的方式

    //通过ThreadPoolExecutor构造函数自定义参数创建

    ThreadPoolExecutor executor = new ThreadPoolExecutor(

    CORE_POOL_SIZE,

    MAX_POOL_SIZE,

    KEEP_ALIVE_TIME,

    TimeUnit.SECONDS,

    new ArrayBlockingQueue<>(QUEUE_CAPACITY),

    new ThreadPoolExecutor.CallerRunsPolicy());

    List<Future<String>> futureList = new ArrayList<>();

    Callable<String> callable = new MyCallable();

    for (int i = 0; i < 10; i++) {

    //提交任务到线程池

    Future<String> future = executor.submit(callable);

    //将返回值 future 添加到 list,我们可以通过 future 获得 执行 Callable 得到的返回值

    futureList.add(future);

    }

    for (Future<String> fut : futureList) {

    try {

    System.out.println(new Date() + "::" + fut.get());

    } catch (InterruptedException | ExecutionException e) {

    e.printStackTrace();

    }

    }

    //关闭线程池

    executor.shutdown();

    }

    }

    Output:

    Wed Nov 13 13:40:41 CST 2019::pool-1-thread-1

    Wed Nov 13 13:40:42 CST 2019::pool-1-thread-2

    Wed Nov 13 13:40:42 CST 2019::pool-1-thread-3

    Wed Nov 13 13:40:42 CST 2019::pool-1-thread-4

    Wed Nov 13 13:40:42 CST 2019::pool-1-thread-5

    Wed Nov 13 13:40:42 CST 2019::pool-1-thread-3

    Wed Nov 13 13:40:43 CST 2019::pool-1-thread-2

    Wed Nov 13 13:40:43 CST 2019::pool-1-thread-1

    Wed Nov 13 13:40:43 CST 2019::pool-1-thread-4

    Wed Nov 13 13:40:43 CST 2019::pool-1-thread-5

    六 ScheduledThreadPoolExecutor 详解

    ScheduledThreadPoolExecutor 主要用来在给定的延迟后运行任务,或者定期执行任务。这个在实际项目中基本不会被用到,所以对这部分大家只需要简单了解一下它的思想。关于如何在Spring Boot 中 实现定时任务,可以查看这篇文章《5分钟搞懂如何在Spring Boot中Schedule Tasks》。

    6.1 简介

    ScheduledThreadPoolExecutor 使用的任务队列 DelayQueue 封装了一个 PriorityQueue,PriorityQueue 会对队列中的任务进行排序,执行所需时间短的放在前面先被执行(ScheduledFutureTask 的 time 变量小的先执行),如果执行所需时间相同则先提交的任务将被先执行(ScheduledFutureTask 的 squenceNumber 变量小的先执行)。

    ScheduledThreadPoolExecutor 和 Timer 的比较:

    Timer 对系统时钟的变化敏感,ScheduledThreadPoolExecutor不是;

    Timer 只有一个执行线程,因此长时间运行的任务可以延迟其他任务。 ScheduledThreadPoolExecutor 可以配置任意数量的线程。 此外,如果你想(通过提供 ThreadFactory),你可以完全控制创建的线程;

    在TimerTask 中抛出的运行时异常会杀死一个线程,从而导致 Timer 死机:-( ...即计划任务将不再运行。ScheduledThreadExecutor 不仅捕获运行时异常,还允许您在需要时处理它们(通过重写 afterExecute 方法ThreadPoolExecutor)。抛出异常的任务将被取消,但其他任务将继续运行。

    综上,在 JDK1.5 之后,你没有理由再使用 Timer 进行任务调度了。

    备注: Quartz 是一个由 java 编写的任务调度库,由 OpenSymphony 组织开源出来。在实际项目开发中使用 Quartz 的还是居多,比较推荐使用 Quartz。因为 Quartz 理论上能够同时对上万个任务进行调度,拥有丰富的功能特性,包括任务调度、任务持久化、可集群化、插件等等。

    6.2 运行机制

    ScheduledThreadPoolExecutor 的执行主要分为两大部分:

    当调用 ScheduledThreadPoolExecutor 的 scheduleAtFixedRate() 方法或者**scheduleWirhFixedDelay()** 方法时,会向 ScheduledThreadPoolExecutor 的 DelayQueue 添加一个实现了 RunnableScheduledFuture 接口的 ScheduledFutureTask 。

    线程池中的线程从 DelayQueue 中获取 ScheduledFutureTask,然后执行任务。

    ScheduledThreadPoolExecutor 为了实现周期性的执行任务,对 ThreadPoolExecutor 做了如下修改:

    使用 DelayQueue 作为任务队列;

    获取任务的方不同

    执行周期任务后,增加了额外的处理

    6.3 ScheduledThreadPoolExecutor 执行周期任务的步骤

    线程 1 从 DelayQueue 中获取已到期的 ScheduledFutureTask(DelayQueue.take())。到期任务是指 ScheduledFutureTask 的 time 大于等于当前系统的时间;

    线程 1 执行这个 ScheduledFutureTask;

    线程 1 修改 ScheduledFutureTask 的 time 变量为下次将要被执行的时间;

    线程 1 把这个修改 time 之后的 ScheduledFutureTask 放回 DelayQueue 中(DelayQueue.add())。

    大家记得关注主页(上篇)

    相关文章

      网友评论

          本文标题:通过这六点,了解Java线程池的全面(总结下篇)

          本文链接:https://www.haomeiwen.com/subject/craoictx.html