美文网首页安卓开发博客
手写 RxJava ---- map 操作符

手写 RxJava ---- map 操作符

作者: 石器时代小古董 | 来源:发表于2020-04-12 21:00 被阅读0次

    一、实现的思路

    RxJava 提供了很多的变换操作符,将上游的数据转换成另一种数据,可以在传送数据流的过程中构造一个新的类,这个类即持有上游的 Observable 也持有下游的 Observer 和 变换的能力来实现

    二、具体代码

    1.创建 map 操作符

    map 操作符本身持有 ObservableOnSubscribe 对象,这里将 ObservableOnSubscribe 交给一个 ObservableMap 让它持有了上游的能力。同时也持有了变换的能力

      /**
         * 自定义 map 操作符
         * map 操作符会拿到 SelfObservableMap 所持有的 Observable 对象
         * 交给一个 ObservableMap 对象 然后替换一个新的(持有 ObservableMap 这个 ObservableOnSubscribe 的引用,已经一个转换的函数)
         * 即拥有控制上一层的能力 也拥有控制下一层的能力
         * <p>
         * 变换操作符只考虑上一层的类型 变换成新的类型后 给到下一层
         */
        public <R> SelfObserverable<R> map(CFunction<? super T, R> function) {
            ObservableMap observableMap = new ObservableMap(observable, function);
            // 这里会把 source 替换成 ObservableMap
            // 在 subscribe 时实际调用的 observable map 所持有的 source 的 subscribe 方案
            return new SelfObserverable<>(observableMap);
        }
    

    2.ObservableMap 同时提供了 subscribe 函数,让它持有了下游的能力

    /**
     * 为 Map 专门定义的 ObservableOnSubscribe
     * T 接收的类型 R 返回的类型
     *
     */
    public class ObservableMap<T,R> implements ObservableOnSubscribe<R> {
        // 这里持有的是第一次通过 create 函数创建持有的 ObservableOnSubscribe 对象
        private ObservableOnSubscribe observable;
        private CFunction<? super T, ? extends R> function;
    
        public ObservableMap(ObservableOnSubscribe source, CFunction<? super T, ? extends R> function) {
            this.observable = source;
            this.function = function;
        }
    
        /**
         * observableEmitter 是外层通过 subscribe 函数传递进来的 Observer 类
         * @param observer
         */
        @Override
        public void subscribe(Observer<? super R> observer) {
            // SelfObserverable 调用 subscribe 函数时 实际上调用的是
            MapObserver mapObserver = new MapObserver(observer,  function);
            // observable 是通过 create 或者 just 传递进来的 observable,现在交给了它一个 MapObserver
            /**
             *     observable ----> create 的 new ObservableOnSubscribe
             *     // 使用 Map 操作符
             *     SelfObserverable.create(new ObservableOnSubscribe<Integer>() {
             *      @Override
             *      public void subscribe(Observer<? super Integer> observableEmitter) {
             *            observableEmitter.onNext(1);
             *            observableEmitter.onComplete();
             *         }
             *     })
             *
             *    这个 observable.subscribe 调用的就是 ObservableOnSubscribe 的 subscribe 函数的方法。
             *    会触发 MapObserver 类的 onNext 和 onComplete
             */
            observable.subscribe(mapObserver);
        }
    
    

    3.在 subscribe 时又对 observer 做了进一步的包装,让被观察者 ObservableOnSubscribe 持有了 MapObserver 这个新的观察者类。在调用 onNext 时首先调用 CFunction 函数进行一次数据转换

     /**
         * 对 Observer 的包装
         *
         * @param <T>
         */
        class MapObserver<T> implements Observer<T> {
            Observer<? super R> observableEmitter;
            ObservableOnSubscribe source;
            CFunction<? super T, ? extends R> function;
    
            /**
             *
             * @param observableEmitter  通过 subscribe 传递进来的观察者
             * @param function function 是负责转换的函数
             */
            public MapObserver(Observer<? super R> observableEmitter,
                               CFunction<? super T, ? extends R> function) {
                this.observableEmitter = observableEmitter;
                this.function = function;
            }
    
            @Override
            public void onSubscirbe() {
                observableEmitter.onSubscirbe();
            }
    
            @Override
            public void onNext(T value) {
                // 将转换后的值交给 onNext
                R next = function.apply(value);
                observableEmitter.onNext(next);
            }
    
            @Override
            public void onComplete() {
                observableEmitter.onComplete();
            }
    
            @Override
            public void onError(Throwable throwable) {
                observableEmitter.onError(throwable);
            }
        }
    

    四、流程示例

    RxJava-2.jpg

    五、使用示例

      // 使用 Map 操作符
            SelfObserverable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(Observer<? super Integer> observableEmitter) {
                    observableEmitter.onNext(1);
                    observableEmitter.onComplete();
                }
            }).map(new CFunction<Integer, String>() {
    
                @Override
                public String apply(Integer integer) {
                    return "我被转换啦";
                }
            }).subscribe(new Observer<String>() {
                @Override
                public void onSubscirbe() {
                    Log.d("TAG", "on subscribe with map");
                }
    
                @Override
                public void onNext(String value) {
                    Log.d("TAG", "我被转换啦");
                }
    
                @Override
                public void onComplete() {
    
                }
    
                @Override
                public void onError(Throwable throwable) {
    
                }
            });
    

    相关文章

      网友评论

        本文标题:手写 RxJava ---- map 操作符

        本文链接:https://www.haomeiwen.com/subject/weheictx.html