在执行程序时,为了提供性能,处理器和编译器常常会对指令进行重排序。重排序分为编译器重排序和处理器重排序两种。
• 编译器重排序:编译器保证不改变单线程执行结果的前提下,可以调整多线程语句执行顺序。
• 处理器重排序:如果不存在数据依赖性,处理器可以改变语句对应机器指令的执行顺序。
将同步方式改成异步方式,方法执行改成消息发送,因此诞生了响应式编程模型。
根据CQRS(Command Query Responsibility Segregation,命令查询的责任分离)模式的思想,任何业务都可以分解为两种基本的消息形式,Query和Command。
响应式编程的设计原则如下
• 保持数据的不变性。
• 没有共享。
• 阻塞是有害的。
Reactor
Reactor是第四代Reactive库,基于Reactive Streams规范在JVM上构建非阻塞应用程序。
Reactor侧重于服务器端响应式编程,是一个基于Java 8实现的响应式流规范(ReactiveStreams specification)响应式库。
在Reactor中,数据流发布者(Publisher)由Flux和Mono两个类表示,它们都提供了丰富的操作符(operator)。
Flux & Mono
Flux 包含0个或者多个元素[0..n]
Mono 包含0个或1个元素[0..1]
Flux & Mono都包含三种信号:元素值
、错误信号
、完成信号
(三者不一定都具备)
image.png
多种声明方式
image.png
subscribed
对数据流订阅
public static void main(String[] args) {
// 测试Flux
Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print);
System.out.println("\n--------------测试Flux--------------");
// 测试Mono
Mono.just(1).subscribe(System.out::println);
System.out.println("--------------测试Mono--------------");
// 测试2个参数的subscribe方法
Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print, System.err::println);
System.out.println("\n--------------测试2个参数的subscribe方法--------------");
// 测试3个参数的subscribe方法
Flux.just(1, 2, 3, 4, 5, 6).subscribe(
System.out::print,
System.err::println,
() -> System.out.println("\ncomplete"));
System.out.println("--------------测试3个参数的subscribe方法--------------");
// 测试4个参数的subscribe方法
Flux.just('A','B','C').subscribe(
System.out::print,
// 有错误时触发
System.err::println,
() -> System.out.println("\ncomplete"),
subscription -> {
// 订阅开始时触发
System.out.println("订阅发生了");
// 订阅长度,request(1)则只会打出A
subscription.request(1);
});
System.out.println("--------------测试4个参数的subscribe方法--------------");
}
测试复杂度提高了,不像传统写法,可以单步执行。
使用StepVerifier
做测试
// run
StepVerifier.create(Flux.just('a','b','c','d'))
.expectNext('a','b','c')
.expectComplete()
.verify();
// result
Exception in thread "main" java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext(d))
// run
StepVerifier.create(Flux.just('a','b','c','d'))
.expectNext('a','b','3','d')
.expectComplete()
.verify();
// result
Exception in thread "main" java.lang.AssertionError: expectation "expectNext(3)" failed (expected value: 3; actual value: c)
// run
StepVerifier.create(Flux.just('a','b','c','d'))
.expectNext('a','b','c','d')
.expectComplete()
.verify();
// result 成功,无结果输出
StepVerifier API
Operator操作符
-
map
map可以将数据元素转换成映射表,得到一个新的元素。
image.png
// run
Flux.range(1,3).map(i -> i*i*i).subscribe(
i -> {
System.out.print(i);
System.out.print(',');
}
);
// result
1,8,27,
-
flatMap
flatMap操作可以将每个数据元素转换/映射为各个流,然后将每个流合并为一个大的数据流。
image.png
// run
Flux.just("a-b-c","1-2-3")
.flatMap(s -> Flux.fromArray(s.split("-")))
.subscribe(
System.out::print
);
// result
abc123
-
filter
filter操作可以对数据元素过滤,得到剩余的元素。
image.png
// run
Flux.range(1,3)
.filter(i -> i!=2)
.subscribe(
System.out::println
);
// result
1
3
-
zip
zip能够将多个流一对一的合并起来。
image.png
zip的其中一种用法
image.png
// run
Flux.zip(Flux.fromArray(desc.split("\\s+"))
, Flux.just('a','b','c','d'))
.subscribe(System.out::print);
// result
[I,a][am,b][Reactor,c]
线程模型
JDK提供的多线程工具类Executors提供了多种线程池,使开发人员可以方便地定义线程池进行多线程开发。
• 获取当前线程环境Schedulers.immediate()。
• 获取可重用的单线程环境Schedulers.single()。
• 获取弹性线程池环境Schedulers.elastic()。
• 获取固定大小线程池环境Schedulers.parallel()。
• 获取自定义线程池环境Schedulers.fromExecutorService(ExecutorService) 。
private static void helloAsync() {
// Callable调用同步hello方法
Mono.fromCallable(SchedulerOperationDemo::hello)
// 弹性线程池执行
.subscribeOn(Schedulers.elastic())
// 打印结果
.subscribe(System.out::println, System.err::println);
}
网友评论