Reactor 简介
前面提到的 RxJava 库是 JVM 上反应式编程的先驱,也是反应式流规范的基础。RxJava 2 在 RxJava 的基础上做了很多的更新。不过 RxJava 库也有其不足的地方。RxJava 产生于反应式流规范之前,虽然可以和反应式流的接口进行转换,但是由于底层实现的原因,使用起来并不是很直观。RxJava 2 在设计和实现时考虑到了与规范的整合,不过为了保持与 RxJava 的兼容性,很多地方在使用时也并不直观。Reactor 则是完全基于反应式流规范设计和实现的库,没有 RxJava 那样的历史包袱,在使用上更加的直观易懂。Reactor 也是 Spring 5 中反应式编程的基础。学习和掌握 Reactor 可以更好地理解 Spring 5 中的相关概念。
Reactor是JVM的完全非阻塞反应式编程基础,具有高效的需求管理(以管理“背压”的形式)。它直接与Java 8功能的API,特别是整合CompletableFuture,Stream和 Duration。它提供了可组合的异步序列API Flux(用于[N]元素)和Mono(用于[0 | 1]元素),广泛地实现了Reactive Extensions规范。这段的重点是和Java8结合利用lambda表达式简洁的优点。
使用
在 Reactor 中,经常使用的类并不是很多,主要有以下两个:
Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者。
Flux 同样实现了 org.reactivestreams.Publisher 接口,代表0到N个元素的发表者。
Scheduler 表示背后驱动反应式流的调度器,通常由各种线程池实现。
创建 Flux
有多种不同的方式可以创建 Flux 序列。
Flux 类的静态方法
第一种方式是通过 Flux 类中的静态方法。
just():可以指定序列中包含的全部元素。创建出来的 Flux 序列在发布这些元素之后会自动结束。
fromArray(),fromIterable()和 fromStream():可以从一个数组、Iterable 对象或 Stream 对象中创建 Flux 对象。
empty():创建一个不包含任何元素,只发布结束消息的序列。
error(Throwable error):创建一个只包含错误消息的序列。
never():创建一个不包含任何消息通知的序列。
range(int start, int count):创建包含从 start 起始的 count 个数量的 Integer 对象的序列。
interval(Duration period)和 interval(Duration delay, Duration period):创建一个包含了从 0 开始递增的 Long 对象的序列。其中包含的元素按照指定的间隔来发布。除了间隔时间之外,还可以指定起始元素发布之前的延迟时间。
intervalMillis(long period)和 intervalMillis(long delay, long period):与 interval()方法的作用相同,只不过该方法通过毫秒数来指定时间间隔和延迟时间。
Flux.just("Hello", "World").subscribe(System.out::println);
Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);
Flux.empty().subscribe(System.out::println);
Flux.range(1, 10).subscribe(System.out::println);
Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);
Flux.intervalMillis(1000).subscribe(System.out::println);
上面的这些静态方法适合于简单的序列生成,当序列的生成需要复杂的逻辑时,则应该使用 generate() 或 create() 方法。
//通过generate生成元素
Flux.generate(sink -> {
sink.next("Hello");
sink.complete();
}).subscribe(System.out::println);
final Random random = new Random();
Flux.generate(ArrayList::new, (list, sink) -> {
int value = random.nextInt(100);
list.add(value);
sink.next(value);
if (list.size() == 10) {
sink.complete();
}
return list;
}).subscribe(System.out::println);
//通过create生成元素
Flux.create(sink -> {
for (int i = 0; i < 10; i++) {
sink.next(i);
}
sink.complete();
}).subscribe(System.out::println);
Mono 的创建
Mono 的创建方式与之前介绍的 Flux 比较相似。Mono 类中也包含了一些与 Flux 类中相同的静态方法。这些方法包括 just(),empty(),error()和 never()等。除了这些方法之外,Mono 还有一些独有的静态方法。
fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分别从 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中创建 Mono。
delay(Duration duration)和 delayMillis(long duration):创建一个 Mono 序列,在指定的延迟时间之后,产生数字 0 作为唯一值。
ignoreElements(Publisher<T> source):创建一个 Mono 序列,忽略作为源的 Publisher 中的所有元素,只产生结束消息。
justOrEmpty(Optional<? extends T> data)和 justOrEmpty(T data):从一个 Optional 对象或可能为 null 的对象中创建 Mono。只有 Optional 对象中包含值或对象不为 null 时,Mono 序列才产生对应的元素。
还可以通过 create()方法来使用 MonoSink 来创建 Mono。代码清单 4 中给出了创建 Mono 序列的示例。
Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);
Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);
Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);
操作符
filter:
对流中包含的元素进行过滤,只留下满足 Predicate 指定条件的元素
Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);
zipWith:
zipWith 操作符把当前流中的元素与另外一个流中的元素按照一对一的方式进行合并。
Flux.just("a", "b")
.zipWith(Flux.just("c", "d"))
.subscribe(System.out::println);
Flux.just("a", "b")
.zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))
.subscribe(System.out::println);
reduce 和 reduceWith:
reduce 和 reduceWith 操作符对流中包含的所有元素进行累积操作,得到一个包含计算结果的 Mono 序列。累积操作是通过一个 BiFunction 来表示的。在操作时可以指定一个初始值。如果没有初始值,则序列的第一个元素作为初始值。
Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);
Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);
flatMap 和 flatMapSequential
flatMap 和 flatMapSequential 操作符把流中的每个元素转换成一个流,再把所有流中的元素进行合并。flatMapSequential 和 flatMap 之间的区别与 mergeSequential 和 merge 之间的区别是一样的。
Flux.just(5, 10)
.flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))
.toStream()
.forEach(System.out::println);
调度器
前面介绍了反应式流和在其上可以进行的各种操作,通过调度器(Scheduler)可以指定这些操作执行的方式和所在的线程。有下面几种不同的调度器实现。
当前线程,通过 Schedulers.immediate()方法来创建。
单一的可复用的线程,通过 Schedulers.single()方法来创建。
使用弹性的线程池,通过 Schedulers.elastic()方法来创建。线程池中的线程是可以复用的。当所需要时,新的线程会被创建。如果一个线程闲置太长时间,则会被销毁。该调度器适用于 I/O 操作相关的流的处理。
使用对并行操作优化的线程池,通过 Schedulers.parallel()方法来创建。其中的线程数量取决于 CPU 的核的数量。该调度器适用于计算密集型的流的处理。
使用支持任务调度的调度器,通过 Schedulers.timer()方法来创建。
从已有的 ExecutorService 对象中创建调度器,通过 Schedulers.fromExecutorService()方法来创建。
某些操作符默认就已经使用了特定类型的调度器。比如 intervalMillis()方法创建的流就使用了由 Schedulers.timer()创建的调度器。通过 publishOn()和 subscribeOn()方法可以切换执行操作的调度器。其中 publishOn()方法切换的是操作符的执行方式,而 subscribeOn()方法切换的是产生流中元素时的执行方式。
使用调度器切换操作符执行方式:
Flux.create(sink -> {
sink.next(Thread.currentThread().getName());
sink.complete();
})
.publishOn(Schedulers.single())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.publishOn(Schedulers.elastic())
.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))
.subscribeOn(Schedulers.parallel())
.toStream()
.forEach(System.out::println);
在以上代码中,使用 create()方法创建一个新的 Flux 对象,其中包含唯一的元素是当前线程的名称。接着是两对 publishOn()和 map()方法,其作用是先切换执行时的调度器,再把当前的线程名称作为前缀添加。最后通过 subscribeOn()方法来改变流产生时的执行方式。运行之后的结果是[elastic-2] [single-1] parallel-1。最内层的线程名字 parallel-1 来自产生流中元素时使用的 Schedulers.parallel()调度器,中间的线程名称 single-1 来自第一个 map 操作之前的 Schedulers.single()调度器,最外层的线程名字 elastic-2 来自第二个 map 操作之前的 Schedulers.elastic()调度器。
回压
回压的处理有以下几种策略:
IGNORE: 完全忽略下游背压请求,这可能会在下游队列积满的时候导致 IllegalStateException。
ERROR: 当下游跟不上节奏的时候发出一个 IllegalStateException 的错误信号。
DROP:当下游没有准备好接收新的元素的时候抛弃这个元素。
LATEST:让下游只得到上游最新的元素。
BUFFER:(默认的)缓存所有下游没有来得及处理的元素(这个不限大小的缓存可能导致 OutOfMemoryError)。
方法签名:
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter, OverflowStrategy backpressure)
默认(没有第二个参数的方法)是缓存策略的,我们来试一下别的策略,比如DROP的策略。
@Test
public void OverflowStrategy() throws InterruptedException {
Flux flux = Flux.create(sink -> {
for (int i = 0; i < 1000; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("create " + i + " ---");
sink.next(i);
}
sink.complete();
}, FluxSink.OverflowStrategy.BUFFER) //1 调整不同的策略(BUFFER/DROP/LATEST/ERROR/IGNORE)观察效果,create方法默认为BUFFER;
.publishOn(Schedulers.newSingle("newSingle"), 1) //2 使用publishOn让后续的操作符和订阅者运行在一个单独的名为newSingle的线程上,第二个参数1是预取个数
.doOnComplete(() -> System.out.println("completed!")) // 结束时打印
.doOnRequest(n -> System.out.println("Request " + n + " values..."));//3 打印出每次的请求
flux.subscribe(new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) { // 4
System.out.println("Subscribed and make a request...");
request(1); // 5
}
@Override
protected void hookOnNext(Integer value) { // 6
try {
TimeUnit.SECONDS.sleep(1); // 7
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Get value [" + value + "]"); // 8
request(1); // 9
}
});
Thread.sleep(100 * 1000);
}
不同的策略(BUFFER/DROP/LATEST/ERROR/IGNORE)观察到的结果不同。
Hot vs Cold
到目前为止,我们讨论的发布者,无论是Flux还是Mono,都有一个特点:订阅前什么都不会发生。当我们“创建”了一个Flux的时候,我们只是“声明”/“组装”了它,但是如果不调用.subscribe来订阅它,它就不会开始发出元素。
但是我们对“数据流”(尤其是乍听到这个词的时候)会有种天然的感觉,就是无论有没有订阅者,它始终在按照自己的步伐发出数据。就像假设一个人没有一个粉丝,他也可以发微博一样。
以上这两种数据流分别称为“冷”序列和“热”序列。所以我们一直在介绍的Reactor3的发布者就属于“冷”的发布者。不过有少数的例外,比如just生成的就是一个“热”序列,它直接在组装期就拿到数据,如果之后有谁订阅它,就重新发送数据给订阅者。Reactor 中多数其他的“热”发布者是扩展自Processor 的(下节会介绍到)。
下面我们通过对比了解一下两种不同的发布者的效果,首先是我们熟悉的“冷”发布者:
@Test
public void testCodeSequence() {
Flux<String> source = Flux.*fromIterable*(Arrays.*asList*("blue", "green", "orange", "purple"))
.map(String::toUpperCase);
source.subscribe(d -> System.*out*.println("Subscriber 1: " + d));
System.*out*.println();
source.subscribe(d -> System.*out*.println("Subscriber 2: " + d));
}
我们对发布者source进行了两次订阅,每次订阅都导致它把数据流从新发一遍:
Subscriber 1: BLUE
Subscriber 1: GREEN
Subscriber 1: ORANGE
Subscriber 1: PURPLE
Subscriber 2: BLUE
Subscriber 2: GREEN
Subscriber 2: ORANGE
Subscriber 2: PURPLE
然后再看一个“热”发布者的例子:
@Test
public void testHotSequence() {
UnicastProcessor<String> hotSource = UnicastProcessor.*create*();
Flux<String> hotFlux = hotSource.publish()
.autoConnect()
.map(String::toUpperCase);
hotFlux.subscribe(d -> System.*out*.println("Subscriber 1 to Hot Source: " + d));
hotSource.onNext("blue");
hotSource.onNext("green");
hotFlux.subscribe(d -> System.*out*.println("Subscriber 2 to Hot Source: " + d));
hotSource.onNext("orange");
hotSource.onNext("purple");
hotSource.onComplete();
}
这个热发布者是一个UnicastProcessor,我们可以使用它的onNext等方法手动发出元素。上边的例子中,hotSource发出两个元素后第二个订阅者才开始订阅,所以第二个订阅者只能收到之后的元素:
Subscriber 1 to Hot Source: BLUE
Subscriber 1 to Hot Source: GREEN
Subscriber 1 to Hot Source: ORANGE
Subscriber 2 to Hot Source: ORANGE
Subscriber 1 to Hot Source: PURPLE
Subscriber 2 to Hot Source: PURPLE
由此可见,UnicastProcessor是一个热发布者。
另一个示例:
@Test
public void codeTest() throws InterruptedException {
//getNameByPhoney方法执行两次
Mono<String> customerPhoneMono = Mono.fromSupplier(
() -> ColdMonoHttp.getNameByPhone("18611854542")
);
//getNameByPhoney方法执行一次
Mono<String> customerPhoneMono = Mono.fromFuture(
CompletableFuture.supplyAsync(() -> ColdMonoHttp.getNameByPhone("18611854542"))
);
customerPhoneMono.subscribe(a -> System.out.println("-----订阅一:name=" + a));
customerPhoneMono.subscribe(a -> System.out.println("-----订阅二:name=" + a));
Thread.sleep(10000);
}
public static String getNameByPhone(String phone) {
try {
System.out.println("get name start:" + phone);
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
return null;
}
System.out.println("get name end:" + phone);
return "zhangsan";
}
参考:
官网:
GitHub - reactor/reactor-core: Non-Blocking Reactive Foundation for the JVM
Reactor 3 Reference Guide
GitHub & BitBucket HTML Preview
其它:
使用 Reactor 进行反应式编程
反应式编程介绍 - liudongdong_jlu - CSDN博客
什么是 Reactor? - Reactor 指南中文版 - 极客学院Wiki
Reactor 入门与实践 - Go语言中文网 - Golang中文社区
网友评论