前言
之前看过一本《RxJava反应式编程》,然后就对响应式编程产生了浓厚的兴趣。
发现在Android应用开发中被广泛普及。
然后在网上众说纷纭。很多人表示学习成本高,收益少。还表示同事RxJava水平参差不齐,导致代码变成屎山。
但是在Spring开始接手之后,希望能够改变这一现象。
优势在于可以做到低延迟,高吞吐,弹性和消息驱动。
SpringWebflux 基于 Reactor,默认使用容器是 Netty,Netty 是高性能的 NIO 框架,异步非阻
塞的框架
但是大家真的不会去选择并发性能更好的go吗,毕竟协程不是闹着玩的。停留在JDK8的Java是没有虚拟线程的,好再还有Netty。
怎么优化同步阻塞模型?
这是Webflux的核心,在我们的理想情况下,肯定希望的模式是这样:
1.前端发送请求
2.后端的web服务使用一个线程把逻辑处理,发送给数据库CRUD操作,线程释放
3.数据库收到请求,处理完,主动告诉web服务器
4.web服务器使用一个线程把结果推到前端,线程释放
因为通常我们在第二步,数据的crud上,是非常耗时间的,同时会让一个线程堵塞住,浪费时间。因此要优化这种模型,数据库也需要有异步的能力。
Postgres, MSSQL, H2,MySQL都开始陆续支持异步读取,并有了对应的实现R2DBC的驱动。
这也就是响应式编程优秀的地方。
观察者模式
jdk8其实也提供了观察者模式,刚好前段时间写了。
核心就是先指定好遇见某个事件需要触发的逻辑,然后再去等事件发生来触发他。解耦效果很好。
@Test
public void jdkObs() {
//todo demo1
CustomObservable customObservable = new CustomObservable();
//将观察者1放入集合中 订阅1
customObservable.addObserver((ob, arg) -> {
log.info("Custom consume data ...");
CustomObservable boa = (CustomObservable) ob;
System.out.println("lambda 接收到了:" + arg);
});
//将观察者2放入集合中 订阅2
customObservable.addObserver(new CustomObserver());
//告诉观察者 可以执行一次
customObservable.setChanged();
//发送事件 集合中订阅的对象们 挨个处理订阅的信息
customObservable.notifyObservers(Arrays.asList(1, 2, 3, 4, 5));
//告诉观察者 可以执行一次 没有setChanged则无法处理消息
customObservable.setChanged();
customObservable.notifyObservers(Arrays.asList(2, 5));
//todo 无法读取
customObservable.notifyObservers(Arrays.asList(1, 3));
}
响应式编程(Reactor 实现)
(1)响应式编程操作中,Reactor 是满足 Reactive 规范框架
(2)Reactor 有两个核心类,Mono 和 Flux,这两个类实现接口 Publisher,提供丰富操作
符。Flux 对象实现发布者,返回 N 个元素;Mono 实现发布者,返回 0 或者 1 个元素
(3)Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:
1.元素值
2.错误信号:终止信号,流结束,同时把错误信号传给订阅者。
3.完成信号:终止信号,流结束
依赖
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.1.5.RELEASE</version>
</dependency>
代码
开启Flux流
public static void main(String[] args) {
//just 方法直接声明 开启一个流,数据流并没有发出,只有进行订阅之后才会触发
Flux.just(1,2,3,4);
Mono.just(1);
//其他的方法
Integer[] array = {1,2,3,4};
Flux.fromArray(array);
List<Integer> list = Arrays.asList(array);
Flux.fromIterable(list);
Stream<Integer> stream = list.stream();
Flux.fromStream(stream);
}
订阅消费流的逻辑
//源码
public abstract class Flux<T> implements Publisher<T> {
...
//flux订阅消费逻辑 非空的消费者即可
public final Disposable subscribe(Consumer<? super T> consumer) {
Objects.requireNonNull(consumer, "consumer");
return this.subscribe(consumer, (Consumer)null, (Runnable)null);
}
}
//应用编码
@Test
public void fluxDemo() {
//just(T... data)
Flux.just("Hello", "World")
.map(s->s)
.distinct()
.filter(s->true)
//订阅一个打印逻辑
.subscribe(System.out::println);
//array
Flux.fromArray(new Integer[]{1, 2, 3})
.subscribe(System.out::println);
//empty
Flux.empty()
.subscribe(System.out::println);
//range
Flux.range(1, 10)
.subscribe(System.out::println);
//interval(Duration period)
Flux.interval(Duration.of(10, ChronoUnit.SECONDS))
.subscribe(System.out::println);
}
工程化编码
SpringWebflux 实现方式有两种:注解编程模型和函数式编程模型
使用注解编程模型方式,和之前 SpringMVC 使用相似的,只需要把相关依赖配置到项目中,
SpringBoot 自动配置相关运行容器,默认情况下使用 Netty 服务器
Controller
@RestController
public class UserController {
//注入 service
@Autowired
private UserService userService;
//id 查询
@GetMapping("/user/{id}")
public Mono<User> geetUserId(@PathVariable int id) {
return userService.getUserById(id);
}
//查询所有
@GetMapping("/user")
public Flux<User> getUsers() {
return userService.getAllUser();
}
//添加
@PostMapping("/saveuser")
public Mono<Void> saveUser(@RequestBody User user) {
Mono<User> userMono = Mono.just(user);
return userService.saveUserInfo(userMono);
}
}
其实不难发现,与SpringMVC的代码其实差距并不大。
SpringMVC 方式实现,同步阻塞的方式,基于 SpringMVC+Servlet+Tomcat
SpringWebflux 方式实现,异步非阻塞 方式,基于 SpringWebflux+Reactor+Netty
Handler
/**
* @author zhangxuecheng4441
* @date 2022/11/14/014 15:08
*/
public class UserHandler {
private final UserService userService;
public UserHandler(UserService userService) {
this.userService = userService;
}
//根据 id 查询
public Mono<ServerResponse> getUserById(ServerRequest request) {
//获取 id 值
int userId = Integer.valueOf(request.pathVariable("id"));
//空值处理
Mono<ServerResponse> notFound = ServerResponse.notFound().build();
//调用 service 方法得到数据
Mono<User> userMono = this.userService.getUserById(userId);
//把 userMono 进行转换返回
//使用 Reactor 操作符 flatMap
return userMono.flatMap(person -> ServerResponse
.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(fromObject(person))
)
.switchIfEmpty(notFound);
}
//查询所有
public Mono<ServerResponse> getAllUsers() {
//调用 service 得到结果
Flux<User> users = this.userService.getAllUser();
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(users, User.class);
}
//添加
public Mono<ServerResponse> saveUser(ServerRequest request) {
//得到 user 对象
Mono<User> userMono = request.bodyToMono(User.class);
return ServerResponse.ok().build(this.userService.saveUserInfo(userMono));
}
}
网友评论