美文网首页
Project Reactor源码分析1-底层实现原理分析

Project Reactor源码分析1-底层实现原理分析

作者: 王侦 | 来源:发表于2023-03-13 15:27 被阅读0次
        Flux.just("tom", "jack", "allen")
                .map(s-> s.concat("@qq.com"))
                .filter(s -> s.length() >= 11)
                .subscribe(System.out::println);

结果:

jack@qq.com
allen@qq.com

1.声明阶段

Flux#just()

Flux#just()

  • Flux#fromArray(T[] array)
  • onAssembly(new FluxArray<>(array))
  • 最后返回的就是FluxArray<>(array)
final class FluxArray<T> extends Flux<T> implements Fuseable, SourceProducer<T> {

    final T[] array;

    @SafeVarargs
    public FluxArray(T... array) {
        this.array = Objects.requireNonNull(array, "array");
    }

Flux#map(Function mapper)

Flux#map(Function<? super T, ? extends V> mapper)

  • onAssembly(new FluxMapFuseable<>(this, mapper))
final class FluxMapFuseable<T, R> extends InternalFluxOperator<T, R> implements Fuseable {

    final Function<? super T, ? extends R> mapper;

    FluxMapFuseable(Flux<? extends T> source,
            Function<? super T, ? extends R> mapper) {
        super(source);
        this.mapper = Objects.requireNonNull(mapper, "mapper");
    }

Flux#filter

Flux#filter(Predicate<? super T> p)

  • onAssembly(new FluxFilterFuseable<>(this, p))

2.subscribe

2.1 subscribe阶段

Flux#subscribe(Consumer)

Flux#subscribe(Consumer)

  • subscribe(consumer, null, null)
  • subscribe(consumer, errorConsumer, completeConsumer, (Context) null)
  • subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer, completeConsumer, null, initialContext))
  • subscribe(Subscriber<? super T> actual)
    public final void subscribe(Subscriber<? super T> actual) {
        CorePublisher publisher = Operators.onLastAssembly(this);
        CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);

        if (subscriber instanceof Fuseable.QueueSubscription && this != publisher && this instanceof Fuseable && !(publisher instanceof Fuseable)) {
            subscriber = new FluxHide.SuppressFuseableSubscriber<>(subscriber);
        }

        try {
            if (publisher instanceof OptimizableOperator) {
                OptimizableOperator operator = (OptimizableOperator) publisher;
                while (true) {
                    subscriber = operator.subscribeOrReturn(subscriber);
                    if (subscriber == null) {
                        // null means "I will subscribe myself", returning...
                        return;
                    }
                    OptimizableOperator newSource = operator.nextOptimizableSource();
                    if (newSource == null) {
                        publisher = operator.source();
                        break;
                    }
                    operator = newSource;
                }
            }

            publisher.subscribe(subscriber);
        }
        catch (Throwable e) {
            Operators.reportThrowInSubscribe(subscriber, e);
            return;
        }
    }

Flux#subscribe(Subscriber actual)

  • 这里this是FluxFilterFuseable,入参actual是LambdaSubscriber
  • subscriber = operator.subscribeOrReturn(subscriber),这里operator是FluxFilterFuseable,进入FluxFilterFuseable#subscribeOrReturn,创建FilterFuseableSubscriber<>(actual, predicate)
  • newSource = operator.nextOptimizableSource()获取FluxFilterFuseable的OptimizableOperator,其实就是FluxMapFuseable,然后赋值给operator
  • 循环调用subscriber = operator.subscribeOrReturn(subscriber),进入FluxMapFuseable#subscribeOrReturn,创建MapFuseableConditionalSubscriber<>(cs, mapper)
  • newSource = operator.nextOptimizableSource()获取FluxMapFuseable的OptimizableOperator,为null。若为null,则获取publisher = operator.source()也即FluxArray,然后break,此时已经跳出循环
  • publisher.subscribe(subscriber),此时publisher是最里层的FluxArray,subscriber是MapFuseableConditionalSubscriber
  • FluxArray#subscribe(actual)
  • subscribe(actual, array)

FluxArray#subscribe(CoreSubscriber<? super T> s, T[] array)

    public static <T> void subscribe(CoreSubscriber<? super T> s, T[] array) {
        if (array.length == 0) {
            Operators.complete(s);
            return;
        }
        if (s instanceof ConditionalSubscriber) {
            s.onSubscribe(new ArrayConditionalSubscription<>((ConditionalSubscriber<? super T>) s, array));
        }
        else {
            s.onSubscribe(new ArraySubscription<>(s, array));
        }
    }

2.2 onSubscribe阶段

FluxArray#subscribe(CoreSubscriber<? super T> s, T[] array)

  • s是MapFuseableConditionalSubscriber,array是数据源
  • s.onSubscribe(new ArrayConditionalSubscription<>((ConditionalSubscriber<? super T>) s, array))
  • MapFuseableConditionalSubscriber#onSubscribe(Subscription s),将this.s = (QueueSubscription<T>) s,并且调用actual.onSubscribe(this),actual是FilterFuseableSubscriber
  • FilterFuseableSubscriber#onSubscribe(Subscription s),这里s是MapFuseableConditionalSubscriber,将this.s赋值为MapFuseableConditionalSubscriber,并且调用actual.onSubscribe(this),actual是LambdaSubscriber
  • LambdaSubscriber#onSubscribe(Subscription s),这里s是FilterFuseableSubscriber,赋值this.subscription为FilterFuseableSubscriber,然后调用s.request(Long.MAX_VALUE)

LambdaSubscriber#onSubscribe

    public final void onSubscribe(Subscription s) {
        if (Operators.validate(subscription, s)) {
            this.subscription = s;
            if (subscriptionConsumer != null) {
                try {
                    subscriptionConsumer.accept(s);
                }
                catch (Throwable t) {
                    Exceptions.throwIfFatal(t);
                    s.cancel();
                    onError(t);
                }
            }
            else {
                s.request(Long.MAX_VALUE);
            }
        }
    }

2.3 request阶段

FilterFuseableSubscriber#request(Long.MAX_VALUE)

  • s.request(n),也即调用MapFuseableConditionalSubscriber#request
  • s.request(n),也即调用ArrayConditionalSubscription#request

ArrayConditionalSubscription#request

        public void request(long n) {
            if (Operators.validate(n)) {
                if (Operators.addCap(REQUESTED, this, n) == 0) {
                    if (n == Long.MAX_VALUE) {
                        fastPath();
                    }
                    else {
                        slowPath(n);
                    }
                }
            }
        }

2.4 调用阶段

ArrayConditionalSubscription#request

  • ArrayConditionalSubscription#fastPath()
  • 这里循环处理array中每一个元素,调用actual.onNext(t),这里actual是MapFuseableConditionalSubscriber,也即MapFuseableConditionalSubscriber#onNext()
  • 调用v = mapper.apply(t),真的mapper逻辑,然后调用actual.onNext(v),是FilterFuseableSubscriber#onNext(v)
  • 调用b = predicate.test(t),真正的Filter逻辑,if (b) {actual.onNext(t)},否则s.request(1),丢弃该不满足条件的元素。这里actual是LambdaSubscriber,也即LambdaSubscriber#onNext
  • 最终调用consumer.accept(x),也就是最终的消费者进行处理

执行逻辑的核心在ArrayConditionalSubscription#fastPath()

        void fastPath() {
            final T[] a = array;
            final int len = a.length;
            final Subscriber<? super T> s = actual;

            for (int i = index; i != len; i++) {
                if (cancelled) {
                    return;
                }

                T t = a[i];

                if (t == null) {
                    s.onError(new NullPointerException("The " + i + "th array element was null"));
                    return;
                }

                s.onNext(t);
            }
            if (cancelled) {
                return;
            }
            s.onComplete();
        }

总结

  • 1)声明阶段


  • 2)subscribe阶段


  • 3)onSubscribe阶段
    在此处将 Publisher 和 Subscriber 包裹成一个 Subscription 对象,为每一层Subscriber添加Subscription


  • 4)request阶段
    最终在 原始 Subscriber 对象(最外层,也就是FluxArray中的ArrayConditionalSubscription)调用 request() ,触发 Subscription 的 Source 获得数据作为 onNext 的参数。

  • 5)调用阶段
    遍历处理数组的每个元素,从最外层的Subscription开始往里层调用onNext(),每一层需要调用对应的实际处理逻辑,比如mapper/ predicate/ consume等。

参考

相关文章

网友评论

      本文标题:Project Reactor源码分析1-底层实现原理分析

      本文链接:https://www.haomeiwen.com/subject/ngtqrdtx.html