美文网首页
Rxjava源码解析--flatMap源码解析

Rxjava源码解析--flatMap源码解析

作者: Rogge666 | 来源:发表于2017-11-19 16:38 被阅读91次

    基于rxjava1.1.0

    用例代码↓
            Observable<String> observable1 = Observable.create(new Observable.OnSubscribe<String>() {
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("1");
                    subscriber.onCompleted();
                }
            });
    
            Subscriber<String> subscriber1 = new Subscriber<String>() {
                @Override
                public void onCompleted() {
                }
    
                @Override
                public void onError(Throwable e) {
                }
    
                @Override
                public void onNext(String s) {
                    Log.e("haha",s);
                }
            };
    
            observable1.flatMap(new Func1<String, Observable<String>>() {
                @Override
                 ⑬
                public Observable<String> call(String s) {
                    return Observable.just(s+"23");
                }
            }).subscribe(subscriber1);
    
    flatMap源码精简版↓
    ①
    public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
            return merge(map(func));
        }
    
    Map源码↓
    ②
    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
            return lift(new OperatorMap<T, R>(func));
        }
    
    OperatorMap精简版↓
    public final class OperatorMap<T, R> implements Operator<R, T> {
        private final Func1<? super T, ? extends R> transformer;
    
        public OperatorMap(Func1<? super T, ? extends R> transformer) {
            this.transformer = transformer;
        }
      
        @Override
        public Subscriber<? super T> call(final Subscriber<? super R> o) {
            ⑩
            //create subscriber2 传入subscriber3 = o
            return new Subscriber<T>(o) {
                @Override
                public void onNext(T t) {
                    ⑫
                    o.onNext(transformer.call(t));
                }
            };
        }
    }
    
    map lift精简源码↓
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            ③
            //create Observable2  OnSubscribe2
            return new Observable<R>(new OnSubscribe<R>() {
                ⑨
                @Override
                public void call(Subscriber<? super R> o) {
                    
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    st.onStart();
                    ⑪
                    onSubscribe.call(st);//onSubscribe1.call(subscriber2)
                }
            });
        }
    
    merge lift精简源码↓
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            ⑤
            //create Observable3  OnSubscribe3
            return new Observable<R>(new OnSubscribe<R>() {
                ⑥
                @Override
                public void call(Subscriber<? super R> o) {
                    
                    Subscriber<? super T> st = hook.onLift(operator).call(o);
                    st.onStart();
                    ⑧
                    onSubscribe.call(st);//onSubscribe2.call(subscriber3)
                }
            });
        }
    
    merge精简版源码↓
    ④
    public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
            return source.lift(OperatorMerge.<T>instance(false));//source = observable2 
        }
    
    OperatorMerge代码片段↓
     @Override
        ⑦
        //create subscriber3 传入subscriber1  = child
        public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
            MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
            MergeProducer<T> producer = new MergeProducer<T>(subscriber);
            subscriber.producer = producer;
            return subscriber;
        }
    
            @Override
            public void onNext(Observable<? extends T> t) {
                if (t == null) {
                    return;
                }
                if (t instanceof ScalarSynchronousObservable) {
                    ⑭
                    tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
                } else {
                    InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
                    addInner(inner);
                    t.unsafeSubscribe(inner);
                    emit();
                }
            }
    
    OperatorMerge代码精简片段↓
        void tryEmit(T value) {
            emitScalar(value, r);
        }
    
    OperatorMerge代码精简片段↓
        protected void emitScalar(T value, long r) {
            boolean skipFinal = false;
            ⑮
            child.onNext(value);
        }
    

    代码调用流程由①到最后
    代码分解
    observable1.flatMap(func).subscirbe(subcriber1)=
    observable1.merge(map(func)).subscirbe(subcriber1)=
    observable1.merge(observable2).subscirbe(subcriber1)=
    observable3.subscirbe(subcriber1)

    由上述代码分解可以知道执行observable1.flatMap(func).subscirbe(subcriber1)时map的lift先去创建observable2 onSubscribe2到这里时已经是①→②→③

    *//重点
    继续执行到④发现merge的调用是source.lift(),这里的source即是observable2 即observable1调用map的lift创建observable2 onSubscribe2, 到节点⑤observable2 调用merge的lift创建observable3 onSubscribe3,所以map lift 中有onSubscribe1的引用 , merge lift 中有onSubscribe2的引用

    此时订阅关系变为observable3.subscirbe(subcriber1) = observable3.onSubscribe3.call(subcriber1)即执行⑥到达⑦创建subscriber3 ,继续执行到达⑧执行onSubscribe2.call(subscriber3)到达⑨执行call方法到达⑩创建subscriber2 并传入subscriber3继续执行到达⑪等价于onSubscribe1.call(subscriber2)

    继续执行onSubscribe1.call()开始发射数据,subscriber2.onNext("1"),到达⑫,其中transformer.call(t)调用的是⑬生成一个ScalarSynchronousObservable类型的直接发射数据的observable4<String> 并把发射的数据缓存在 ScalarSynchronousObservable类中,继续执行subscriber3.onNext(observable4)到达⑭通过ScalarSynchronousObservable中的方法把observable4中的数据从缓存中取出来赋值给value最后到达⑮执行child.onNext(value);这里child = subscriber1 即subscriber1.onNext("123")

    至此流程完结

    相关文章

      网友评论

          本文标题:Rxjava源码解析--flatMap源码解析

          本文链接:https://www.haomeiwen.com/subject/eixpvxtx.html