Flux.just("tom", "jack", "allen")
    .map(s -> s.concat("@qq.com"))
    .filter(s -> s.length() >= 11)
    .subscribe(System.out::println);
Output:
jack@qq.com
allen@qq.com
1. Assembly (declaration) phase
Flux#just()
Flux#just()
- Flux#fromArray(T[] array)
- onAssembly(new FluxArray<>(array))
- what is finally returned is the FluxArray<>(array)
final class FluxArray<T> extends Flux<T> implements Fuseable, SourceProducer<T> {
    final T[] array;

    @SafeVarargs
    public FluxArray(T... array) {
        this.array = Objects.requireNonNull(array, "array");
    }
    // ... remaining members omitted
}
Flux#map(Function mapper)
Flux#map(Function<? super T, ? extends V> mapper)
- onAssembly(new FluxMapFuseable<>(this, mapper))
final class FluxMapFuseable<T, R> extends InternalFluxOperator<T, R> implements Fuseable {
    final Function<? super T, ? extends R> mapper;

    FluxMapFuseable(Flux<? extends T> source,
            Function<? super T, ? extends R> mapper) {
        super(source);
        this.mapper = Objects.requireNonNull(mapper, "mapper");
    }
    // ... remaining members omitted
}
Flux#filter
Flux#filter(Predicate<? super T> p)
- onAssembly(new FluxFilterFuseable<>(this, p))
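The assembly phase only builds a chain of nested objects; no data flows yet. The following is a minimal sketch of that nesting in plain Java. The class names (`Node`, `ArrayNode`, `MapNode`, `FilterNode`, `AssemblyDemo`) are hypothetical stand-ins for FluxArray, FluxMapFuseable and FluxFilterFuseable, not Reactor's actual implementation.

```java
import java.util.function.Function;
import java.util.function.Predicate;

class AssemblyDemo {
    // Builds the same chain as the example at the top and reports its nesting.
    static String assemble() {
        Node<String> chain = new ArrayNode<>("tom", "jack", "allen")
                .map(s -> s.concat("@qq.com"))
                .filter(s -> s.length() >= 11);
        return chain.describe();
    }

    public static void main(String[] args) {
        System.out.println(assemble()); // Filter(Map(Array))
    }
}

// Hypothetical stand-in for Flux: every operator call returns a new wrapper.
abstract class Node<T> {
    <R> Node<R> map(Function<? super T, ? extends R> mapper) {
        return new MapNode<>(this, mapper);
    }

    Node<T> filter(Predicate<? super T> predicate) {
        return new FilterNode<>(this, predicate);
    }

    // Describes the nesting, last-applied operator first.
    abstract String describe();
}

final class ArrayNode<T> extends Node<T> {
    final T[] array;

    @SafeVarargs
    ArrayNode(T... array) {
        this.array = array;
    }

    @Override
    String describe() {
        return "Array";
    }
}

final class MapNode<T, R> extends Node<R> {
    final Node<T> source;
    final Function<? super T, ? extends R> mapper;

    MapNode(Node<T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    String describe() {
        return "Map(" + source.describe() + ")";
    }
}

final class FilterNode<T> extends Node<T> {
    final Node<T> source;
    final Predicate<? super T> predicate;

    FilterNode(Node<T> source, Predicate<? super T> predicate) {
        this.source = source;
        this.predicate = predicate;
    }

    @Override
    String describe() {
        return "Filter(" + source.describe() + ")";
    }
}
```

The object that `subscribe` is eventually called on is the last wrapper (the filter), which holds the map, which in turn holds the array source.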
2. subscribe
2.1 subscribe phase
Flux#subscribe(Consumer)
Flux#subscribe(Consumer)
- subscribe(consumer, null, null)
- subscribe(consumer, errorConsumer, completeConsumer, (Context) null)
- subscribeWith(new LambdaSubscriber<>(consumer, errorConsumer, completeConsumer, null, initialContext))
- subscribe(Subscriber<? super T> actual)
public final void subscribe(Subscriber<? super T> actual) {
    CorePublisher publisher = Operators.onLastAssembly(this);
    CoreSubscriber subscriber = Operators.toCoreSubscriber(actual);
    if (subscriber instanceof Fuseable.QueueSubscription && this != publisher && this instanceof Fuseable && !(publisher instanceof Fuseable)) {
        subscriber = new FluxHide.SuppressFuseableSubscriber<>(subscriber);
    }
    try {
        if (publisher instanceof OptimizableOperator) {
            OptimizableOperator operator = (OptimizableOperator) publisher;
            while (true) {
                subscriber = operator.subscribeOrReturn(subscriber);
                if (subscriber == null) {
                    // null means "I will subscribe myself", returning...
                    return;
                }
                OptimizableOperator newSource = operator.nextOptimizableSource();
                if (newSource == null) {
                    publisher = operator.source();
                    break;
                }
                operator = newSource;
            }
        }
        publisher.subscribe(subscriber);
    }
    catch (Throwable e) {
        Operators.reportThrowInSubscribe(subscriber, e);
        return;
    }
}
Flux#subscribe(Subscriber actual)
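- Here this is the FluxFilterFuseable and the argument actual is the LambdaSubscriber
- subscriber = operator.subscribeOrReturn(subscriber): operator is the FluxFilterFuseable, so this enters FluxFilterFuseable#subscribeOrReturn, which creates a FilterFuseableSubscriber<>(actual, predicate)
- newSource = operator.nextOptimizableSource() returns the upstream OptimizableOperator of the FluxFilterFuseable, which is the FluxMapFuseable; it is then assigned to operator
- The loop calls subscriber = operator.subscribeOrReturn(subscriber) again, this time entering FluxMapFuseable#subscribeOrReturn, which creates a MapFuseableConditionalSubscriber<>(cs, mapper) (the conditional variant is chosen because the downstream FilterFuseableSubscriber is a ConditionalSubscriber)
- newSource = operator.nextOptimizableSource() on the FluxMapFuseable returns null, because its source, FluxArray, is not an OptimizableOperator. In that case publisher = operator.source() yields the FluxArray, and break exits the loop
- publisher.subscribe(subscriber): publisher is now the innermost FluxArray and subscriber is the MapFuseableConditionalSubscriber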
- FluxArray#subscribe(actual)
- subscribe(actual, array)
FluxArray#subscribe(CoreSubscriber<? super T> s, T[] array)
public static <T> void subscribe(CoreSubscriber<? super T> s, T[] array) {
    if (array.length == 0) {
        Operators.complete(s);
        return;
    }
    if (s instanceof ConditionalSubscriber) {
        s.onSubscribe(new ArrayConditionalSubscription<>((ConditionalSubscriber<? super T>) s, array));
    }
    else {
        s.onSubscribe(new ArraySubscription<>(s, array));
    }
}
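To make the direction of the loop in Flux#subscribe easier to see, here is a minimal sketch in plain Java. It models each operator as a hypothetical `Operator` object with a link to its upstream, and models subscribers as strings. `wrap` stands in for subscribeOrReturn and `upstream` for nextOptimizableSource; all names in the sketch are illustrative assumptions, not Reactor's API.

```java
class SubscribeLoopDemo {
    // Hypothetical operator model: a name plus a link to its upstream operator.
    static final class Operator {
        final String name;
        final Operator upstream; // null when the upstream is the plain source

        Operator(String name, Operator upstream) {
            this.name = name;
            this.upstream = upstream;
        }

        // Stands in for subscribeOrReturn: wrap the downstream subscriber.
        String wrap(String downstream) {
            return name + "Subscriber(" + downstream + ")";
        }
    }

    // Walks from the last operator toward the source, wrapping at each step.
    static String subscribe(Operator last, String subscriber) {
        Operator operator = last;
        while (true) {
            subscriber = operator.wrap(subscriber);
            Operator next = operator.upstream;
            if (next == null) {
                break; // reached the operator sitting directly on the source
            }
            operator = next;
        }
        return subscriber; // this is what the source is subscribed with
    }

    static String run() {
        Operator map = new Operator("Map", null);
        Operator filter = new Operator("Filter", map);
        return subscribe(filter, "LambdaSubscriber");
    }

    public static void main(String[] args) {
        System.out.println(run()); // MapSubscriber(FilterSubscriber(LambdaSubscriber))
    }
}
```

The subscriber handed to the source is the map layer, which wraps the filter layer, which wraps the user's subscriber: the reverse of the order in which the operators were assembled.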
2.2 onSubscribe phase
FluxArray#subscribe(CoreSubscriber<? super T> s, T[] array)
- s is the MapFuseableConditionalSubscriber and array is the data source
- s.onSubscribe(new ArrayConditionalSubscription<>((ConditionalSubscriber<? super T>) s, array))
- MapFuseableConditionalSubscriber#onSubscribe(Subscription s) stores this.s = (QueueSubscription<T>) s and calls actual.onSubscribe(this), where actual is the FilterFuseableSubscriber
- FilterFuseableSubscriber#onSubscribe(Subscription s): here s is the MapFuseableConditionalSubscriber; it is stored in this.s, and actual.onSubscribe(this) is called, where actual is the LambdaSubscriber
- LambdaSubscriber#onSubscribe(Subscription s): here s is the FilterFuseableSubscriber; it is stored in this.subscription, and because no subscriptionConsumer was supplied, s.request(Long.MAX_VALUE) is called
LambdaSubscriber#onSubscribe
public final void onSubscribe(Subscription s) {
    if (Operators.validate(subscription, s)) {
        this.subscription = s;
        if (subscriptionConsumer != null) {
            try {
                subscriptionConsumer.accept(s);
            }
            catch (Throwable t) {
                Exceptions.throwIfFatal(t);
                s.cancel();
                onError(t);
            }
        }
        else {
            s.request(Long.MAX_VALUE);
        }
    }
}
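The onSubscribe calls travel from the source toward the final subscriber, and the request call that follows travels back in the opposite direction. A minimal sketch of that round trip in plain Java is given here. The `Subscription`, `Subscriber` and `Layer` types are simplified, hypothetical versions written for this illustration, not the Reactive Streams or Reactor interfaces.

```java
import java.util.ArrayList;
import java.util.List;

class OnSubscribeDemo {
    // Simplified, hypothetical versions of the Reactive Streams interfaces.
    interface Subscription { void request(long n); }
    interface Subscriber { void onSubscribe(Subscription s); }

    static final List<String> log = new ArrayList<>();

    // A pass-through layer: a Subscriber for its upstream and a
    // Subscription for its downstream, like the map and filter subscribers.
    static final class Layer implements Subscriber, Subscription {
        final String name;
        final Subscriber actual; // downstream
        Subscription s;          // upstream, set in onSubscribe

        Layer(String name, Subscriber actual) {
            this.name = name;
            this.actual = actual;
        }

        @Override
        public void onSubscribe(Subscription s) {
            this.s = s;
            log.add(name + ".onSubscribe");
            actual.onSubscribe(this); // pass itself downstream
        }

        @Override
        public void request(long n) {
            log.add(name + ".request");
            s.request(n); // forward the demand upstream
        }
    }

    static List<String> run() {
        log.clear();
        // Final subscriber: requests unbounded demand as soon as it is subscribed.
        Subscriber lambda = s -> {
            log.add("Lambda.onSubscribe");
            s.request(Long.MAX_VALUE);
        };
        Layer filter = new Layer("Filter", lambda);
        Layer map = new Layer("Map", filter);
        // Source subscription: only records that demand arrived.
        Subscription array = n -> log.add("Array.request");
        map.onSubscribe(array);
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // [Map.onSubscribe, Filter.onSubscribe, Lambda.onSubscribe,
        //  Filter.request, Map.request, Array.request]
        System.out.println(run());
    }
}
```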
2.3 request phase
FilterFuseableSubscriber#request(Long.MAX_VALUE)
- s.request(n), which calls MapFuseableConditionalSubscriber#request
- s.request(n), which calls ArrayConditionalSubscription#request
ArrayConditionalSubscription#request
public void request(long n) {
    if (Operators.validate(n)) {
        if (Operators.addCap(REQUESTED, this, n) == 0) {
            if (n == Long.MAX_VALUE) {
                fastPath();
            }
            else {
                slowPath(n);
            }
        }
    }
}
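The `== 0` check in request relies on the add-with-cap helper returning the value held before the addition, so only the request that moves the counter away from zero starts emitting. Here is a minimal sketch of such a helper, assuming an `AtomicLong` counter instead of the field updater used in the excerpt. The `AddCapDemo` class and its `addCap` method are written for illustration and reflect my understanding of the behavior rather than the library's code.

```java
import java.util.concurrent.atomic.AtomicLong;

class AddCapDemo {
    // Adds n to the counter, capping at Long.MAX_VALUE,
    // and returns the value the counter held BEFORE the addition.
    static long addCap(AtomicLong requested, long n) {
        for (;;) {
            long r = requested.get();
            if (r == Long.MAX_VALUE) {
                return Long.MAX_VALUE; // already unbounded, nothing to add
            }
            long u = r + n;
            if (u < 0L) {
                u = Long.MAX_VALUE; // overflow means "unbounded"
            }
            if (requested.compareAndSet(r, u)) {
                return r;
            }
            // another thread changed the counter; retry
        }
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong();
        System.out.println(addCap(requested, 5)); // 0, so this caller starts emitting
        System.out.println(addCap(requested, 3)); // 5, so emission is already in progress
        System.out.println(requested.get());      // 8
    }
}
```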
2.4 Invocation (onNext) phase
ArrayConditionalSubscription#request
- ArrayConditionalSubscription#fastPath()
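- This loops over every element of array and calls actual.onNext(t); actual is the MapFuseableConditionalSubscriber, so this is MapFuseableConditionalSubscriber#onNext()
- It calls v = mapper.apply(t), the actual map logic, and then actual.onNext(v), which is FilterFuseableSubscriber#onNext(v)
- It calls b = predicate.test(t), the actual filter logic: if (b) {actual.onNext(t)}; otherwise it calls s.request(1) and discards the element that failed the predicate. Here actual is the LambdaSubscriber, so this is LambdaSubscriber#onNext
- Finally consumer.accept(x) is called, i.e. the final consumer handles the element
The core of the execution logic is ArrayConditionalSubscription#fastPath()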
void fastPath() {
    final T[] a = array;
    final int len = a.length;
    final Subscriber<? super T> s = actual;
    for (int i = index; i != len; i++) {
        if (cancelled) {
            return;
        }
        T t = a[i];
        if (t == null) {
            s.onError(new NullPointerException("The " + i + "th array element was null"));
            return;
        }
        s.onNext(t);
    }
    if (cancelled) {
        return;
    }
    s.onComplete();
}
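The onNext chain can be reproduced in a few lines of plain Java: each layer is a `Consumer` that applies its own logic and then hands the value to the next layer. This is a sketch under simplifying assumptions: it has no demand tracking, cancellation or error handling, and the names `mapLayer`, `filterLayer` and `OnNextChainDemo` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

class OnNextChainDemo {
    // Map layer: transform the value, then pass it downstream.
    static <T, R> Consumer<T> mapLayer(Function<T, R> mapper, Consumer<R> actual) {
        return t -> actual.accept(mapper.apply(t));
    }

    // Filter layer: pass the value downstream only if it matches.
    static <T> Consumer<T> filterLayer(Predicate<T> predicate, Consumer<T> actual) {
        return t -> {
            if (predicate.test(t)) {
                actual.accept(t);
            }
            // a non-matching element is simply dropped here
        };
    }

    static List<String> run() {
        List<String> received = new ArrayList<>();
        Consumer<String> lambda = received::add; // the final consumer
        Consumer<String> filter = filterLayer(s -> s.length() >= 11, lambda);
        Consumer<String> map = mapLayer(s -> s.concat("@qq.com"), filter);

        // Plays the role of fastPath(): push every array element downstream.
        for (String t : new String[] {"tom", "jack", "allen"}) {
            map.accept(t);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [jack@qq.com, allen@qq.com]
    }
}
```

"tom@qq.com" has only 10 characters, so the filter layer drops it, which matches the output of the original Flux example.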
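Summary
- 1) Assembly phase
Each operator call (just, map, filter) wraps the upstream Flux in a new Flux: FluxArray, then FluxMapFuseable, then FluxFilterFuseable. No data flows yet.
- 2) subscribe phase
Starting from the last operator, each operator wraps the downstream Subscriber (LambdaSubscriber, then FilterFuseableSubscriber, then MapFuseableConditionalSubscriber), and the resulting Subscriber is finally subscribed to the source FluxArray.
- 3) onSubscribe phase
Here the source wraps its data and its Subscriber into a Subscription object, and onSubscribe hands every layer of Subscriber its Subscription, from the source down to the LambdaSubscriber.
- 4) request phase
The request() call starts at the LambdaSubscriber and is forwarded layer by layer until it reaches the source Subscription (the ArrayConditionalSubscription created by FluxArray), which triggers the source to fetch its data and pass it as the argument of onNext.
- 5) Invocation phase
Every element of the array is processed in turn: starting from the source Subscription, onNext() is called layer by layer toward the final Subscriber, and each layer runs its own processing logic, such as the mapper, the predicate or the consumer.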