CompletionStage源码
/**
* A stage of a possibly asynchronous computation, that performs an
* action or computes a value when another CompletionStage completes.
* A stage completes upon termination of its computation, but this may
* in turn trigger other dependent stages. The functionality defined
* in this interface takes only a few basic forms, which expand out to
* a larger set of methods to capture a range of usage styles: <ul>
*当另外一个CompletionStage完成时,此时正处于执行一个动作或者计算一个值的异步计算阶段。
该阶段在其计算终止时完成,但这又可能触发其他从属阶段。该定义的功能仅采用几种基本形式,这些形式扩展为更大的方法集,以捕获各种使用样式
* <li>The computation performed by a stage may be expressed as a
* Function, Consumer, or Runnable (using methods with names including
* <em>apply</em>, <em>accept</em>, or <em>run</em>, respectively)
* depending on whether it requires arguments and/or produces results.
* For example, {@code stage.thenApply(x -> square(x)).thenAccept(x ->
* System.out.print(x)).thenRun(() -> System.out.println())}. An
* additional form (<em>compose</em>) applies functions of stages
* themselves, rather than their results. </li>
*根据阶段是否需要参数和/或产生结果,可以将阶段执行的计算表示为“Function”,“Consumer”或“Runnable ”(使用名称分别包括“apply”,“accept”或“run”的方法)。
* <li> One stage's execution may be triggered by completion of a
* single stage, or both of two stages, or either of two stages.
* Dependencies on a single stage are arranged using methods with
* prefix <em>then</em>. Those triggered by completion of
* <em>both</em> of two stages may <em>combine</em> their results or
* effects, using correspondingly named methods. Those triggered by
* <em>either</em> of two stages make no guarantees about which of the
* results or effects are used for the dependent stage's
* computation.</li>
*一个阶段的执行可以通过完成一个阶段,两个阶段或两个阶段之一来触发。
*对单个阶段的依赖关系使用带有前缀的方法来安排。
*通过两个阶段都完成而触发的结果可以使用相应命名的方法来组合其结果或效果。
*由两个阶段中的任何一个触发的事件都不能保证将哪个结果或效果用于从属阶段的计算。
* <li> Dependencies among stages control the triggering of
* computations, but do not otherwise guarantee any particular
* ordering. Additionally, execution of a new stage's computations may
* be arranged in any of three ways: default execution, default
* asynchronous execution (using methods with suffix <em>async</em>
* that employ the stage's default asynchronous execution facility),
* or custom (via a supplied {@link Executor}). The execution
* properties of default and async modes are specified by
* CompletionStage implementations, not this interface. Methods with
* explicit Executor arguments may have arbitrary execution
* properties, and might not even support concurrent execution, but
* are arranged for processing in a way that accommodates asynchrony.
*阶段之间的依赖关系控制着计算的触发,但不能保证任何特定的顺序。
另外,新阶段计算的执行可以三种方式安排:默认执行,
默认异步执行(使用带有后缀异步的方法,采用该阶段的默认异步执行工具)或通过提供的执行程序进行自定义。
默认模式和异步模式的执行属性由CompletionStage实现而不是此接口指定。
具有显式Executor自变量的方法可能具有任意执行属性,甚至可能不支持并发执行
,但被安排为以适应异步的方式进行处理
* <li> Two method forms support processing whether the triggering
* stage completed normally or exceptionally: Method {@link
* #whenComplete whenComplete} allows injection of an action
* regardless of outcome, otherwise preserving the outcome in its
* completion. Method {@link #handle handle} additionally allows the
* stage to compute a replacement result that may enable further
* processing by other dependent stages. In all other cases, if a
* stage's computation terminates abruptly with an (unchecked)
* exception or error, then all dependent stages requiring its
* completion complete exceptionally as well, with a {@link
* CompletionException} holding the exception as its cause. If a
* stage is dependent on <em>both</em> of two stages, and both
* complete exceptionally, then the CompletionException may correspond
* to either one of these exceptions. If a stage is dependent on
* <em>either</em> of two others, and only one of them completes
* exceptionally, no guarantees are made about whether the dependent
* stage completes normally or exceptionally. In the case of method
* {@code whenComplete}, when the supplied action itself encounters an
* exception, then the stage exceptionally completes with this
* exception if not already completed exceptionally.</li>
* </ul>
*有两种方法形式支持处理过程,无论触发阶段是正常完成还是异常完成:方法whenComplete允许不考虑结
*果而注入动作,否则将结果保留在完成时。方法句柄还允许该阶段计算替换结果,
*该替换结果使其他从属阶段可以进行进一步处理。
*在所有其他情况下,如果一个阶段的计算由于(未经检查的)异常或错误而突然终止,
*那么所有需要完成该过程的从属阶段也会异常完成,并且CompletionException将异常作为其原因。
*如果一个阶段同时依赖于两个阶段,并且都异常完成,则CompletionException可能对应于这些异常中的任何一个。
*如果一个阶段依赖于其他两个阶段中的任何一个,并且其中只有一个异常完成,则不能保证从属阶段是正常完成还是异常完成。
*在方法whenComplete的情况下,如果提供的操作本身遇到异常,则该阶段将以该异常异常完成(如果尚未异常完成)。
* <p>All methods adhere to the above triggering, execution, and
* exceptional completion specifications (which are not repeated in
* individual method specifications). Additionally, while arguments
* used to pass a completion result (that is, for parameters of type
* {@code T}) for methods accepting them may be null, passing a null
* value for any other parameter will result in a {@link
* NullPointerException} being thrown.
*所有方法都遵循上述触发,执行和特殊完成规范(在各个方法规范中不再重复)。
* 此外,虽然用于传递完成结果的参数(即,对于T类型参数接受方法的参数可能为null,
*但为其他任何参数传递null值将导致抛出NullPointerException
* <p>This interface does not define methods for initially creating,
* forcibly completing normally or exceptionally, probing completion
* status or results, or awaiting completion of a stage.
* Implementations of CompletionStage may provide means of achieving
* such effects, as appropriate. Method {@link #toCompletableFuture}
* enables interoperability among different implementations of this
* interface by providing a common conversion type.
*该接口未定义用于初始创建,强制正常或例外地完成,探测完成状态或结果或等待阶段完成的方法。
*CompletionStage的实现可以酌情提供实现这种效果的方法。
*方法toCompletableFuture通过提供常见的转换类型来实现此接口的不同实现之间的互操作性
* @author Doug Lea
* @since 1.8
*/
public interface CompletionStage<T>
在这之前,先学习三个接口,Supplier,Function,Consumer:
Supplier
可以简单理解为数据的提供者
/**
* Represents a supplier of results.
*代表结果的提供者。
* <p>There is no requirement that a new or distinct result be returned each
* time the supplier is invoked.
*不需要每次调用supplier 时都返回新的或不同的结果。
* <p>This is a <a href="package-summary.html">functional interface</a>
* whose functional method is {@link #get()}.
*
* @param <T> the type of results supplied by this supplier
*
* @since 1.8
*/
@FunctionalInterface
public interface Supplier<T>
Function
可以理解数据转换,接收一个参数,然后返回一个数据
/**
* Represents a function that accepts one argument and produces a result.
*表示接受一个参数并产生一个结果的功能。
* <p>This is a <a href="package-summary.html">functional interface</a>
* whose functional method is {@link #apply(Object)}.
*
* @param <T> the type of the input to the function
*函数输入的数据类型
* @param <R> the type of the result of the function
*函数返回的数据类型
* @since 1.8
*/
@FunctionalInterface
public interface Function<T, R>
Consumer
可以理解为消费掉数据,没有返回值
/**
* Represents an operation that accepts a single input argument and returns no
* result. Unlike most other functional interfaces, {@code Consumer} is expected
* to operate via side-effects.
*表示一个接受单个输入参数且不返回结果的操作。 与大多数其他功能接口不同,消费者有望通过副作用进行操作。
* <p>This is a <a href="package-summary.html">functional interface</a>
* whose functional method is {@link #accept(Object)}.
*
* @param <T> the type of the input to the operation
*
* @since 1.8
*/
@FunctionalInterface
public interface Consumer<T>
总结:
其实这三者的角色可以理解为数据的产生、数据的转换以及数据的消费。无论是数据的转换,还是数据的消费,前提条件就是你得有数据不是,所以这三者中,Suplier在其中扮演着必不可少的角色。
可以站在分工的角度类比一下工作流。任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等,CompletionStage 接口可以清晰地描述任务之间的这种时序关系.下面一一介绍,CompletionStage 接口如何描述串行关系、AND 聚合关系、OR 聚合关系以及异常处理。
1. 描述串行关系
CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun 和 thenCompose 这四个系列的接口
thenApply系列
CompletionStage<R> thenApply(fn);
CompletionStage<R> thenApplyAsync(fn);
CompletionStage<R> thenApplyAsync(fn,executor);
CompletionStage<Void> thenAccept(consumer);
CompletionStage<Void> thenAcceptAsync(consumer);
CompletionStage<Void> thenAcceptAsync(consumer,executor);
CompletionStage<Void> thenRun(action);
CompletionStage<Void> thenRunAsync(action);
CompletionStage<Void> thenRunAsync(action,executor);
CompletionStage<R> thenCompose(fn);
CompletionStage<R> thenComposeAsync(fn);
CompletionStage<R> thenComposeAsync(fn,executor);
thenApply(Function<? super T,? extends U> fn)
返回一个新的CompletionStage,当此阶段正常完成时,将使用该阶段的结果作为所提供函数的参数来执行
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed with this stage's result as the argument
* to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
thenApplyAsync(Function<? super T,? extends U> fn)
返回一个新的CompletionStage,当此阶段正常完成时,将使用该阶段的默认异步执行工具执行该阶段,并将该阶段的结果作为所提供函数的参数
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using this stage's default asynchronous
* execution facility, with this stage's result as the argument to
* the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/
public <U> CompletionStage<U> thenApplyAsync
(Function<? super T,? extends U> fn);
thenApplyAsync (Function<? super T,? extends U> fn,Executor executor)
返回一个新的CompletionStage,当此阶段正常完成时,将使用该阶段的默认异步执行工具执行该阶段,并将该阶段的结果作为所提供函数的参数。
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using the supplied Executor, with this
* stage's result as the argument to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @param <U> the function's return type
* @return the new CompletionStage
*/
public <U> CompletionStage<U> thenApplyAsync (Function<? super T,? extends U> fn,Executor executor);
Show the code
public class ThenApplyDemo {
public static void main(String[] args) {
CompletableFuture thenApply=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Hello";
}
}).thenApply(new Function<String, String>() {
@Override
public String apply(String s) {
return s+"---";
}
}).thenApply(new Function<String, String>() {
@Override
public String apply(String s) {
return s+"World";
}
});
try {
System.out.println((String)thenApply.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Show the result of code
Hello---World
Process finished with exit code 0
thenAccept系列
thenAccept(Consumer<? super T> action)
返回一个新的CompletionStage,当此阶段正常完成时,将使用该阶段的结果作为所提供操作的参数来执行该阶段。
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed with this stage's result as the argument
* to the supplied action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
thenAcceptAsync(Consumer<? super T> action)
返回一个新的CompletionStage,当该阶段正常完成时,将使用该阶段的默认异步执行工具执行该阶段,并将该阶段的结果作为所提供操作的参数。
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using this stage's default asynchronous
* execution facility, with this stage's result as the argument to
* the supplied action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
thenAcceptAsync(Consumer<? super T> action,Executor executor)
返回一个新的CompletionStage,当此阶段正常完成时,将使用提供的Executor执行此阶段,并将该阶段的结果作为提供的操作的参数。
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using the supplied Executor, with this
* stage's result as the argument to the supplied action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @return the new CompletionStage
*/
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
Show the code
public class ThenAcceptDemo {
public static void main(String[] args) {
CompletableFuture thenAccept=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Hello thenAccept";
}
}).thenAccept(new Consumer<String>() {
@Override
public void accept(String s) {
System.out.println(s);
}
});
try {
System.out.println((String) thenAccept.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Show the result of code
Hello thenAccept
null
Process finished with exit code 0
Show the summary
1、Consumer已经正常消费掉数据了
2、从打印的数据可以验证,Consumer是没有返回数据的,所以获取到的数据为null。
thenRun系列
thenRun(Runnable action)
返回一个新的CompletionStage,当此阶段正常完成时,该CompletionStage执行给定的操作
/**
* Returns a new CompletionStage that, when this stage completes
* normally, executes the given action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public CompletionStage<Void> thenRun(Runnable action);
thenRunAsync(Runnable action)
返回一个新的CompletionStage,当此阶段正常完成时,将使用此阶段的默认异步执行工具来执行给定的动作。
/**
* Returns a new CompletionStage that, when this stage completes
* normally, executes the given action using this stage's default
* asynchronous execution facility.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public CompletionStage<Void> thenRunAsync(Runnable action);
thenRunAsync(Runnable action,Executor executor)
返回一个新的CompletionStage,该阶段正常完成时,将使用提供的Executor执行给定的动作。
/**
* Returns a new CompletionStage that, when this stage completes
* normally, executes the given action using the supplied Executor.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param action the action to perform before completing the
* returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @return the new CompletionStage
*/
public CompletionStage<Void> thenRunAsync(Runnable action,
Executor executor);
thenCompose系列
thenCompose (Function<? super T, ? extends CompletionStage<U>> fn)
返回一个新的CompletionStage,当此阶段正常完成时,将使用此阶段作为所提供函数的参数来执行
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed with this stage as the argument
* to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function returning a new CompletionStage
* @param <U> the type of the returned CompletionStage's result
* @return the CompletionStage
*/
public <U> CompletionStage<U> thenCompose
(Function<? super T, ? extends CompletionStage<U>> fn);
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
返回一个新的CompletionStage,当此阶段正常完成时,将使用该阶段的默认异步执行工具将其作为提供的函数的参数来执行
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using this stage's default asynchronous
* execution facility, with this stage as the argument to the
* supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function returning a new CompletionStage
* @param <U> the type of the returned CompletionStage's result
* @return the CompletionStage
*/
public <U> CompletionStage<U> thenComposeAsync
(Function<? super T, ? extends CompletionStage<U>> fn);
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor)
返回一个新的CompletionStage,当此阶段正常完成时,将使用提供的Executor执行此阶段,并将该阶段的结果作为提供的函数的参数。
/**
* Returns a new CompletionStage that, when this stage completes
* normally, is executed using the supplied Executor, with this
* stage's result as the argument to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param fn the function returning a new CompletionStage
* @param executor the executor to use for asynchronous execution
* @param <U> the type of the returned CompletionStage's result
* @return the CompletionStage
*/
public <U> CompletionStage<U> thenComposeAsync
(Function<? super T, ? extends CompletionStage<U>> fn,
Executor executor);
Show the code
public class ThenComposeDemo {
public static void main(String[] args) {
CompletableFuture thenCompose=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Hello supplyAsync";
}
}).thenCompose(new Function<String, CompletionStage<String>>() {
@Override
public CompletionStage<String> apply(String s) {
return CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return s+"--"+"Hello thenCompose";
}
});
}
});
try {
System.out.println(thenCompose.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Show the result of code
Hello supplyAsync--Hello thenCompose
Process finished with exit code 0
2. 描述 AND 汇聚关系
CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同
CompletionStage<R> thenCombine(other, fn);
CompletionStage<R> thenCombineAsync(other, fn);
CompletionStage<R> thenCombineAsync(other, fn,executor);
CompletionStage<Void> thenAcceptBoth(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer);
CompletionStage<Void> thenAcceptBothAsync(other, consumer,executor);
CompletionStage<Void> runAfterBoth(other, action);
CompletionStage<Void> runAfterBothAsync(other, action);
CompletionStage<Void> runAfterBothAsync(other, action,executor);
![](https://img.haomeiwen.com/i1587509/32a0baf0454d6bd2.png)
thenCombine的第二个参数是BiFunction,前面我们解释过,Function类型的参数会接收一个参数,并且最后会返回一个结果;thenAcceptBoth的第二个参数是BiConsumer,它是个消费型的,只接收参数并且消费掉该数据;runAfterBoth的第二个参数是Runnable。
thenCombine系列
thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
返回一个新的CompletionStage,当此阶段和另一个给定阶段都正常完成时,将使用两个结果作为所提供函数的参数来执行。
/**
* Returns a new CompletionStage that, when this and the other
* given stage both complete normally, is executed with the two
* results as arguments to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the type of the other CompletionStage's result
* @param <V> the function's return type
* @return the new CompletionStage
*/
public <U,V> CompletionStage<V> thenCombine
(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
thenCombineAsync (CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn);
返回一个新的CompletionStage,当此阶段和另一个给定阶段正常完成时,将使用该阶段的默认异步执行工具来执行,并将两个结果作为所提供函数的参数。
/**
* Returns a new CompletionStage that, when this and the other
* given stage complete normally, is executed using this stage's
* default asynchronous execution facility, with the two results
* as arguments to the supplied function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the type of the other CompletionStage's result
* @param <V> the function's return type
* @return the new CompletionStage
*/
public <U,V> CompletionStage<V> thenCombineAsync
(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn);
thenCombineAsync (CompletionStage<? extends U> other,BiFunction<? super T,? super U,?extends V> fn,Executor executor)
返回一个新的CompletionStage,当此阶段和另一个给定阶段正常完成时,将使用提供的executor将其执行,并将两个结果作为提供函数的参数。
/**
* Returns a new CompletionStage that, when this and the other
* given stage complete normally, is executed using the supplied
* executor, with the two results as arguments to the supplied
* function.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param executor the executor to use for asynchronous execution
* @param <U> the type of the other CompletionStage's result
* @param <V> the function's return type
* @return the new CompletionStage
*/
public <U,V> CompletionStage<V> thenCombineAsync
(CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn,
Executor executor);
Show the code
public class ThenCombineDemo {
public static void main(String[] args) {
CompletableFuture supplyAsync=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Hello";
}
});
CompletableFuture thenCombine=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "ThenCombine";
}
});
CompletableFuture result=supplyAsync.thenCombine(thenCombine, new BiFunction<String,String,String>() {
@Override
public String apply(String o, String o2) {
return o+"--"+o2;
}
});
try {
System.out.println(result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Show the result
Hello--ThenCombine
Process finished with exit code 0
Show the summary
上面的例子我们可以看出,thenCombine和thenApply一样都可以对返回的数据进行转换操作,不同的是,thenCombine针对的是两个CompletableFuture的返回结果,而thenApply是针对同一个CompletableFuture的返回结果进行转换操作的。
thenAcceptBoth系列
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action)
/**
* Returns a new CompletionStage that, when this and the other
* given stage both complete normally, is executed with the two
* results as arguments to the supplied action.
*返回一个新的CompletionStage,当此阶段和另一个给定阶段都正常完成时,将使用两个结果作为所提供操作的参数来执行。
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @param <U> the type of the other CompletionStage's result
* @return the new CompletionStage
*/
public <U> CompletionStage<Void> thenAcceptBoth
(CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
runAfterBoth系列
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action)
/**
* Returns a new CompletionStage that, when this and the other
* given stage both complete normally, executes the given action.
*返回一个新的CompletionStage,当此阶段和另一个给定阶段都正常完成时,执行给定操作
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,
Runnable action)
Show the code
public class RunAfterBothDemo {
public static void main(String[] args) {
CompletableFuture runAsync=CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("runAsync:"+System.currentTimeMillis());
}
});
CompletableFuture other=CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println("other:"+System.currentTimeMillis());
}
});
runAsync.runAfterBoth(other, new Runnable() {
@Override
public void run() {
System.out.println("runAfterBoth:"+System.currentTimeMillis());
}
});
try {
runAsync.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Show the result of code
other:1621507355629
runAsync:1621507356634
runAfterBoth:1621507356634
Process finished with exit code 0
由于在runAsync里面加入了sleep(),所有other先于runAsync运行。
3. 描述 OR 汇聚关系
CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和 runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。
CompletionStage applyToEither(other, fn);
CompletionStage applyToEitherAsync(other, fn);
CompletionStage applyToEitherAsync(other, fn,executor);
CompletionStage acceptEither(other, consumer);
CompletionStage acceptEitherAsync(other, consumer);
CompletionStage acceptEitherAsync(other, consumer,executor);
CompletionStage runAfterEither(other, action);
CompletionStage runAfterEitherAsync(other, action);
CompletionStage runAfterEitherAsync(other, action,executor);
applyToEither系列
applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn)
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, is executed with the
* corresponding result as argument to the supplied function.
*返回一个新的CompletionStage,当此阶段或另一个给定阶段正常完成时,将使用相应的结果作为所提供函数的参数来执行
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param fn the function to use to compute the value of
* the returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/
public <U> CompletionStage<U> applyToEither
(CompletionStage<? extends T> other,
Function<? super T, U> fn);
Show the wrong code
public class ApplyToEitherDemo {
public static void main(String[] args) {
CompletableFuture supplyAsync = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "First";
}
});
CompletableFuture other = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Other";
}
});
supplyAsync.applyToEither(other, new Function<String, String>() {
@Override
public String apply(String o) {
return 0 + "--applyToEither";
}
});
try {
System.out.println(supplyAsync.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Show the wrong result of code
First
Process finished with exit code 0
我故意在第一个里面做了sleep操作,让Other优先完成,但是输出的结果却不是自己想要的,折腾了很久,也没有折腾出来,第二天才发现,问题出在最后的get方法上,不是调用第一个CompletableFuture的get方法,而是调用新生成的CompletableFuture的get方法。这个点着实坑了自己一把。
Show the code
public class ApplyToEitherDemo {
public static void main(String[] args) {
CompletableFuture first=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello";
}
});
CompletableFuture second=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "World";
}
});
CompletableFuture applyToEither = first.applyToEither(second, new Function<String, String>() {
@Override
public String apply(String s) {
System.out.println("apply:" + s);
return s + "---->applyToEither";
}
});
try {
System.out.println("Result:"+applyToEither.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
Show the result of code
apply:World
Result:World---->applyToEither
Process finished with exit code 0
由于第一个CompletableFuture睡眠了1秒,所以第二个CompletableFuture会先完成,所以最后得到的数据为第二个CompletableFuture返回的数据
acceptEither系列
acceptEither (CompletionStage<? extends T> other,Consumer<? super T> action)
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, is executed with the
* corresponding result as argument to the supplied action.
*返回一个新的CompletionStage,当此阶段或另一个给定阶段正常完成时,
将使用相应的结果作为所提供操作的参数来执行。
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public CompletionStage<Void> acceptEither
(CompletionStage<? extends T> other,
Consumer<? super T> action);
Show the code
public class AcceptEitherDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf1=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello";
}
});
CompletableFuture other=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Other";
}
});
CompletableFuture result=cf1.acceptEither(other, new Consumer<String>() {
@Override
public void accept(String o) {
System.out.println("result:"+o);
}
});
System.out.println(result.get());
}
}
Show the result of code
result:Other
null
Process finished with exit code 0
Remove the sleep()
public class AcceptEitherDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf1=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Hello";
}
});
CompletableFuture other=CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
return "Other";
}
});
CompletableFuture result=cf1.acceptEither(other, new Consumer<String>() {
@Override
public void accept(String o) {
System.out.println("result:"+o);
}
});
System.out.println(result.get());
}
}
Show the result of remove sleep
result:Hello
null
Process finished with exit code 0
如果移除第一个里面的sleep(),则第一个会先执行完毕,则得到的数据为第一个的返回的数据
runAfterEither系列
* runAfterEither(CompletionStage<?> other,Runnable action)*
有一个执行完则执行后面给定的操作
/**
* Returns a new CompletionStage that, when either this or the
* other given stage complete normally, executes the given action.
*
* See the {@link CompletionStage} documentation for rules
* covering exceptional completion.
*返回一个新的CompletionStage,当此阶段或另一个给定阶段正常完成时,将执
行给定操作。
* @param other the other CompletionStage
* @param action the action to perform before completing the
* returned CompletionStage
* @return the new CompletionStage
*/
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,
Runnable action);
Show the code
public class RunAfterEitherDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf=CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("cf");
}
});
CompletableFuture other=CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
System.out.println("Other");
}
});
CompletableFuture runAfterEither=cf.runAfterEither(other, new Runnable() {
@Override
public void run() {
System.out.println("runAfterEither");
}
});
runAfterEither.get();
}
}
Show the result of code
Other
runAfterEither
Process finished with exit code 0
由于第一个里面增加了sleep方法,所以后面的会先于第一个执行完毕。
4. 异常处理
虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行 7/0 就会出现除零错误这个运行时异常。非异步编程里面,我们可以使用 try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢?
CompletionStage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式
CompletionStage exceptionally(fn);
CompletionStage<R> whenComplete(consumer);
CompletionStage<R> whenCompleteAsync(consumer);
CompletionStage<R> handle(fn);
CompletionStage<R> handleAsync(fn);
Handle系列
handle (BiFunction<? super T, Throwable, ? extends U> fn)
参数里面需要传入一个BiFunction类型的,所以该方法属于转换类型的,既有入参,也有返回值。
/**
* Returns a new CompletionStage that, when this stage completes
* either normally or exceptionally, is executed with this stage's
* result and exception as arguments to the supplied function.
*返回一个新的CompletionStage,当此阶段正常或异常完成时,将使用该阶段的结果和异常作为所提供函数的参数来执行。
* <p>When this stage is complete, the given function is invoked
* with the result (or {@code null} if none) and the exception (or
* {@code null} if none) of this stage as arguments, and the
* function's result is used to complete the returned stage.
*完成此阶段后,将使用该阶段的结果(如果没有则为null)和该阶段的异常(如果没有则为null)作为参
数调用给定函数,
然后使用函数的结果来完成返回的阶段
* @param fn the function to use to compute the value of the
* returned CompletionStage
* @param <U> the function's return type
* @return the new CompletionStage
*/
public <U> CompletionStage<U> handle
(BiFunction<? super T, Throwable, ? extends U> fn);
Show the code
public class HandleDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf=CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int result=10/0;
return result;
}
}).handle(new BiFunction<Integer, Throwable, Integer>() {
@Override
public Integer apply(Integer integer, Throwable throwable) {
int handle=-1;
if (throwable==null){
handle=0;
}else{
handle=-1;
System.out.println(throwable.getMessage());
}
return handle;
}
});
System.out.println(cf.get());
}
}
Show the result of code
java.lang.ArithmeticException: / by zero
-1
Process finished with exit code 0
WhemComplete系列
whenComplete(BiConsumer<? super T, ? super Throwable> action)
由第一个入参BiConsumer可以得知,该方法属于消费型,没有返回值。
/**
* Returns a new CompletionStage with the same result or exception as
* this stage, that executes the given action when this stage completes.
*该阶段完成执行给定的操作返回具有与该阶段相同的结果或异常的新CompletionStage
* <p>When this stage is complete, the given action is invoked with the
* result (or {@code null} if none) and the exception (or {@code null}
* if none) of this stage as arguments. The returned stage is completed
* when the action returns. If the supplied action itself encounters an
* exception, then the returned stage exceptionally completes with this
* exception unless this stage also completed exceptionally.
*完成此阶段后,将使用该阶段的结果(如果没有则为null)和该阶段的异常(如果没有则为null)作为参
数来调用给定操作。 动作返回时,返回的阶段已完成。 如果所提供的操作本身遇到异常,那么返回阶段
将异常终止,除非该阶段也异常完成。
* @param action the action to perform
* @return the new CompletionStage
*/
public CompletionStage<T> whenComplete
(BiConsumer<? super T, ? super Throwable> action);
Normal code
the normal code
public class WhenCompleteDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int result = 10 / 5;
return result;
}
});
CompletableFuture whenCompleteAsync = cf.whenCompleteAsync(new BiConsumer<Integer, Throwable>() {
@Override
public void accept(Integer integer, Throwable throwable) {
if (throwable == null) {
System.out.println(integer);
} else {
System.out.println(throwable.getMessage());
}
}
});
System.out.println("结果:"+whenCompleteAsync.get());
}
}
the result of normal code
2
结果:2
Process finished with exit code 0
Except code
the except code
public class WhenCompleteDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
int result = 10 / 0;
return result;
}
});
CompletableFuture whenCompleteAsync = cf.whenCompleteAsync(new BiConsumer<Integer, Throwable>() {
@Override
public void accept(Integer integer, Throwable throwable) {
if (throwable == null) {
System.out.println(integer);
} else {
System.out.println(throwable.getMessage());
}
}
});
System.out.println("结果:"+whenCompleteAsync.get());
}
}
the result of except code
java.lang.ArithmeticException: / by zero
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
at com.yandroid.thread.future.WhenCompleteDemo.main(WhenCompleteDemo.java:26)
Caused by: java.lang.ArithmeticException: / by zero
at com.yandroid.thread.future.WhenCompleteDemo$2.get(WhenCompleteDemo.java:13)
at com.yandroid.thread.future.WhenCompleteDemo$2.get(WhenCompleteDemo.java:10)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Process finished with exit code 1
虽然也打印出了异常信息,但是与Handle不同的是,该方法导致了main线程的异常;而Handle方法打印出了异常信息,但是main线程还是正常的。
exceptionally
exceptionally(Function<Throwable, ? extends T> fn)
/**
* Returns a new CompletionStage that, when this stage completes
* exceptionally, is executed with this stage's exception as the
* argument to the supplied function. Otherwise, if this stage
* completes normally, then the returned stage also completes
* normally with the same value.
*返回一个新的CompletionStage,当此阶段异常完成时,将以该阶段的异常作为提供的函数的参数执行该
新的CompletionStage。 否则,如果此阶段正常完成,则返回的阶段也将以相同的值正常完成。
* @param fn the function to use to compute the value of the
* returned CompletionStage if this CompletionStage completed
* exceptionally
* @return the new CompletionStage
*/
public CompletionStage<T> exceptionally
(Function<Throwable, ? extends T> fn);
Except code
the except code
public class ExceptionallyDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return 6 / 0;
}
});
CompletableFuture whenComplete = cf.exceptionally(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) {
System.out.println(throwable.getMessage());
return -1;
}
});
System.out.println(whenComplete.get());
}
}
the result of except code
java.lang.ArithmeticException: / by zero
-1
Process finished with exit code 0
Show the normal
normal code
public class ExceptionallyDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture cf = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
return 6 / 3;
}
});
CompletableFuture whenComplete = cf.exceptionally(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) {
System.out.println(throwable.getMessage());
return -1;
}
});
System.out.println(whenComplete.get());
}
}
the result of normal code
2
Process finished with exit code 0
网友评论