美文网首页
Reactor学习:二、生产者

Reactor学习:二、生产者

作者: 睦月MTK | 来源:发表于2020-06-30 16:53 被阅读0次

    声明:


    一、Flux和Mono

    Flux<T>Mono<T>Reactor中非常重要的两个生产者,它们都继承了Publisher<T>FluxMono都代表一个响应式序列,但是不同的是,Flux代表0—N个元素,Mono代表0—1个元素。其实称Mono为一个序列并不准确,称之为“结果”可能会更好。试想web中的应用场景,一个请求必然只生成一个响应,所以使用Mono<HttpResponse>比使用Flux<HttpResponse>语义上更为确切。


    二、生产序列的方法
    • Flux#just(T...)Mono#just(T)Mono#justOrEmpty(T)
      just方法就是显式指定序列,类似Stream#of
    • Flux#from(Publisher)Flux#fromArrayFlux#fromIterableFlux#rangeFlux#fromStream(Supplier<Stream>)
      从Publisher、数组、集合、数据范围、流中生产序列
    • Mono#from(Publisher)Mono#fromSupplierMono#fromRunnableMono#fromCallableMono#fromFuture
      从其他地方获取结果,其中Mono#from(Publisher)是截取Publisher第一个元素
    • emptyerrornever
      1)empty指的是一个直接完成的序列
      2)error指的是一个直接报错的序列
      3)never值的是一个不做任何事情的序列,比如发出数据、报告完成、报告错误等等
    • Flux#generate
      Flux#generate是一个同步的简易的程序化生成序列的方法,你需要提供一个初始值以及一个控制sink的回调方法,其完整参数序列是generate(Callable<S> stateSupplier, BiFunction<S, SynchronousSink<T>, S> generator, Consumer<? super S> stateConsumer)
      多说无益,看段代码就懂了
    Flux<String> stringFlux = Flux.generate(() -> 0 , (state,sink) -> {
        sink.next("str"+state);
        //sink.next("str"+state);
        LockSupport.parkNanos(Duration.ofSeconds(1).toNanos());
        if(state++ == 10) sink.complete();
        return state;
    },System.out::println);
    

    这段代码的作用生成一个从"str0"一直到"str10"的序列,且每间隔一秒生成一个。在sink#complete方法被调用之前或者出现异常之前,该回调方法(即第二个参数BiFunction那一段)会被一直调用。需要注意的是sink#next在一次回调中只能被调用一次,否则将会报错,导致序列生成的中止。第三个参数可以不要,其作用是在中止时(包括异常带来的中止或者Disposable#dispose取消任务带来的中止)将当前状态(也就是state)进行一定的操作,代码中是直接将其打印了出来

    • Flux#create
      Flux#create比之Flux#generate要高级一点,其并不需要一个初始值,而且它支持多线程同时触发回调,因此,与监听机制结合在一起会得到意想不到的效果。
    Flux<String> stringFlux = Flux.create(sink -> {
        //onRequest在每次接收到request的时候都会被调用
        sink.onRequest(n -> {
            System.out.println("on request:"+n);
        }).onCancel(() -> {
            System.out.println("cancel");
            //onDispose在完成、错误、取消时候调用,取消的优先级低于onCancel
        }).onDispose(() -> {
            System.out.println("dispose");
        });
        myEventContext.addMyEventListener(new SinkEventListener() {
            @Override
            public void onCompleteEvent(SinkCompleteEvent event) {
                sink.complete();
            }
    
            @Override
            public void onNextEvent(SinkNextEvent event) {
                sink.next(event.getMessage());
            }
        });
    }, FluxSink.OverflowStrategy.DROP);
    

    在这段代码中每当有一个SinkNextEvent事件触发时,就会执行一次sink#next,当触发了onCompleteEvent事件时,就会执行sink#complete。第二个参数FluxSink.OverflowStrategy.xxx指定了背压(backpressure,即下游对上游的反馈控制,避免爆发“洪水”)的参数,背压参数有下列几种:
    IGNORE 忽视反馈,我行我素,可能会抛出IllegalStateException
    ERROR 超出下游接收能力时,抛出IllegalStateException
    DROP 超出下游接收能力时(下游不准备接收时),丢弃掉上游数据
    LATEST 不抛弃掉上游数据,但是只取最近的数据
    BUFFER 默认值,缓存所有的超量数据,要注意超出内存限制的风险
    注意:不论选择哪个,在下游第一次发出请求之前,上游的所有数据均会被丢弃。
    额外需要注意的是,在sink#complete发生之后,如果还在使用sink#next来添加数据,该数据会被自动丢弃,并打印出类似[DEBUG] (pool-1-thread-1) onNextDropped: 0.7328237206959655的debug信息

    • handle
      该方法存在于Flux以及Mono中,比较特殊,相当于一个过滤机制,它可以承接上一个序列生成方法,将其生成的数据进行过滤后再输出,它使用的sinkSynchronousSink这表示sink#next在一次回调中只能被执行一次。具体示例代码如下:
    Flux<Long> longFlux = Flux.interval(Duration.ofSeconds(1)).handle((item , sink) -> {
        if(item % 2 != 0) sink.next(item);
    });
    

    该段代码会生成1、3、5...的奇数序列,每隔2秒生成一次。item指的是上游发出的数据


    参考文档:
    [1] Reactor api doc
    [2] Reactor reference doc

    相关文章

      网友评论

          本文标题:Reactor学习:二、生产者

          本文链接:https://www.haomeiwen.com/subject/tigtqktx.html