美文网首页
Spring5新特性:Webflux响应式编程

Spring5新特性:Webflux响应式编程

作者: 圆企鹅i | 来源:发表于2023-04-20 15:44 被阅读0次

    前言

    之前看过一本《RxJava反应式编程》,然后就对响应式编程产生了浓厚的兴趣。
    发现在Android应用开发中被广泛普及。
    然后在网上众说纷纭。很多人表示学习成本高,收益少。还表示同事RxJava水平参差不齐,导致代码变成屎山。
    但是在Spring开始接手之后,希望能够改变这一现象。

    image.png

    优势在于可以做到低延迟,高吞吐,弹性和消息驱动。
    SpringWebflux 基于 Reactor,默认使用容器是 Netty,Netty 是高性能的 NIO 框架,异步非阻
    塞的框架
    但是大家真的不会去选择并发性能更好的go吗,毕竟协程不是闹着玩的。停留在JDK8的Java是没有虚拟线程的,好再还有Netty。

    image.png

    怎么优化同步阻塞模型?
    这是Webflux的核心,在我们的理想情况下,肯定希望的模式是这样:
    1.前端发送请求
    2.后端的web服务使用一个线程把逻辑处理,发送给数据库CRUD操作,线程释放
    3.数据库收到请求,处理完,主动告诉web服务器
    4.web服务器使用一个线程把结果推到前端,线程释放

    因为通常我们在第二步,数据的crud上,是非常耗时间的,同时会让一个线程堵塞住,浪费时间。因此要优化这种模型,数据库也需要有异步的能力

    Postgres, MSSQL, H2,MySQL都开始陆续支持异步读取,并有了对应的实现R2DBC的驱动。

    这也就是响应式编程优秀的地方。

    观察者模式

    jdk8其实也提供了观察者模式,刚好前段时间写了。
    核心就是先指定好遇见某个事件需要触发的逻辑,然后再去等事件发生来触发他。解耦效果很好。

    @Test
        public void jdkObs() {
            //todo demo1
            CustomObservable customObservable = new CustomObservable();
    
            //将观察者1放入集合中 订阅1
            customObservable.addObserver((ob, arg) -> {
                log.info("Custom consume data ...");
                CustomObservable boa = (CustomObservable) ob;
                System.out.println("lambda 接收到了:" + arg);
            });
    
            //将观察者2放入集合中 订阅2
            customObservable.addObserver(new CustomObserver());
    
            //告诉观察者 可以执行一次
            customObservable.setChanged();
            //发送事件 集合中订阅的对象们 挨个处理订阅的信息
            customObservable.notifyObservers(Arrays.asList(1, 2, 3, 4, 5));
    
            //告诉观察者 可以执行一次 没有setChanged则无法处理消息
            customObservable.setChanged();
            customObservable.notifyObservers(Arrays.asList(2, 5));
    
            //todo 无法读取
            customObservable.notifyObservers(Arrays.asList(1, 3));
    
        }
    

    响应式编程(Reactor 实现)

    (1)响应式编程操作中,Reactor 是满足 Reactive 规范框架
    (2)Reactor 有两个核心类,Mono 和 Flux,这两个类实现接口 Publisher,提供丰富操作
    符。Flux 对象实现发布者,返回 N 个元素;Mono 实现发布者,返回 0 或者 1 个元素
    (3)Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:
    1.元素值
    2.错误信号:终止信号,流结束,同时把错误信号传给订阅者。
    3.完成信号:终止信号,流结束

    依赖

    <dependency>
     <groupId>io.projectreactor</groupId>
     <artifactId>reactor-core</artifactId>
     <version>3.1.5.RELEASE</version>
    </dependency>
    

    代码

    开启Flux流

    public static void main(String[] args) {
     //just 方法直接声明 开启一个流,数据流并没有发出,只有进行订阅之后才会触发
     Flux.just(1,2,3,4);
     Mono.just(1);
     //其他的方法
     Integer[] array = {1,2,3,4};
     Flux.fromArray(array);
    
     List<Integer> list = Arrays.asList(array);
     Flux.fromIterable(list);
     Stream<Integer> stream = list.stream();
     Flux.fromStream(stream);
    }
    

    订阅消费流的逻辑

    //源码
    public abstract class Flux<T> implements Publisher<T> {
    ...
        //flux订阅消费逻辑 非空的消费者即可
        public final Disposable subscribe(Consumer<? super T> consumer) {
            Objects.requireNonNull(consumer, "consumer");
            return this.subscribe(consumer, (Consumer)null, (Runnable)null);
        }
    }
    
        //应用编码
        @Test
        public void fluxDemo() {
            //just(T... data) 
            Flux.just("Hello", "World")
                    .map(s->s)
                    .distinct()
                    .filter(s->true)
                    //订阅一个打印逻辑
                    .subscribe(System.out::println);
            //array
            Flux.fromArray(new Integer[]{1, 2, 3})
                    .subscribe(System.out::println);
            //empty
            Flux.empty()
                    .subscribe(System.out::println);
            //range
            Flux.range(1, 10)
                    .subscribe(System.out::println);
            //interval(Duration period) 
            Flux.interval(Duration.of(10, ChronoUnit.SECONDS))
                    .subscribe(System.out::println);
        }
    
    

    工程化编码

    SpringWebflux 实现方式有两种:注解编程模型和函数式编程模型
    使用注解编程模型方式,和之前 SpringMVC 使用相似的,只需要把相关依赖配置到项目中,
    SpringBoot 自动配置相关运行容器,默认情况下使用 Netty 服务器

    Controller

    @RestController
    public class UserController {
     //注入 service
     @Autowired
     private UserService userService;
     //id 查询
     @GetMapping("/user/{id}")
     public Mono<User> geetUserId(@PathVariable int id) {
     return userService.getUserById(id);
     }
     //查询所有
     @GetMapping("/user")
     public Flux<User> getUsers() {
     return userService.getAllUser();
     }
     //添加
     @PostMapping("/saveuser")
     public Mono<Void> saveUser(@RequestBody User user) {
     Mono<User> userMono = Mono.just(user);
     return userService.saveUserInfo(userMono);
     }
    }
    

    其实不难发现,与SpringMVC的代码其实差距并不大。
    SpringMVC 方式实现,同步阻塞的方式,基于 SpringMVC+Servlet+Tomcat
    SpringWebflux 方式实现,异步非阻塞 方式,基于 SpringWebflux+Reactor+Netty

    Spring官方文档框架区别示意图

    Handler

    /**
     * @author zhangxuecheng4441
     * @date 2022/11/14/014 15:08
     */
    public class UserHandler {
        private final UserService userService;
    
        public UserHandler(UserService userService) {
            this.userService = userService;
        }
    
        //根据 id 查询
        public Mono<ServerResponse> getUserById(ServerRequest request) {
            //获取 id 值
            int userId = Integer.valueOf(request.pathVariable("id"));
            //空值处理
            Mono<ServerResponse> notFound = ServerResponse.notFound().build();
            //调用 service 方法得到数据
            Mono<User> userMono = this.userService.getUserById(userId);
            //把 userMono 进行转换返回
            //使用 Reactor 操作符 flatMap
            return userMono.flatMap(person -> ServerResponse
                            .ok()
                            .contentType(MediaType.APPLICATION_JSON)
                            .body(fromObject(person))
                    )
                    .switchIfEmpty(notFound);
        }
    
        //查询所有
        public Mono<ServerResponse> getAllUsers() {
            //调用 service 得到结果
            Flux<User> users = this.userService.getAllUser();
            return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(users, User.class);
        }
    
        //添加
        public Mono<ServerResponse> saveUser(ServerRequest request) {
            //得到 user 对象
            Mono<User> userMono = request.bodyToMono(User.class);
            return ServerResponse.ok().build(this.userService.saveUserInfo(userMono));
        }
    }
    

    相关文章

      网友评论

          本文标题:Spring5新特性:Webflux响应式编程

          本文链接:https://www.haomeiwen.com/subject/dlykwrtx.html