美文网首页
RxJava(十三)--map()解析

RxJava(十三)--map()解析

作者: azu_test | 来源:发表于2019-03-08 18:35 被阅读0次

    介绍

    map()方法是对Observable内的数据处理器Observable.OnSubscribe执行完数据处理后的再次加工。

    执行代码

            //初始化被观察者Observable,并给其加上数据处理器Observable.OnSubscribe
            Observable Aobservable = Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("杨");
                    subscriber.onNext("月");
                    subscriber.onCompleted();
                }
            });
    
            //做Map变换处理
            Observable  Bobservable = Aobservable.map(new Func1<String,String>() {
                @Override
                public String call(String string) {
                    return string+"YaZhou";
                }
            });
    
             //初始化观察者Observer,视作结果接收器
             Observer observer = new Observer<String>() {
                @Override
                public void onCompleted() {
                    LogShowUtil.addLog("RxJava","结束",true);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(String string) {
                    LogShowUtil.addLog("RxJava","结果: "+string,true);
                }
            };
    
            Bobservable.subscribe(observer);
    

    上面代码AobservableBobservable命名不规范是为了做凸显之意,不要介意。

    源码分析

    1. 初始化被观察者AObservable
    Observable  Aobservable = Observable.create(原始数据处理器);
    

    由此可知被观察者AObservable持有原始数据处理器对象Observable.OnSubscribe。

    2. 执行map()变换操作
        Observable  Bobservable = Aobservable.map(数据变换器)
    
        Observable#map
        public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
            return create(new OnSubscribeMap<T, R>(this, func));
        }
    

    接着我们看其中的new OnSubscribeMap(Aobservable,数据变换器)操作

        OnSubscribeMap#OnSubscribeMap
        public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
            this.source = source;
            this.transformer = transformer;
        }
    

    由代码可知代理数据变换器OnSubscribeMap持有Aobservable和数据转换器func

    回到map()方法内继续执行create(代理数据变换器)

            return create(new OnSubscribeMap<T, R>(this, func));
    

    create方法之前已经分析过,由此可知Bobservable持有代理数据变换器OnSubscribeMap。

    3. 初始化结果接受器观察者Observer,
            Observer observer = new Observer<String>() {
               ...
            }
    
    4. 订阅
            Bobservable.subscribe(observer);
    

    由之前分析可知会使用 Bobservable内的代理数据变换器OnSubscribeMap做call()方法。
    其中observer为结果接受器

        OnSubscribeMap#call
        @Override
        public void call(final Subscriber<? super R> o) {
            //步骤一
            MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
            o.add(parent);
            //步骤二
            source.unsafeSubscribe(parent);
        }
    

    先看步骤一此处初始化了数据变换中转器MapSubscriber(结果接收器,数据变换器)。
    其中结果接收器是subscribe()方法传递进来的。
    数据变换器是map时初始化OnSubscribeMap传递进来的。

    接着看数据变换中转器MapSubscriber()构造方法

            public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
                this.actual = actual;
                this.mapper = mapper;
            }
    

    由此可知数据变换中转器中持有结果接收器actual和数据变换器mapper

    再回到OnSubscribeMap的call()方法内继续执行步骤二

            //步骤二
            source.unsafeSubscribe(parent);
    

    其中source为Aobservable,parent为数据变换中转器MapSubscriber

    继续执行会进入Aobservable.unsafeSubscribe()方法

        public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
            try {
                subscriber.onStart();
                //获取数据处理器Observable.OnSubscribe,并做数据处理工作
                RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
                return RxJavaHooks.onObservableReturn(subscriber);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    RxJavaHooks.onObservableError(r);
                    throw r;
                }
                return Subscriptions.unsubscribed();
            }
        }
    

    由此可知AObservable的原始数据处理器先执行call(数据变换中转器MapSubscriber)方法

    接下来会进入外部实现的外部数据处理器

               数据处理器内的call()方法
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("杨");
                    subscriber.onNext("月");
                    subscriber.onCompleted();
                }
    

    然后会进入数据变换中转器MapSubscriber的onNext()方法

            @Override
            public void onNext(T t) {
                R result;
                try {
                    //步骤一 数据变换操作
                    result = mapper.call(t);
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    unsubscribe();
                    onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                    return;
                }
                //步骤二 返回数据结果给用户(结果接收器)
                actual.onNext(result);
            }
    

    执行步骤一进入map()方法Func的外部实现方法,并返回数据

            observable = observable.map(new Func1<String,String>() {
                @Override
                public String call(String string) {
                    return string+"YaZhou";
                }
            });
    

    接着会执行步骤二,其中actual上文已经分析可知为结果接收器

                actual.onNext(result);
    

    接着就会进入结果接收器Observer内方法体内

            Observer observer = new Observer<String>() {
                @Override
                public void onCompleted() {
                    LogShowUtil.addLog("RxJava","结束",true);
                }
                @Override
                public void onError(Throwable e) {
                }
                @Override
                public void onNext(String string) {
                    LogShowUtil.addLog("RxJava","结果: "+string,true);
                }
            };
    

    最终输出结果

    结果: 杨YaZhou
    结果: 月YaZhou
    结束
    

    相关文章

      网友评论

          本文标题:RxJava(十三)--map()解析

          本文链接:https://www.haomeiwen.com/subject/ohrdpqtx.html