美文网首页
Rx转换操作符

Rx转换操作符

作者: gczxbb | 来源:发表于2019-04-06 14:55 被阅读0次

    map操作符

    被观察者数据源泛型,当发射器的数据类型和观察者数据类型不同时,通过map操作符转换,可以将上游发射的类型转换成任意对象类型。

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onComplete();
        }
    }).map(new Function<Integer, String>() {
        @Override
        public String apply(Integer s) throws Exception {
            String newStr = s + "_";
            Log.d(TAG, "int apply s " + newStr);
            return newStr;
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String response) throws Exception {
            Log.d(TAG, "Observer : " + response);
        }
    });
    

    发射数据类型是Integer类,通过map操作符,将类型转换成String类。Function是一个类型转换接口,Function<T, R>,将T转换R,解决被观察者和观察者数据类型不匹配问题。

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
    

    返回一个被观察者ObservableMap,封装原始被观察者ObservableCreate和转换接口Function,调用ObservableMap的subscribe注册方法。

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    

    创建观察者MapObserver,封装自定义观察者和转换接口Function。source源即内部ObservableCreate,调用它的subscribe方法。

    被观察者链

    ObservableCreate的#subscribeActual方法,创建CreateEmitter数据发射器,通知观察者已经注册。
    调用数据源source(ObservableOnSubscribe)的subscribe方法,将发射器暴漏给外部。外部通过发射器发射数据,如onNext方法。
    发射器CreateEmitter持有观察者MapObserver,当onNext事件发射后,通知观察者MapObserver的onNext方法,传参发射的数据类型Integer类。

    public void onNext(T t) {
        ...    
        U v;
        try {
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        actual.onNext(v);
    }
    

    根据MapObserver内部转换接口Function,apply方法,将T类型转换成U类型,再调用自己定义观察者Observer的onNext方法,入参数据类型转换成String。
    发射器onNext方法和观察者accept方法按照通知顺序执行。

    Rx的map操作符

    flatMap操作符

    flatMap操作符和map类似,Function接口实现类型转换,转换的对象是一个被观察者ObservableSource。

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        }
    }).flatMap(new Function<Integer, ObservableSource<String>>() {
        @Override
        public ObservableSource<String> apply(Integer s) throws Exception {
            return Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    //将上游Integer类型数据,在新发射器中改造发射。
                    String newStr = s + "_gc1";
                    String newStr2 = s + "_gc2";
                    Log.d(TAG, "int apply s " + newStr);
                    Log.d(TAG, "int apply s " + newStr2);
                    e.onNext(newStr);
                    e.onNext(newStr2);
                }
            });
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String response) throws Exception {
            Log.d(TAG, "Observer : " + response);
        }
    });
    

    将上游发射器每个Integer类型的数据转换成Observable类型,再由每个转换的被观察者发射目标类型数据。

    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
                                           boolean delayErrors, int maxConcurrency, int bufferSize) {
        ...
        return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
    }
    

    返回一个被观察者ObservableFlatMap。封装原始被观察者ObservableCreate和转换接口Function,调用ObservableFlatMap的subscribe注册方法。

    public void subscribeActual(Observer<? super U> t) {
        ...
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }
    

    创建观察者MergeObserver,封装自定义观察者和转换接口Function,source源即内部ObservableCreate,调用它的subscribe方法。当发射器onNext方法发射时,调用发射器内部MergeObserver的onNext方法。

    @Override
    public void onNext(T t) {
    
        ObservableSource<? extends U> p;
        try {
            p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
        } catch (Throwable e) {
            return;
        }
        //调用的是新建ObservableSource的注册方法。
        subscribeInner(p);
    }
    

    通过Function接口方法,将Integer类型转换成ObservableSource类型,转换对象是一个被观察者,外部创建,ObservableCreate类型,将Integer类型的数据暴露在新被观察者的数据源发射器中,处理转换成新发射器支持String类型,subscribeInner方法,新被观察者订阅。

    void subscribeInner(ObservableSource<? extends U> p) {
        for (;;) {
            if (p instanceof Callable) { 
                ...
            } else {
                InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                if (addInner(inner)) {
                    p.subscribe(inner);
                }
                break;
            }
        }
    }
    

    调用Observable的subscribe方法,订阅InnerObserver观察者,外部调用发射器onNext方法,可以获取apply方法中上层发射的Integer数据,按照String类型,触发两个onNext方法再次发射数据,两次调用观察者InnerObserver的onNext方法,每次,调用它引用MergeObserver的onNext方法,最终,通知到外部观察者。

    flatMap最初的onNext顺序,在Function转换成新Observable后,根据收到的数据,包装重新发射一批新数据。在观察者到的onNext顺序不一定是按照最初的onNext顺序调用的。
    上面发送的1,2,3,在观察者中看到的不一定是1,2,3的排序,加一个延迟就能看到,即1_gc1,1_gc2,3_gc1,3_gc2,2_gc1,2_gc2。

    flatMap操作符数据流程

    总结

    flatMap不保证数据发射流的通知顺序。
    concatMap和flatMap功能相同,可以保证按照发射顺序通知。


    任重而道远

    相关文章

      网友评论

          本文标题:Rx转换操作符

          本文链接:https://www.haomeiwen.com/subject/jraliqtx.html