RxJava1 Execution Analysis 2 - map

Author: lyzaijs | Published 2018-02-13 11:24 | Read 39 times

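    Continuing from the previous post, "RxJava1执行分析1" (RxJava1 Execution Analysis 1), this post analyzes the execution flow once the map operator is added to the chain.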


    Example 2

    // Source node: emits a single item and then completes.
    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("goods");
            subscriber.onCompleted();
        }
    })
            // map node: appends " map" to every item that passes through.
            .map(new Func1<String, String>() {
                @Override
                public String call(String s) {
                    return s + " map";
                }
            })
            // Final subscriber: prints "goods map".
            .subscribe(new Action1<String>() {
                @Override
                public void call(String s) {
                    System.out.println(s);
                }
            });
    

    Source of map:

        public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
            return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
        }
    
        public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
            return new Observable<T>(RxJavaHooks.onCreate(f));
        }
    

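    Here we can see that the map operator creates a new Observable (every operator in RxJava creates at least one) together with an OnSubscribeMap, which is an implementation of OnSubscribe. The OnSubscribeMap also holds a reference to the Observable of the previous node in the chain.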
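The wiring described above can be reproduced in a few lines of plain Java. The following is only a sketch under stated assumptions: MiniObservable, MiniOnSubscribe and MiniOnSubscribeMap are hypothetical stand-ins invented for this illustration, a java.util.function.Consumer takes the place of Subscriber, and hooks, errors and unsubscription are ignored. It is not RxJava's implementation.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-ins for illustration only; these are not RxJava classes.
class MapWiringSketch {

    /** Stand-in for OnSubscribe: the work that runs when someone subscribes. */
    interface MiniOnSubscribe<T> {
        void call(Consumer<? super T> subscriber);
    }

    /** Stand-in for Observable: one node in the chain, wrapping one MiniOnSubscribe. */
    static final class MiniObservable<T> {
        final MiniOnSubscribe<T> onSubscribe;

        MiniObservable(MiniOnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        <R> MiniObservable<R> map(Function<? super T, ? extends R> func) {
            // Mirrors the excerpt: a new node whose OnSubscribe remembers `this`.
            return new MiniObservable<R>(new MiniOnSubscribeMap<T, R>(this, func));
        }
    }

    /** Stand-in for OnSubscribeMap: keeps the upstream node and the mapping function. */
    static final class MiniOnSubscribeMap<T, R> implements MiniOnSubscribe<R> {
        final MiniObservable<T> source;
        final Function<? super T, ? extends R> transformer;

        MiniOnSubscribeMap(MiniObservable<T> source, Function<? super T, ? extends R> transformer) {
            this.source = source;
            this.transformer = transformer;
        }

        @Override
        public void call(Consumer<? super R> downstream) {
            // Subscribe to the upstream node and forward each mapped value downstream.
            source.onSubscribe.call(value -> downstream.accept(transformer.apply(value)));
        }
    }

    public static void main(String[] args) {
        MiniObservable<String> source = new MiniObservable<String>(s -> s.accept("goods"));
        MiniObservable<String> mapped = source.map(s -> s + " map");

        MiniOnSubscribeMap<?, ?> node = (MiniOnSubscribeMap<?, ?>) mapped.onSubscribe;
        System.out.println("new node created: " + (mapped != source));
        System.out.println("holds upstream:   " + (node.source == source));
        mapped.onSubscribe.call(System.out::println);
    }
}
```

The point of the sketch is that calling map does no work on the data; it only builds a second node that points back at the first. Nothing is emitted until something calls the outermost OnSubscribe.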

    Source of OnSubscribeMap:

    public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
    
        final Observable<T> source;
    
        final Func1<? super T, ? extends R> transformer;
    
        public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
            this.source = source;
            this.transformer = transformer;
        }
    
        @Override
        public void call(final Subscriber<? super R> o) {
            MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
            o.add(parent);
            source.unsafeSubscribe(parent);
        }
    
        static final class MapSubscriber<T, R> extends Subscriber<T> {
    
            final Subscriber<? super R> actual;
    
            final Func1<? super T, ? extends R> mapper;
    
            boolean done;
    
            public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
                this.actual = actual;
                this.mapper = mapper;
            }
    
            @Override
            public void onNext(T t) {
                R result;
    
                try {
                    result = mapper.call(t);
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    unsubscribe();
                    onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                    return;
                }
    
                actual.onNext(result);
            }
    
            @Override
            public void onError(Throwable e) {
                if (done) {
                    RxJavaHooks.onError(e);
                    return;
                }
                done = true;
    
                actual.onError(e);
            }
    
    
            @Override
            public void onCompleted() {
                if (done) {
                    return;
                }
                actual.onCompleted();
            }
    
            @Override
            public void setProducer(Producer p) {
                actual.setProducer(p);
            }
        }
    
    }
    

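    The key code below shows that the map node subscribes to the previous node, and that the Subscriber passed into call is the subscriber of the next (downstream) node: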

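        public void call(final Subscriber<? super R> o) {
            // Wrap the downstream subscriber o together with the mapping function.
            MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
            // Tie the lifecycles together: unsubscribing o also unsubscribes parent.
            o.add(parent);
            // Subscribe the wrapper to the previous (upstream) node.
            source.unsafeSubscribe(parent);
        }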
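To make the direction of the two flows concrete (subscription travels upstream, items travel downstream), here is a small self-contained trace. It is a sketch under assumptions: Node, Subscriber and OnSubscribe below are hypothetical simplified types written for this illustration, and the trace labels are made up. Only the order of the calls is meant to match the excerpt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical simplified model of the chain in Example 2; not RxJava code.
class MapFlowSketch {
    static final List<String> trace = new ArrayList<>();

    interface Subscriber<T> {
        void onNext(T t);
        void onCompleted();
    }

    interface OnSubscribe<T> {
        void call(Subscriber<? super T> subscriber);
    }

    static final class Node<T> {
        final OnSubscribe<T> onSubscribe;

        Node(OnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        void subscribe(Subscriber<? super T> subscriber) {
            onSubscribe.call(subscriber);
        }

        <R> Node<R> map(Function<? super T, ? extends R> mapper) {
            Node<T> source = this;
            return new Node<R>(downstream -> {
                trace.add("map.call");
                // The map node subscribes to the previous node with a wrapper
                // around the downstream subscriber it was handed.
                source.subscribe(new Subscriber<T>() {
                    @Override
                    public void onNext(T t) {
                        trace.add("map.onNext(" + t + ")");
                        downstream.onNext(mapper.apply(t));
                    }

                    @Override
                    public void onCompleted() {
                        downstream.onCompleted();
                    }
                });
            });
        }
    }

    static List<String> run() {
        trace.clear();
        Node<String> source = new Node<String>(subscriber -> {
            trace.add("source.call");
            subscriber.onNext("goods");
            subscriber.onCompleted();
        });
        source.map(s -> s + " map").subscribe(new Subscriber<String>() {
            @Override
            public void onNext(String s) {
                trace.add("downstream.onNext(" + s + ")");
            }

            @Override
            public void onCompleted() {
                trace.add("downstream.onCompleted");
            }
        });
        return new ArrayList<>(trace);
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

In the trace, map.call appears before source.call: the outermost node is subscribed first and passes the subscription up the chain. The item then comes back down through map.onNext before it reaches the final subscriber.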
    

    Source of source.unsafeSubscribe:

        public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
                 ...
                subscriber.onStart();
                RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
                return RxJavaHooks.onObservableReturn(subscriber);
                 ...
                return Subscriptions.unsubscribed();
            }
        }
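The visible lines of this excerpt call subscriber.onStart() first and only then invoke the node's OnSubscribe with that subscriber. The sketch below models just that ordering. MiniObservable, MiniSubscriber and MiniOnSubscribe are hypothetical types for this illustration. The RxJavaHooks calls, the returned Subscription and the elided parts of the original method are deliberately left out, not reconstructed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the ordering only; hooks, the returned Subscription
// and the elided parts of the real method are not modeled here.
class UnsafeSubscribeSketch {

    static abstract class MiniSubscriber<T> {
        void onStart() { }
        abstract void onNext(T t);
    }

    interface MiniOnSubscribe<T> {
        void call(MiniSubscriber<? super T> subscriber);
    }

    static final class MiniObservable<T> {
        final MiniOnSubscribe<T> onSubscribe;

        MiniObservable(MiniOnSubscribe<T> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        void unsafeSubscribe(MiniSubscriber<? super T> subscriber) {
            subscriber.onStart();          // 1. let the subscriber initialize itself
            onSubscribe.call(subscriber);  // 2. run this node's subscription logic
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> source = new MiniObservable<String>(subscriber -> {
            log.add("onSubscribe.call");
            subscriber.onNext("goods");
        });
        source.unsafeSubscribe(new MiniSubscriber<String>() {
            @Override
            void onStart() {
                log.add("onStart");
            }

            @Override
            void onNext(String t) {
                log.add("onNext(" + t + ")");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Applied to Example 2, this means the wrapper created by the map node has its onStart invoked before the source's call method emits "goods".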
    