美文网首页
第八章——线程池的使用

第八章——线程池的使用

作者: 你可记得叫安可 | 来源:发表于2020-11-05 20:40 被阅读0次

    8.3 配置 ThreadPoolExecutor

    ThreadPoolExecutor 为一些 Executor 提供了基本的实现,这些 Executor 是由 Executors 中的 newCachedThreadPoolnewFixedThreadPoolnewScheduledThreadExecutor 等工厂方法返回的。

    如果默认的执行策略不能满足需求,那么可以通过 ThreadPoolExecutor 的构造函数来实例化一个对象,并根据自己的需求来定制。

    // 程序清单 8-2
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {}
    

    线程池的基本大小(Core Pool Size)、最大大小(Maximum Pool Size)以及存活时间等因素共同负责线程的创建与销毁。基本大小也就是线程池的目标大小,记载没有任务执行时①线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。线程池的最大大小表示可同时活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。

    ① 在创建 ThreadPoolExecutor 初期,线程并不会立即启动,而是等到有任务提交时才会启动,除非调用 prestartAllCoreThreads

    newFixedThreadPool 工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。
    newCachedThreadPool 工厂方法将线程池的最大大小设置为 Integer.MAX_VALUE,而将基本大小设置为零,并将超时设置为 1 分钟,这种方法创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩。

    8.3.2 管理队列任务

    ThreadPoolExecutor 允许提供一个 BlockingQueue 来保存等待执行的任务。基本的任务排队方法有 3 种:无界队列、有界队列和同步移交(Synchronous Handoff)。队列的选择与其他的配置参数有关,例如线程池的大小等。

    newFixedThreadPoolnewSingleThreadExecutor 在默认情况下将使用一个无界的 LinkedBlockingQueue。如果所有工作者线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理它们的速度,那么队列将无限制地增加。

    一种更稳妥的资源管理策略是使用有界队列,例如 ArrayBlockingQueue、有界的 LinkedBlockingQueuePriorityBlockingQueue。有界队列有助于避免资源耗尽的情况发生,但它又带来了新的问题:当队列填满后,新的任务该怎么办?下一节的饱和策略将讨论解决这个问题。

    对于非常大或者无界的线程池,可以通过使用 SynchronousQueue 来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue 不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入 SynchronousQueue 中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么 ThreadPoolExecutor 将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是首先放在队列中,然后由工作者线程从队列中提取任务。只有当线程池是无界的或者可以拒绝任务时,SynchronousQueue 才有实际价值。在 newCachedThreadPool 工厂方法中就使用了 SynchronousQueue

    8.3.3 饱和策略

    当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor 的饱和策略可以通过调用 setRejectedExecutionHandler 来修改。(如果某个任务被提交到一个已被关闭的 Executor 时,也会用到饱和策略)JDK 提供了几种不同的 RejectedExecutionHandler 实现,每种实现都包含由不同的饱和策略:AbortPolicyCallerRunsPolicyDiscardPolicyDiscardOldestPolicy

    “中止(Abort)” 策略是默认的饱和策略,该策略将抛出未检查的 RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。

    当新提交的任务无法保存到队列中等待执行时,“抛弃(Discard)”策略会悄悄抛弃该任务。

    “抛弃最旧的(Discard-Oldest)”策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。

    “调用者运行(Caller-Runs)”策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者。它不会再线程池的某个线程中执行新提交的任务,而是在一个调用了 execute 的线程中执行该任务(就是调用者线程)。由于执行任务需要一定时间,因此主线程至少在一段时间内不能提交任务任务,从而使得工作者线程有时间来处理完正在执行的任务。

    当工作队列被填满后,饱和策略并不能阻塞 execute 的提交。此时,我们可以通过使用 Semaphore 来限制任务的到达率。下面的 程序清单 8-4 使用了一个无界队列(因为不能限制队列的大小和任务的到达率),并设置信号量的上界,设置为线程池的大小加上可排队任务的数量。

    // 程序清单 8-4
    public class BoundedExecutor {
        private final Executor exec;
        private final Semaphore semaphore;
        
        public BoundedExecutor(Executor exec, int bound) {
            this.exec = exec;
            this.semaphore = new Semaphore(bound);
        }
        
        public void submitTask(final Runnable command) throws InterruptedException {
            semaphore.acquire();  // 到达上限后,任务提交将在这里被阻塞
            try {
                exec.execute(() -> {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                });
            } catch (RejectedExecutionException e) {
                semaphore.release();
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:第八章——线程池的使用

          本文链接:https://www.haomeiwen.com/subject/imkwvktx.html