美文网首页程序员
掌握JDK21全新结构化并发编程,轻松提升开发效率!

掌握JDK21全新结构化并发编程,轻松提升开发效率!

作者: JavaEdge | 来源:发表于2023-08-21 16:41 被阅读0次

    1 概要

    通过引入结构化并发编程的API,简化并发编程。结构化并发将在不同线程中运行的相关任务组视为单个工作单元,从而简化错误处理和取消操作,提高可靠性,并增强可观察性。这是一个预览版的API。

    2 历史

    结构化并发是由JEP 428提出的,并在JDK 19中作为孵化API发布。它在JDK 20中被JEP 437重新孵化,通过对作用域值(JEP 429)进行轻微更新。

    我们在这里提议将结构化并发作为JUC包中的预览API。唯一重要变化是StructuredTaskScope::fork(...)方法返回一个[子任务],而不是一个Future,如下面所讨论的。

    3 目标

    推广一种并发编程风格,可以消除由于取消和关闭而产生的常见风险,如线程泄漏和取消延迟。

    提高并发代码的可观察性。

    4 非目标

    不替换JUC包中的任何并发构造,如ExecutorService和Future。

    不定义Java平台的最终结构化并发API。其他结构化并发构造可以由第三方库定义,或在未来的JDK版本中定义。

    不定义在线程之间共享数据流的方法(即通道)。会在未来提出这样做。

    不用新的线程取消机制替换现有的线程中断机制。会在未来提出这样做。

    5 动机

    开发人员通过将任务分解为多个子任务来管理复杂性。在普通的单线程代码中,子任务按顺序执行。然而,如果子任务彼此足够独立,并且存在足够的硬件资源,那么通过在不同线程中并发执行子任务,可以使整个任务运行得更快(即具有较低的延迟)。例如,将多个I/O操作的结果组合成一个任务,如果每个I/O操作都在自己的线程中并发执行,那么任务将运行得更快。虚拟线程(JEP 444)使得为每个此类I/O操作分配一个线程成为一种具有成本效益的方法,但是管理可能会产生大量线程仍然是一个挑战。

    6 ExecutorService 非结构化并发

    java.util.concurrent.ExecutorService API 是在 Java 5 中引入的,它帮助开发人员以并发方式执行子任务。

    如下 handle() 的方法,它表示服务器应用程序中的一个任务。它通过将两个子任务提交给 ExecutorService 来处理传入的请求。

    ExecutorService 立即返回每个子任务的 Future,并根据 Executor 的调度策略同时执行这些子任务。handle() 方法通过阻塞调用它们的 Futureget() 方法来等待子任务的结果,因此该任务被称为加入了其子任务。

    Response handle() throws ExecutionException, InterruptedException {
        Future<String> user = esvc.submit(() -> findUser());
        Future<Integer> order = esvc.submit(() -> fetchOrder());
        String theUser = user.get();   // 加入 findUser
        int theOrder = order.get();    // 加入 fetchOrder
        return new Response(theUser, theOrder);
    }
    

    由于子任务并发执行,每个子任务都可独立地成功或失败。在这个上下文中,"失败" 意味着抛出异常。通常,像 handle() 这样的任务应该在任何一个子任务失败时失败。当出现失败时,理解线程的生命周期会变得非常复杂:

    • findUser() 抛异常,那么调用 user.get()handle() 也会抛出异常,但是 fetchOrder() 会继续在自己的线程中运行。这是线程泄漏,最好情况下浪费资源,最坏情况下 fetchOrder() 的线程可能会干扰其他任务。

    • 如执行 handle() 的线程被中断,这个中断不会传播到子任务。findUser()fetchOrder() 的线程都会泄漏,即使在 handle() 失败后仍然继续运行。

    • 如果 findUser() 执行时间很长,但是在此期间 fetchOrder() 失败,那么 handle() 将不必要地等待 findUser(),因为它会在 user.get() 上阻塞,而不是取消它。只有在 findUser() 完成并且 user.get() 返回后,order.get() 才会抛出异常,导致 handle() 失败。

    每种case下,问题在于我们的程序在逻辑上被结构化为任务-子任务关系,但这些关系只存在于开发人员的头脑中。这不仅增加错误可能性,还会使诊断和排除此类错误变得更加困难。例如,线程转储等可观察性工具会在不相关的线程调用栈中显示 handle()findUser()fetchOrder(),而没有任务-子任务关系的提示。

    可尝试在错误发生时显式取消其他子任务,例如通过在失败的任务的 catch 块中使用 try-finally 包装任务,并调用其他任务的 Futurecancel(boolean) 方法。我们还需要在 try-with-resources 语句中使用 ExecutorService,就像

    try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
        IntStream.range(0, 10_000).forEach(i -> {
            executor.submit(() -> {
                Thread.sleep(Duration.ofSeconds(1));
                return i;
            });
        });
    }  // executor.close() is called implicitly, and waits
    

    因为 Future 没有提供等待被取消的任务的方法。但所有这些都很难做到,并且往往会使代码的逻辑意图变得更加难以理解。跟踪任务之间的关系,并手动添加所需的任务间取消边缘,是对开发人员的一种很大要求。

    无限制的并发模式

    这种需要手动协调生命周期的需求是因为 ExecutorServiceFuture 允许无限制的并发模式。在涉及的所有线程中,没有限制或顺序:

    • 一个线程可以创建一个 ExecutorService
    • 另一个线程可以向其提交工作
    • 执行工作的线程与第一个或第二个线程没有任何关系

    线程提交工作之后,一个完全不同的线程可以等待执行的结果。具有对 Future 的引用的任何代码都可以加入它(即通过调用 get() 等待其结果),甚至可以在与获取 Future 的线程不同的线程中执行代码。实际上,由一个任务启动的子任务不必返回到提交它的任务。它可以返回给许多任务中的任何一个,甚至可能是没有返回给任何任务。

    因为 ExecutorServiceFuture 允许这种无结构的使用,它们既不强制执行也不跟踪任务和子任务之间的关系,尽管这些关系是常见且有用的。因此,即使子任务在同一个任务中被提交和加入,一个子任务的失败也不能自动导致另一个子任务的取消。在上述的 handle() 方法中,fetchOrder() 的失败不能自动导致 findUser() 的取消。fetchOrder()FuturefindUser()Future 没有关系,也与最终通过其 get() 方法加入它的线程无关。与其要求开发人员手动管理这种取消,我们希望能够可靠地自动化这一过程。

    任务结构应反映代码结构

    ExecutorService 下的自由线程组合相反,单线程代码的执行总是强制执行任务和子任务的层次结构。方法的代码块 {...} 对应一个任务,代码块内部调用的方法对应子任务。调用的方法必须返回给调用它的方法,或者抛出异常给调用它的方法。它不能生存于调用它的方法之外,也不能返回或抛出异常给其他方法。因此,所有子任务在任务之前完成,每个子任务都是其父任务的子任务,每个子任务的生命周期相对于其他子任务和任务来说,都由代码块结构的语法规则来管理。

    如单线程版本的 handle() 中,任务-子任务关系在语法结构明显:

    Response handle() throws IOException {
        String theUser = findUser();
        int theOrder = fetchOrder();
        return new Response(theUser, theOrder);
    }
    

    我们不会在 findUser() 子任务完成之前启动 fetchOrder() 子任务,无论 findUser() 是成功还是失败。如果 findUser() 失败,我们根本不会启动 fetchOrder(),而且 handle() 任务会隐式地失败。一个子任务只能返回给其父任务,这是很重要的:这意味着父任务可以将一个子任务的失败隐式地视为触发来取消其他未完成的子任务,然后自己失败。

    单线程代码中,任务-子任务层次关系在运行时的调用栈中得到体现。因此,我们获得了相应的父子关系,这些关系管理着错误传播。观察单个线程时,层次关系显而易见:findUser()(及后来的 fetchOrder())似乎是在 handle() 下执行的。这使得回答问题 "handle() 正在处理什么?" 很容易。

    如任务和子任务之间的父子关系在代码的语法结构中明显,并且在运行时得到了体现,那并发编程将更加容易、可靠且易于观察,就像单线程代码一样。语法结构将定义子任务的生命周期,并使得能够在运行时创建一个类似于单线程调用栈的线程层次结构的表示。这种表示将实现错误传播、取消以及对并发程序的有意义的观察。

    7 结构化并发

    结构化并发是一种并发编程方法,它保持了任务和子任务之间的自然关系,从而实现了更具可读性、可维护性和可靠性的并发代码。"结构化并发" 这个术语由 Martin Sústrik 提出,并由 Nathaniel J. Smith 推广。从其他编程语言中的概念,如 Erlang 中的层次监控者,可以了解到结构化并发中错误处理的设计思想。

    结构化并发源于一个简单的原则:

    如果一个任务分解为并发的子任务,那么所有这些子任务都会返回到同一个地方,即任务的代码块。

    在结构化并发中,子任务代表任务工作。任务等待子任务的结果并监视它们的失败情况。与单线程代码中的结构化编程技术类似,结构化并发在多线程中的威力来自于两个思想:

    • 为代码块中的执行流程定义明确的进入和退出点
    • 在严格的操作生命周期嵌套中,以反映它们在代码中的语法嵌套方式

    由于代码块的进入和退出点被明确定义,因此并发子任务的生命周期被限定在其父任务的语法块中。因为同级子任务的生命周期嵌套在其父任务的生命周期之内,因此可以将它们作为一个单元进行推理和管理。由于父任务的生命周期,依次嵌套在其父任务的生命周期之内,运行时可以将任务层次结构实现为树状结构,类似于单线程调用栈的并发对应物。这允许代码为任务子树应用策略,如截止时间,并允许可观察性工具将子任务呈现为父任务的下属。

    结构化并发非常适合虚拟线程,这是由JDK实现的轻量级线程。许多虚拟线程可以共享同一个操作系统线程,从而可以支持非常大量的虚拟线程。除此外,虚拟线程足够廉价,可以表示任何涉及I/O等并发行为。这意味着服务器应用程序可以使用结构化并发来同时处理成千上万甚至百万个传入请求:它可以为处理每个请求的任务分配一个新的虚拟线程,当一个任务通过提交子任务进行并发执行时,它可以为每个子任务分配一个新的虚拟线程。在幕后,任务-子任务关系通过为每个虚拟线程提供一个对其唯一父任务的引用来实现为树状结构,类似于调用栈中的帧引用其唯一的调用者。

    总之,虚拟线程提供了大量的线程。结构化并发可以正确且强大地协调它们,并使可观察性工具能够按照开发人员的理解显示线程。在JDK中拥有结构化并发的API将使构建可维护、可靠且可观察的服务器应用程序变得更加容易。

    8 描述

    结构化并发 API 的主要类是 java.util.concurrent 包中的 StructuredTaskScope。该类允许开发人员将一个任务结构化为一组并发的子任务,并将它们作为一个单元进行协调。子任务通过分别分叉它们并将它们作为一个单元加入,可能作为一个单元取消,来在它们自己的线程中执行。子任务的成功结果或异常由父任务汇总并处理。StructuredTaskScope 将子任务的生命周期限制在一个清晰的词法作用域内,在这个作用域中,任务与其子任务的所有交互(分叉、加入、取消、处理错误和组合结果)都发生。

    前面提到的 handle() 示例,使用 StructuredTaskScope 编写:

    Response handle() throws ExecutionException, InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Supplier<String> user = scope.fork(() -> findUser());
            Supplier<Integer> order = scope.fork(() -> fetchOrder());
    
            scope.join()             // 加入两个子任务
                 .throwIfFailed();   // ... 并传播错误
    
            // 两个子任务都成功完成,因此组合它们的结果
            return new Response(user.get(), order.get());
        }
    }
    

    与原始示例相比,理解涉及的线程的生命周期在这里变得更加容易:在所有情况下,它们的生命周期都限制在一个词法作用域内,即 try-with-resources 语句的代码块内。此外,使用 StructuredTaskScope 可以确保一些有价值的属性:

    1. 错误处理与短路 — 如果 findUser()fetchOrder() 子任务中的任何一个失败,另一个如果尚未完成则会被取消。(这由 ShutdownOnFailure 实现的关闭策略来管理;还有其他策略可能)。
    2. 取消传播 — 如果在运行 handle() 的线程在调用 join() 之前或之中被中断,则线程在退出作用域时会自动取消两个子任务。
    3. 清晰性 — 上述代码具有清晰的结构:设置子任务,等待它们完成或被取消,然后决定是成功(并处理已经完成的子任务的结果)还是失败(子任务已经完成,因此没有更多需要清理的)。
    4. 可观察性 — 如下所述,线程转储清楚地显示了任务层次结构,其中运行 findUser()fetchOrder() 的线程被显示为作用域的子任务。

    9 突破预览版限制

    StructuredTaskScope 是预览版 API,默认禁用。要使用 StructuredTaskScope API,需启用预览 API:

    1. 使用 javac --release 21 --enable-preview Main.java 编译程序,然后使用 java --enable-preview Main 运行它;或
    2. 当使用源代码启动器时,使用 java --source 21 --enable-preview Main.java 运行程序
    3. IDEA 运行时,勾选即可:

    10 使用 StructuredTaskScope

    10.1 API

    public class StructuredTaskScope<T> implements AutoCloseable {
    
        public <U extends T> Subtask<U> fork(Callable<? extends U> task);
        public void shutdown();
    
        public StructuredTaskScope<T> join() throws InterruptedException;
        public StructuredTaskScope<T> joinUntil(Instant deadline)
            throws InterruptedException, TimeoutException;
        public void close();
    
        protected void handleComplete(Subtask<? extends T> handle);
        protected final void ensureOwnerAndJoined();
    
    }
    

    10.2 工作流程

    1. 创建一个作用域。创建作用域的线程是其所有者。
    2. 使用 fork(Callable) 方法在作用域中分叉子任务。
    3. 在任何时间,任何子任务,或者作用域的所有者,都可以调用作用域的 shutdown() 方法来取消未完成的子任务并阻止分叉新的子任务。
    4. 作用域的所有者将作用域(即所有子任务)作为一个单元加入。所有者可以调用作用域的 join() 方法,等待所有子任务已完成(无论成功与否)或通过 shutdown() 被取消。或者,它可以调用作用域的 joinUntil(java.time.Instant) 方法,等待直到截止时间。
    5. 加入后,处理子任务中的任何错误并处理其结果。
    6. 关闭作用域,通常通过隐式使用 try-with-resources 实现。这会关闭作用域(如果尚未关闭),并等待被取消但尚未完成的任何子任务完成。

    每次调用 fork(...) 都会启动一个新线程来执行一个子任务,默认情况下是虚拟线程。一个子任务可以创建它自己的嵌套的 StructuredTaskScope 来分叉它自己的子任务,从而创建一个层次结构。该层次结构反映在代码的块结构中,限制了子任务的生命周期:在作用域关闭后,所有子任务的线程都保证已终止,当块退出时不会留下任何线程。

    在作用域中的任何子任务,嵌套作用域中的任何子子任务,以及作用域的所有者,都可以随时调用作用域的 shutdown() 方法,表示任务已完成,即使其他子任务仍在执行。shutdown() 方法会中断仍在执行子任务的线程,并导致 join()joinUntil(Instant) 方法返回。因此,所有子任务都应该被编写为响应中断。在调用 shutdown() 后分叉的新子任务将处于 UNAVAILABLE 状态,不会被运行。实际上,shutdown() 是顺序代码中 break 语句的并发模拟。

    在作用域内部调用 join()joinUntil(Instant) 是强制性的。如果作用域的代码块在加入之前退出,则作用域将等待所有子任务终止,然后抛出异常。

    作用域的所有者线程可能在加入之前或加入期间被中断。例如,它可能是封闭作用域的子任务。如果发生这种情况,则 join()joinUntil(Instant) 将抛出异常,因为继续执行没有意义。然后,try-with-resources 语句将关闭作用域,取消所有子任务并等待它们终止。这的效果是自动将任务的取消传播到其子任务。如果 joinUntil(Instant) 方法的截止时间在子任务终止或调用 shutdown() 之前到期,则它将抛出异常,再次,try-with-resources 语句将关闭作用域。

    join() 成功完成时,每个子任务已经成功完成、失败或因作用域被关闭而被取消。

    一旦加入,作用域的所有者会处理失败的子任务并处理成功完成的子任务的结果;这通常是通过关闭策略来完成的(见下文)。成功完成的任务的结果可以使用 Subtask.get() 方法获得。get() 方法永远不会阻塞;如果错误地在加入之前或子任务尚未成功完成时调用它,则会抛出 IllegalStateException

    在作用域中分叉任务的子任务时,会继承 ScopedValue 绑定(JEP 446)。如果作用域的所有者从绑定的 ScopedValue 中读取值,则每个子任务将读取相同的值。

    如果作用域的所有者本身是现有作用域的子任务,即作为分叉子任务创建的,则该作用域成为新作用域的父作用域。因此,作用域和子任务形成一个树状结构。

    在运行时,StructuredTaskScope 强制执行结构和顺序并发操作。因此,它不实现 ExecutorServiceExecutor 接口,因为这些接口的实例通常以非结构化方式使用(见下文)。然而,将使用 ExecutorService 的代码迁移到使用 StructuredTaskScope 并从结构上受益是直接的。

    实际上,大多数使用 StructuredTaskScope 的情况下,可能不会直接使用 StructuredTaskScope 类,而是使用下一节描述的两个实现了关闭策略的子类之一。在其他情况下,用户可能会编写自己的子类来实现自定义的关闭策略。

    11 关闭策略

    在处理并发子任务时,通常会使用短路模式来避免不必要的工作。有时,例如,如果其中一个子任务失败,就会取消所有子任务(即同时调用所有任务),或者在其中一个子任务成功时取消所有子任务(即同时调用任何任务)。StructuredTaskScope 的两个子类,ShutdownOnFailureShutdownOnSuccess,支持这些模式,并提供在第一个子任务失败或成功时关闭作用域的策略。

    关闭策略还提供了集中处理异常以及可能的成功结果的方法。这符合结构化并发的精神,即整个作用域被视为一个单元。

    11.1 案例

    上面的 handle() 示例也使用了这策略,它在并发运行一组任务并在其中任何一个任务失败时失败:

    <T> List<T> runAll(List<Callable<T>> tasks) 
            throws InterruptedException, ExecutionException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            List<? extends Supplier<T>> suppliers = tasks.stream().map(scope::fork).toList();
            scope.join()
                 .throwIfFailed();  // 任何子任务失败,抛异常
            // 在这里,所有任务都已成功完成,因此组合结果
            return suppliers.stream().map(Supplier::get).toList();
        }
    }
    

    在第一个成功的子任务返回结果后返回该结果:

    <T> T race(List<Callable<T>> tasks, Instant deadline) 
            throws InterruptedException, ExecutionException, TimeoutException {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
            for (var task : tasks) {
                scope.fork(task);
            }
            return scope.joinUntil(deadline)
                        .result();  // 如果没有任何子任务成功完成,抛出异常
        }
    }
    

    一旦有一个子任务成功,此作用域将自动关闭,取消未完成的子任务。如果所有子任务失败或给定的截止时间过去,任务将失败。这种模式在需要从一组冗余服务中获得任何一个服务的结果的服务器应用程序中非常有用。

    虽然这俩关闭策略已内置,但开发人员可以创建自定义策略来抽象其他模式。

    11.2 处理结果

    在通过关闭策略(例如,通过 ShutdownOnFailure::throwIfFailed)进行集中异常处理和加入之后,作用域的所有者可以使用从调用 fork(...) 返回的 [Subtask] 对象处理子任务的结果,如果这些结果没有被策略处理(例如,通过 ShutdownOnSuccess::result())。

    通常情况下,作用域所有者将只调用 get() 方法的 Subtask 方法。所有其他的 Subtask 方法通常只会在自定义关闭策略的 handleComplete(...) 方法的实现中使用。实际上,我们建议将引用由 fork(...) 返回的 Subtask 的变量类型定义为 Supplier<String> 而不是 Subtask<String>(除非当然选择使用 var)。如果关闭策略本身处理子任务结果(如在 ShutdownOnSuccess 的情况下),则应完全避免使用由 fork(...) 返回的 Subtask 对象,并将 fork(...) 方法视为返回 void。子任务应将其结果作为它们的返回结果,作为策略在处理中央异常后应处理的任何信息。

    如果作用域所有者处理子任务异常以生成组合结果,而不是使用关闭策略,则异常可以作为从子任务返回的值返回。例如,下面是一个在并行运行一组任务并返回包含每个任务各自成功或异常结果的完成 Future 列表的方法:

    <T> List<Future<T>> executeAll(List<Callable<T>> tasks)
            throws InterruptedException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
              List<? extends Supplier<Future<T>>> futures = tasks.stream()
                  .map(task -> asFuture(task))
                  .map(scope::fork)
                  .toList();
              scope.join();
              return futures.stream().map(Supplier::get).toList();
        }
    }
    
    static <T> Callable<Future<T>> asFuture(Callable<T> task) {
       return () -> {
           try {
               return CompletableFuture.completedFuture(task.call());
           } catch (Exception ex) {
               return CompletableFuture.failedFuture(ex);
           }
       };
    }
    

    11.3 自定义关闭策略

    StructuredTaskScope 可以被扩展,并且可以覆盖其受保护的 handleComplete(...) 方法,以实现除 ShutdownOnSuccessShutdownOnFailure 之外的其他策略。子类可以,例如:

    • 收集成功完成的子任务的结果,并忽略失败的子任务,
    • 在子任务失败时收集异常,或者
    • 在出现某种条件时调用 shutdown() 方法以关闭并导致 join() 方法唤醒。

    当一个子任务完成时,即使在调用 shutdown() 之后,它也会作为一个 Subtask 报告给 handleComplete(...) 方法:

    public sealed interface Subtask<T> extends Supplier<T> {
        enum State { SUCCESS, FAILED, UNAVAILABLE }
    
        State state();
        Callable<? extends T> task();
        T get();
        Throwable exception();
    }
    

    当子任务在 SUCCESS 状态或 FAILED 状态下完成时,handleComplete(...) 方法将被调用。如果子任务处于 SUCCESS 状态,可以调用 get() 方法,如果子任务处于 FAILED 状态,则可以调用 exception() 方法。在其他情况下调用 get()exception() 会引发 IllegalStateException 异常。UNAVAILABLE 状态表示以下情况之一:(1)子任务被 fork 但尚未完成;(2)子任务在关闭后完成,或者(3)子任务在关闭后被 fork,因此尚未启动。handleComplete(...) 方法永远不会为处于 UNAVAILABLE 状态的子任务调用。

    子类通常会定义方法,以使结果、状态或其他结果在 join() 方法返回后可以被后续代码使用。收集结果并忽略失败子任务的子类可以定义一个方法,该方法返回一系列结果。实施在子任务失败时关闭的策略的子类可以定义一个方法,以获取失败的第一个子任务的异常。

    扩展 StructuredTaskScope 的子类

    该子类收集成功完成的子任务的结果。它定义了 results() 方法,供主任务用于检索结果。

    class MyScope<T> extends StructuredTaskScope<T> {
    
        private final Queue<T> results = new ConcurrentLinkedQueue<>();
    
        MyScope() { super(null, Thread.ofVirtual().factory()); }
    
        @Override
        protected void handleComplete(Subtask<? extends T> subtask) {
            if (subtask.state() == Subtask.State.SUCCESS)
                results.add(subtask.get());
        }
    
        @Override
        public MyScope<T> join() throws InterruptedException {
            super.join();
            return this;
        }
    
        // 返回从成功完成的子任务获取的结果流
        public Stream<T> results() {
            super.ensureOwnerAndJoined();
            return results.stream();
        }
    
    }
    

    可以像这样使用这个自定义策略:

    <T> List<T> allSuccessful(List<Callable<T>> tasks) throws InterruptedException {
        try (var scope = new MyScope<T>()) {
            for (var task : tasks) scope.fork(task);
            return scope.join()
                        .results().toList();
        }
    }
    

    扇入场景

    上面的示例侧重于扇出场景,这些场景管理多个并发的出站 I/O 操作。StructuredTaskScope 在扇入场景中也非常有用,这些场景管理多个并发的入站 I/O 操作。在这种情况下,我们通常会响应传入请求而动态地创建未知数量的子任务。

    以下是一个服务器的示例,它在 StructuredTaskScope 中 fork 子任务以处理传入连接:

    void serve(ServerSocket serverSocket) throws IOException, InterruptedException {
        try (var scope = new StructuredTaskScope<Void>()) {
            try {
                while (true) {
                    var socket = serverSocket.accept();
                    scope.fork(() -> handle(socket));
                }
            } finally {
                // 如果发生错误或被中断,我们停止接受连接
                scope.shutdown();  // 关闭所有活动连接
                scope.join();
            }
        }
    }
    

    从并发的角度来看,这种情况与请求的方向不同,但在持续时间和任务数量方面是不同的,因为子任务是根据外部事件动态 fork 的。

    所有处理连接的子任务都在作用域内创建,因此在线程转储中很容易看到它们在一个作用域的所有者的子线程。作用域的所有者也很容易被当作一个单元关闭整个服务。

    可观察性

    我们扩展了由 JEP 444 添加的新的 JSON 线程转储格式,以显示 StructuredTaskScope 将线程分组成层次结构:

    $ jcmd <pid> Thread.dump_to_file -format=json <file>
    

    每个作用域的 JSON 对象包含一个线程数组,这些线程在作用域中被 fork,并附带它们的堆栈跟踪。作用域的所有者线程通常会在 join() 方法中被阻塞,等待子任务完成;线程转储可以通过显示由结构化并发所施加的树状层次结构,轻松地查看子任务的线程正在做什么。作用域的 JSON 对象还具有对其父级的引用,以便可以从转储中重新构建程序的结构。

    com.sun.management.HotSpotDiagnosticsMXBean API 也可以用来生成这样的线程转储,可以通过平台的 MBeanServer 和本地或远程的 JMX 工具直接或间接地使用它。

    为什么 fork(...) 没有返回 Future

    StructuredTaskScope API 处于孵化状态时,fork(...) 方法返回了 Future。这使得 fork(...) 更像是现有的 ExecutorService::submit 方法,从而提供了一种熟悉的感觉。然而,考虑到 StructuredTaskScope 的使用方式与 ExecutorService 完全不同 — 即以上文描述的结构化方式使用 — 使用 Future 带来的更多困惑远远超过了清晰性。

    熟悉的 Future 的使用涉及调用其 get() 方法,它会阻塞直到结果可用。但在 StructuredTaskScope 的上下文中,以这种方式使用 Future 不仅是不鼓励的,而且是不切实际的。Structured Future 对象应该只有在 join() 返回之后查询,此时它们已知已完成或取消,而应使用的方法不是熟悉的 get(),而是新引入的 resultNow(),它永远不会阻塞。

    一些开发人员想知道为什么 fork(...) 没有返回更强大的 CompletableFuture 对象。由于应该只有在已知它们已完成时才使用 fork(...) 返回的 Future,因此 CompletableFuture 不会提供任何好处,因为其高级功能只对未完成的 futures 有用。此外,CompletableFuture 是为异步编程范例设计的,而 StructuredTaskScope 鼓励阻塞范例。

    总之,FutureCompletableFuture 的设计旨在提供在结构化并发中是有害的自由度。

    结构化并发是将在不同线程中运行的多个任务视为单个工作单元,而 Future 主要在将多个任务视为单独任务时有用。因此,作用域只应该阻塞一次以等待其子任务的结果,然后集中处理异常。因此,在绝大多数情况下,从 fork(...) 返回的 Future 上唯一应该调用的方法是 resultNow()。这是与 Future 的正常用法的显著变化,而 Subtask::get() 方法的行为与在 API 孵化期间 Future::resultNow() 的行为完全相同。

    替代方案

    增强 ExecutorService 接口。我们对该接口进行了原型实现,该接口始终强制执行结构化并限制了哪些线程可以提交任务。然而,我们发现这在 JDK 和生态系统中的大多数使用情况下都不是结构化的。在完全不同的概念中重用相同的 API,会导致混淆。例如,将结构化 ExecutorService 实例传递给现有接受此类型的方法,几乎肯定会在大多数情况下抛出异常。

    本文由博客一文多发平台 OpenWrite 发布!

    相关文章

      网友评论

        本文标题:掌握JDK21全新结构化并发编程,轻松提升开发效率!

        本文链接:https://www.haomeiwen.com/subject/fgcymdtx.html