Reactor

作者: spilledyear | 来源:发表于2019-05-12 17:29 被阅读0次

    Flux.just

    从以下两行简单代码入手

    Flux<String> fewWords = Flux.just("Hello", "World");
    fewWords.subscribe(System.out::println);
    

    Flux.subscribe是一个final方法,如下,最终入参consumer被封装成一个 LambdaSubscriber

    public final Disposable subscribe(Consumer<? super T> consumer) {
        Objects.requireNonNull(consumer, "consumer");
        return subscribe(consumer, null, null);
    }
    
    public final Disposable subscribe(
            @Nullable Consumer<? super T> consumer,
            @Nullable Consumer<? super Throwable> errorConsumer,
            @Nullable Runnable completeConsumer,
            @Nullable Consumer<? super Subscription> subscriptionConsumer) {
        return subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer,
                completeConsumer,
                subscriptionConsumer));
    }
    

    LambdaSubscriber继承关系


    内部流程

    1. Flux.just("Hello", "World") 返回一个 FluxArray 对象,两个参数被封装成一个数组,并作为 FluxArray 的属性 array

    2. fewWords.subscribe(System.out::println),入参 System.out::println 被封装成一个 LambdaSubscriber ,然后在 FluxArray.subscribe 方法中调用 LambdaSubscriber.onSubscribe 方法(以自己和array为参数,封装一个ArraySubscription对象作为onSubscribe方法的参数) ,即 LambdaSubscriber.onSubscribe(new ArraySubscription<>(LambdaSubscriber, array))

    3. LambdaSubscriber.onSubscribe 方法内部调用 ArraySubscription.request 方法(入参为Long.MAX_VALUE);

    4. ArraySubscription.request 方法内部调用 LambdaSubscriber.onNext/onError/onComplete 方法;

    5. LambdaSubscriber的onNext方法内部调用真正的逻辑;

    Flux.map

    Flux<String> fewWords = Flux.just("Hello", "World").map(v -> v.toUpperCase());
    fewWords.subscribe(System.out::println);
    

    内部流程

    1. Flux.just("Hello", "World") 返回一个 FluxArray 对象,两个参数被封装成一个数组,并作为 FluxArray 的属性 array

    2. Flux.map(v -> v.toUpperCase()) 返回一个 FluxMapFuseable 对象,创建 FluxMapFuseable 对象的时候以 FluxArrayFunction 作为参数,即 return onAssembly(new FluxMapFuseable<>(this, mapper)) //这里的this即代表 FluxArray

    3. fewWords.subscribe(System.out::println),即调用 FluxMapFuseable.subscribe 方法,在该方法内调用 source.subscribe 方法(这里的source 即指 FluxArray),此时会传入一个 MapFuseableSubscriber 对象,在创建 MapFuseableSubscriber 时候以 actual(消费者) 和 mapper(map对应的Function入参) 作为入参, 即 source.subscribe(new MapFuseableSubscriber<>(actual, mapper))

    4. FluxArray.subscribe 方法内部调用 MapFuseableSubscriber.onSubscribe 方法(入参为 ArraySubscription ,创建 ArraySubscription 的时候以 MapFuseableSubscriberarray(真实数据) 为入参),即 s.onSubscribe(new ArraySubscription<>(s, array)) //这里的s代表 MapFuseableSubscriber

    5. MapFuseableSubscriber.onSubscribe 方法内部, 赋值 入参 ArraySubscription 为自己的成员变量 s,然后以自己(MapFuseableSubscriber)为入参,调用 actual即LambdaSubscriber.onSubscribe 方法, 即 actual.onSubscribe(this)

    6. LambdaSubscriber.onSubscribe 方法内部调用 (入参)MapFuseableSubscriber.request(Long.MAX_VALUE) 方法;

    7. MapFuseableSubscriber.request 方法内部,调用成员变量s(s前面说过,即ArraySubscription) 即 ArraySubscription.request 方法;

    8. ArraySubscription.request 方法内部,调用成员变量actual的相关方法(onNext/onError/onComplete)。前面对 ArraySubscription 分析过,是在调用 FluxArray.subscribe 方法的时候创建的,创建的时候以 MapFuseableSubscriberarray(真实数据) 为入参,所以这里的 actual即代表MapFuseableSubscriber,所以这里也就是调用 MapFuseableSubscriber 的相关方法(onNext/onError/onComplete);

    9. MapFuseableSubscriber.onNext/onError/onComplete 方法内部,如果不是onNext方法,会直接调用成员变量actual(LambdaSubscriber).onError/onComplete 相关方法;如果是onNext方法,会先执行 mapper的相关逻辑,得到一个结果 v,然后再以 v 为入参,调用成员变量actual(LambdaSubscriber).onNext 方法;【调用成员变量actual(LambdaSubscriber)的相关方法也就是执行真正的消费者逻辑】

    Flux.create

    Flux.create(new Consumer<FluxSink<String>>() {
        @Override
        public void accept(FluxSink<String> fluxSink) {
            fluxSink.next("发送数据耶");
        }
    }).subscribe(System.out::println);
    
    FluxCreate

    内部流程

    1. Flux.create 方法发返回一个 FluxCreate 对象,即 return onAssembly(new FluxCreate<>(emitter, backpressure, FluxCreate.CreateMode.PUSH_PULL))

    2. FluxCreate.subscribe 方法中,先以 actual(消费者)被压策略 创建 BaseSink 对象,这里返回一个 BufferAsyncSink 对象;然后调用 actual.onSubscribe(sink) 方法(这里的sink就是BaseSink对象);然后再 调用 source.accept(createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :sink) 方法(这里的source指的是create方法中传入的Function,即 匿名内部类)

    public void subscribe(CoreSubscriber<? super T> actual) {
        // (1). 创建BaseSink对象,这里返回一个 BufferAsyncSink 对象
        BaseSink<T> sink = createSink(actual, backpressure);
        
        // (2). 调用消费者的onSubscribe方法
        actual.onSubscribe(sink);
    
        try {
            // (3). source指的是create方法中传入的 Function,即 匿名内部类
            source.accept(createMode == CreateMode.PUSH_PULL ? new SerializedSink<>(sink) :sink);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            sink.error(Operators.onOperatorError(ex, actual.currentContext()));
        }
    }
    
    1. actual.onSubscribe(sink) 方法中调用 sink即BufferAsyncSink .request 方法(BufferAsyncSink 继承了 BaseSink 对象,request 方法在 BaseSink 中);

    2. BaseSink.request 方法中调用子类 BufferAsyncSink.onRequestedFromDownstream 方法;

    3. BufferAsyncSink.onRequestedFromDownstream 方法中 调用 BufferAsyncSink.drain 方法;

    4. BufferAsyncSink.drain 方法中,会判断队列中是有有数据,如果有就会执行 actual.onNext 方法,因为此时队列中没有数据,所以会直接返回;

    5. 先以 BufferAsyncSink 为入参创建 SerializedSink 对象,然后以 SerializedSink 为入参,调用 Function(create方法的入参).accept 方法(即执行我们自己写的逻辑),然后调用 入参 SerializedSink.next 方法,即 fluxSink.next("发送数据耶")

    6. SerializedSink.next 方法中调用 BufferAsyncSink.next 方法;

    7. BufferAsyncSink.next 方法中,先将数据放入队列,然后再执行 BufferAsyncSink.drain 方法;注意,因为此时队列中有数据了,所以在接下来的 BufferAsyncSink.drain 方法调用中,会调用 actual.onNext(o) 方法(即执行我们自定义的消费者onNext方法逻辑);

    FluxSink

    相关文章

      网友评论

          本文标题:Reactor

          本文链接:https://www.haomeiwen.com/subject/ptzzoqtx.html