美文网首页Spring响应式
零基础学习WebFlux(持续更新中。。。)

零基础学习WebFlux(持续更新中。。。)

作者: liuliuzo | 来源:发表于2020-09-01 13:51 被阅读0次

前言

我们在学习WebFlux之前需要先学习JVM平台上的响应式流(Reactive Streams)规范。响应式流是一个倡议,用来为具有非阻塞后压的异步流处理提供一个标准。大家努力的目标集中在运行时环境(JVM和JavaScript)和网络协议上。响应式流其实就是一个规范,且这个规范已经被引入到JDK9里了。

API由以下组件组成,响应式流的实现必须提供它们:

  • Publisher,发布者(生产者)
  • Subscriber,订阅者(消费者)
  • Subscription,订阅
  • Processor,处理者

它们其实是4个接口,先睹为快:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
public interface Subscription {
    public void request(long n);
    public void cancel();
}
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}

一个发布者是一个潜在的无限数量的序列元素的一个提供者,按照收到的来自于它的订阅者的需要来发布这些元素。

作为对发布者的subscribe(Subscriber)方法调用的响应,对于订阅者上的方法的可能调用顺序按下面的协议给出:

onSubscribe onNext* (onError | onComplete)?

这意味着onSubscribe方法总是被调用,后面跟着一个可能的无限数量onNext方法调用(因为订阅者的请求)。如果失败的话,后跟一个onError方法调用,或当没有更多的元素可用时,是一个onComplete方法调用,只要这个Subscription(订阅关系)没有被取消。

说明

Publisher
public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}
  • 一个发布者对一个订阅者的onNext调用总次数必须总是小于或等于订阅者的Subscription请求的元素总数。
  • 一个发布者可能调用的onNext次数比要求的少,然后通过调用onComplete或onError来终止Subscription。
  • 对一个订阅者的onSubscribe,onNext,onError和onComplete调用必须以一个线程安全的方式进行,如果被多个线程执行,使用external synchronization。
  • 如果一个发布者失败,它必须调用一个onError。
  • 如果一个发布者成功地终止(对于有限流),它必须调用一个onComplete。
  • 如果一个发布者调用一个订阅者上的onError或onComplete方法,那个订阅者的Subscription必须认为已被取消。
  • 一旦一个terminal state已经被调用(onError,onComplete),它要求没有进一步的调用发生。
  • 如果一个订阅被取消,它的订阅者必须最终停止被调用。
  • 发布者的subscribe方法里必须在早于对订阅者上的任何方法调用之前先调用onSubscribe方法,且必须return normally。当订阅者是null的时候,此时必须向调用者抛出java.lang.NullPointerException异常。对于其它任何情况,通知失败(或拒绝订阅者)的唯一合法方式是调用onError。
  • 发布者的subscribe方法可能被调用任意多次,但是每次必须使用一个不同的订阅者。
  • 一个发布者可以支持多个订阅者,并决定每一个订阅是单播或多播。
Subscriber
public interface Subscriber<T> {
    public void onSubscribe(Subscription s);
    public void onNext(T t);
    public void onError(Throwable t);
    public void onComplete();
}
  • 一个订阅者必须通过订阅(Subscription)的request(long n)方法声明需求,然后接收onNext调用。
  • 如果一个订阅者怀疑它的调用处理将消极地影响它的发布者的响应度,建议异步地分发它的调用。
  • 订阅者的onComplete()和onError(Throwable t)这两个方法里禁止调用订阅或发布者上的任何方法。
  • 订阅者的onComplete()和onError(Throwable t)这两个方法在接收到调用后必须认为订阅已经被取消。
  • 一个订阅者必须在收到onSubscriber之后调用指定订阅上的cancel()方法取消该订阅,如果它已经有一个活动的订阅。
  • 一个订阅者必须调用订阅的cancel()方法,如果这个订阅不再需要的话。
  • 一个订阅者必须确保所有对订阅发生的调用都来自于同一个线程或为它们各自提供external synchronization。
  • 一个订阅者必须准备好接收一到多个onNext调用,在已经调用过订阅的cancel()方法之后如果还有请求的元素即将发送。订阅的cancel()方法并不保证立即执行底层的清理操作。
  • 一个订阅者必须准备好接收一个onComplete调用,不管之前有或没有调用过订阅的request(long n)方法。
  • 一个订阅者必须准备好接收一个onError调用,不管之前有或没有调用过订阅的request(long n)方法。
  • 一个订阅者必须确保它的所有的方法调用发生在它们各自的处理之前。该订阅者必须小心合适地发布调用到它的处理逻辑。
  • 订阅者的onSubscribe方法必须最多被调用一次,对于一个给定的订阅者。
  • 对onSubscribe,onNext,onError或onComplete的调用必须return normally,除了当提供的任何参数是null时,这种情况必须向调用者抛出一个java.lang.NullPointerException异常。对于其它情况,对于一个订阅者来说,去通知一个失败的唯一合法的方式是取消它的订阅。在这个规则被违反的情况下,任何与该订阅者关联的订阅必须被认为是取消的,调用者必须激发这个错误条件,以一种对于运行环境来说是足够的方式。
Subscription
public interface Subscription {
    public void request(long n);
    public void cancel();
}
  • 订阅的request和cancel方法必须在它的订阅者上下文里被调用。
  • 订阅必须允许订阅者在onNext或onComplete方法里同步地调用订阅的request方法。
  • 订阅的request方法必须放置一个关于发布者和订阅者间的同步递归调用的上界。
  • 订阅的request方法应该尊重它的调用者的响应度,通过以一个适时的方式返回。
  • 调用的cancel方法必须尊重它的调用者的响应度,通过以一个适时的方式返回,必须是幂等的和线程安全的。
  • 在订阅取消之后,额外的request(long n)调用必须是NOP。
  • 在订阅取消之后,额外的cancel()调用必须时NOP。
  • 当订阅没有被取消时,request(long n)方法必须注册给定数目的额外元素,这些元素将被生产并分发给各自的订阅者。
  • 当订阅没有被取消时,request(long n)必须使用一个java.lang.IllegalArgumentException异常来调用onError,如果参数小于等于0。引起的原因应该解释为不是正数的调用是非法的。
  • 当订阅没有被取消,request(long n)可以同步地调用这个(或其它)订阅者上的onNext。
  • 当订阅没有被取消,request(long n)可以同步地调用这个(或其它)订阅者上的onComplete或onError。
  • 当订阅没有被取消,cancel()必须请求发布者最终停止调用它的订阅者上的方法。这个操作不要求立即影响订阅。
  • 当订阅没有被取消,cancel()必须请求发布者最终删除对相关订阅者的任何引用。
  • 当订阅没有被取消,调用cancel()可以引起发布者(如果是有状态的)进入关闭状态,如果在此刻没有其它的订阅存在。
  • 调用订阅的cancel方法必须是return normally。
  • 调用订阅的request方法必须是return normally。

一个订阅必须支持无数次地调用request方法,必须支持到2^63 - 1(Long.MAX_VALUE)次。如果一个需求等于或大于2^63 - 1(Long.MAX_VALUE),或许被发布者认为是真正的无界。
一个订阅被一个发布者和一个订阅者共享,目的是为了在它们之间调节数据交换。这也是为什么subscribe()方法并没有返回创建的那个订阅而是返回void的原因。这个订阅只能通过onSubscriber回调方法传给订阅者。

Processor
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
  • 一个处理器表示一个处理阶段,它既是一个订阅者又是一个发布者,必须遵守它们两者的契约。
  • 一个处理器可以选择恢复一个onError调用。如果它选择这样做,必须认为订阅被取消,否则必须立即传播onError调用到它的订阅者。
    在不被强制时,当最后一个订阅者取消它的订阅时,取消一个处理器的上游订阅是一个好主意,可以让这个取消调用往上游传播。

WebFlux介绍

Spring官方为了让我们更加快速/平滑到WebFlux上,之前SpringMVC那套都是支持的。也就是说:我们可以像使用SpringMVC一样使用着WebFlux。

WebFlux使用的响应式流并不是用JDK9平台的,而是一个叫做Reactor响应式流库。所以,入门WebFlux其实更多是了解怎么使用Reactor的API,Spring5的已经支持Reactor模型,主要提供开发者使用的是Mono和Flux,在Reator遵循ReactiveStreams的标准的API的现,Rxjava是在Reactor之前实现反应流式库,Spring 5实现了在Reactor-Netty基础上实现SpringMVC的框架命名为Spring Webflux,这是取代传统的servlet API的异步框架。

Reactor库

接着在介绍WebFlux之前我们再了解下Reactor库

  • 响应式编程操作中,Reactor 是满足 Reactive 规范框架
  • Reactor 有两个核心类,Mono 和 和 Flux ,这两个类实现接口 Publisher,提供丰富操作 ,提供丰富操作符。Flux 对象实现发布者,返回 N 个元素;Mono 实现发布者,返回 0 或者 1 个元素
  • Flux和Mono都是Publisher<T>在Reactor 3实现。Publisher<T>提供了subscribe方法,允许消费者在有结果可用时进行消费。如果没有消费者Publisher<T>不会做任何事情,他根据消费情况进行响应。 Publisher<T>可能返回零或者多个,甚至可能是无限的,为了更加清晰表示期待的结果就引入了两个实现模型Mono和Flux。
  • 使用 Flux 和 Mono 都可以发出三种数据信号:元素值,错误信号,完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者:
    1. 错误信号和完成信号都是终止信号,不能共存的
    2. 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流
    3. 如果没有错误信号,没有完成信号,表示是无限数据流
  • 调用 just 或者其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的

代码演示 Flux 和 和 Mono

<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-core</artifactId>
    <version>3.1.5.RELEASE</version>
< dependency>
public class ReactorTest {
    //just声明元素
    public static void main(String[] args) {
        Flux.just(1, 2, 3, 4, 5, 6);
        Mono.just(1);
        //其他方法
        Integer[] a = {1, 2, 3, 4, 5, 6};
        Flux.fromArray(a);
        List<Integer> list = Arrays.asList(a);
        Flux.fromIterable(list);
        Stream<Integer> stream = list.stream();
        Flux.fromStream(stream);
    }
}
反应式数据处理
public Mono<ClientUser> currentUser () {
    return isAuthenticated ? Mono.just(new ClientUser("felord.cn", "reactive"))
            : Mono.empty();
}

Mono defer方法创建数据源属于懒汉型,与Mono.just等创建数据源则是恶汉型,我们在看下下面这个例子:

Mono<Date> m1 = Mono.just(new Date());
Mono<Date> m2 = Mono.defer(()-> Mono.just(new Date()));
m1.subscribe(System.out::println);
m2.subscribe(System.out::println);
try {
    Thread.sleep(5000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
m1.subscribe(System.out::println);
m2.subscribe(System.out::println);
Tue Dec 08 09:18:48 CST 2020
Tue Dec 08 09:18:49 CST 2020
Tue Dec 08 09:18:48 CST 2020
Tue Dec 08 09:18:54 CST 2020

我们可以看到,创建了两个数据源,一个使用Mono.just创建,一个用Mono.defer创建,然后分别通过lambda表达式订阅这两个publisher,可以看到两个输出的时间是一样的,延迟5秒钟后重新订阅,Mono.just创建的数据源时间没变,但是Mono.defer创建的数据源时间相应的延迟了5秒钟,原因在于Mono.just会在声明阶段构造Date对象,只创建一次,但是Mono.defer却是在subscribe阶段才会创建对应的Date对象,每次调用subscribe方法都会创建Date对象,在webflux中:

@Override
public Mono<Void> filter(ServerWebExchange exchange) {
    return Mono.defer(() -> 
this.currentFilter != null && this.next != null ? 
this.currentFilter.filter(exchange, this.next) : this.handler.handle(exchange));
}

WebFlux的执行过程

reactor.netty.http.server.HttpServerHandle#onStateChange
->ReactorHttpHandlerAdapter.apply    //封装Request,Response。ReactorServerHttpRequest
-> HttpWebHandlerAdapter.handle      //创建ServerWebExchange
-> ExceptionHandlingWebHandler.handle
-> FilteringWebHandler.handle
-> DefaultWebFilterChain.handle      //使用WebFilter在请求处理前进行filter
-> DispatcherHandler.handle          //与Spring MVC对应,HandlerMapping,HandlerAdpater
-> handlerAdapter.handle
-> 调用我们的业务代码

样例

我们测试下webflux的线程执行情况,为什么webflux会有更高的性能。

@RestController
@RequestMapping("/hello/")
public class HelloController {

    private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(HelloController.class);

    @GetMapping("/common")
    public String commonHandle() {
        String uuid = UUID.randomUUID().toString();
        log.info("common-start"+":"+uuid);
        // 执行耗时操作
        String result = doThing("common handler");
        log.info("common-end"+":"+uuid);
        return result;
    }

    @GetMapping("/mono")
    public Mono<String> monoHandle() {
        String uuid = UUID.randomUUID().toString();
        log.info("mono-start"+":"+uuid);
        // 执行耗时操作
        Mono<String> mono = Mono.fromSupplier(() -> doThing("mono handle"));
        log.info("mono-end"+":"+uuid);
        // Mono表示包含0或1个元素的异步序列
        return mono;
    }

    // 定义耗时操作
    private String doThing(String msg) {
        try {
            TimeUnit.SECONDS.sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return msg;
    }
}

日志

2020-12-01 11:37:25.286 [reactor-http-nio-4] INFO  com.liuliu.webflux.learning.demo06.HelloController - mono-start: 631e4b65-1c0b-4064-afc0-d9741e7af269
2020-12-01 11:37:25.287 [reactor-http-nio-4] INFO  com.liuliu.webflux.learning.demo06.HelloController - mono-end:   631e4b65-1c0b-4064-afc0-d9741e7af269
2020-12-01 11:37:35.291 [reactor-http-nio-4] DEBUG org.springframework.core.codec.CharSequenceEncoder - [e6c15c8c] Writing "mono handle:631e4b65-1c0b-4064-afc0-d9741e7af269"
2020-12-01 11:37:35.292 [reactor-http-nio-4] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0xe6c15c8c, L:/127.0.0.1:8080 - R:/127.0.0.1:62228] Writing object DefaultHttpResponse(decodeResult: success, version: HTTP/1.1)

2020-12-01 11:37:22.693 [reactor-http-nio-2] INFO  com.liuliu.webflux.learning.demo06.HelloController - common-start:   3c52e08f-f88b-4bb9-904d-908e9f8235cf
2020-12-01 11:37:32.694 [reactor-http-nio-2] INFO  com.liuliu.webflux.learning.demo06.HelloController - common-end:     3c52e08f-f88b-4bb9-904d-908e9f8235cf
2020-12-01 11:37:32.701 [reactor-http-nio-2] DEBUG org.springframework.core.codec.CharSequenceEncoder - [c4438344] Writing "common handler:3c52e08f-f88b-4bb9-904d-908e9f8235cf"
2020-12-01 11:37:32.706 [reactor-http-nio-2] DEBUG reactor.netty.channel.ChannelOperationsHandler - [id: 0xc4438344, L:/127.0.0.1:8080 - R:/127.0.0.1:62226] Writing object DefaultHttpResponse(decodeResult: success, version: HTTP/1.

从日志里可以看出来,reactor-http-nio-4 并不会阻塞,所以大大的增加了吞吐量。

转载自一下文章:
学习代码
Spring5新特性WebFlux学习
Webflux的执行流程和核心API
外行人都能看得懂的WebFlux,错过了血亏
JVM平台上的响应式流(Reactive Streams)规范

相关文章

网友评论

    本文标题:零基础学习WebFlux(持续更新中。。。)

    本文链接:https://www.haomeiwen.com/subject/kxlvsktx.html