美文网首页
RxJava2框架源码分析三(map篇)

RxJava2框架源码分析三(map篇)

作者: yqianqiao | 来源:发表于2019-12-10 17:24 被阅读0次

    1.回顾

    上篇已经讲了RxJava2创建操作符create源码解析,不清楚的可以查看RxJava2框架源码分析二(Create篇)

    2.Map操作符

    • 定义

    官方定义:transform the items emitted by an Observable by applying a function to each item
    拙劣的翻译:应用一个函数 转换所有的被发射的item

    • 实例讲解
     Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            }).map(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Exception {
                    return "类型转换:" + integer;
                }
            }).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    System.out.println("开始采用subscribe连接");
                }
    
                @Override
                public void onNext(String integer) {
                   System.out.println(integer);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
                    System.out.println("对Complete事件作出响应");
                }
            });
    

    关于创建步骤这些上面文章已经分析过了,所以直接跳过了,不清楚的可以到RxJava2框架源码分析二(Create篇)观看,这里直接看到运行结果:

    示意图.png

    3. 源码分析

    老套路,按照步骤走

    • 步骤一:创建被观察者ObservableCreate&定义需发送的事件
    • 步骤二:通过ObservableCreate再次创建被观察者ObservableMap&对事件进行加工操作
    • 步骤三:创建观察者Observer&定义响应事件的行为
    • 步骤四:通过ObservableMap订阅subscribe观察者Observer
      注意:上面用到的ObservableCreateObservableMap都是Observable的子类,都是被观察者。

    步骤一:创建ObservableCreate

    步骤二

    • 源码分析
    //步骤二,创建ObservableMap被观察者
    .map(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Exception {
                    return "类型转换:" + integer;
                }
            })
    
    //源码分析
    //创建ObservableMap被观察者
     public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
            //判断非空
            ObjectHelper.requireNonNull(mapper, "mapper is null");
            //创建ObservableMap并且返回出去(注意这里构造方法中的this指的是上一个被观察者ObservableCreate)
            return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
        }
    
    //这类继承了AbstractObservableWithUpstream,该类也是Observable的子类
    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        final Function<? super T, ? extends U> function;
         ...
        // 仅贴出关键源码
        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
            //初始化,调用父类构造方法
            //source是指上一个被观察者,本文中为ObservableCreate
            super(source);
          //将map()接收的function传递至全局
            this.function = function;
        }
    }
    
     //这类也是Observable的子类,主要作用是包装,扩展
    abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
        //上一个被观察者的引用
        protected final ObservableSource<T> source;
    
        AbstractObservableWithUpstream(ObservableSource<T> source) {
            //通过构造方法赋值
            this.source = source;
        }
        @Override
        public final ObservableSource<T> source() {
            //返回上一个被观察者
            return source;
        }
    }
    /**
      * 此接口为Observable.map()里面的参数接口
      **/
    public interface Function<T, R> {
        //定义两个泛型,参数为泛型T,返回泛型R
        R apply(@NonNull T t) throws Exception;
    }
    
    • 步骤二总结
      通过上一个被观察者ObservableCreate.map()创建了一个ObservableMap对象,并通过构造方法传入ObservableCreate引用this以及Function接口的实现类。ObservableMap构造方法把ObservableCreatethis传递给其父类AbstractObservableWithUpstream并赋source

    步骤三:创建观察者Observer
    创建Observer接口的实现类

    步骤四:通过ObservableMap订阅subscribe观察者Observer

    • 源码分析
    /** 
      * 源码分析:ObservableMap.subscribe(observer)
      * 说明:该方法属于 Observable 类的方法(注:传入1个 Observer 对象)
      **/  
    public abstract class Observable<T> implements ObservableSource<T> {
         ...
        // 仅贴出关键源码
      @Override
      public final void subscribe(Observer<? super T> observer) {
             ...
             // 仅贴出关键源码
            //可以看到调用的是本类的下面抽象方法
             subscribeActual(observer); 
       }
        //定义了一个抽象方法当调用subscribe时会跟这个调用Observable子类的实现方法(就是调用者)
       protected abstract void subscribeActual(Observer<? super T> observer);
    }
    
    /**
    *  现在我们回到先前创建的被观察者中 ObservableMap类 
    **/
    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        //构造方法传入的Function实现类
        final Function<? super T, ? extends U> function;
        
        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
             //super()将上游的Observable保存起来 ,用于subscribeActual()中用。
            //source是指上一个被观察者,本文中为ObservableCreate
            super(source);
          //将map()接收的function传递至全局
            this.function = function;
        }
    
        @Override
        public void subscribeActual(Observer<? super U> t) {
            //source是AbstractObservableWithUpstream的成员变量,通过上面构造方法传入的(本例子中带代表的是ObservableCreate)
            //步骤一,创建MapObserver对象,并传入观察者(observer)以及function,这里只是创建对象,没有调用里面的方法
            //步骤二,调用source.subscribe()把MapObserver对象传入ObservableCreate中去
            source.subscribe(new MapObserver<T, U>(t, function));
        }
        /**
          *  这类是map 的包装类是Observer的子类,BasicFuseableObserver继承的是Observer,
          *  该类是可融合中间观察者的基类,里面主要实现了onSubscribe()、onError()、onComplete() 等一些抽象方法
        **/
        static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
            //Function实现类
            final Function<? super T, ? extends U> mapper;
    
            MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
                //将观察者传递给父类downstream(就是父类的成员变量downstream)
                super(actual);
                this.mapper = mapper;
            }
            
            @Override
            public void onNext(T t) {
                // 父类成员变量,表示是否发送过onError()、onComplete()事件
                if (done) {
                    return;
                }
                //默认sourceMode是0,所以跳过
                if (sourceMode != NONE) {
                    downstream.onNext(null);
                    return;
                }
    
                U v;
    
                try {
                    // 讲通过调用Function接口方法,将t传入,返回u,完成类型转换
                    //相当于 v =mapper.apply(t)
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                //调用观察者的onNext()
                downstream.onNext(v);
            }
                 ...
             // 仅贴出关键源码
        }
    }
    
    /**
       *  熟悉的类ObservableCreate,详细的分析在上篇文章
       * 上一个类中我们调用了source.subscribe(new MapObserver<T, U>(t, function));
       *  Observable.subscribe()方法最终会回调到ObservableCreate中的subscribeActual()方法
     **/
    public final class ObservableCreate<T> extends Observable<T> {
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            //将MapObserver()观察者封装成CreateEmitter对象
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
          // 调用观察者(MapObserver)的onSubscribe()
            observer.onSubscribe(parent);
            try {
                //3.调用source对象的subscribe()方法(发射器中的subscribe()实现类中的onNetx()系列方法)
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
        static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
            final Observer<? super T> observer;
    
            CreateEmitter(Observer<? super T> observer) {
                //这里传入的是MapObserver对象
                this.observer = observer;
            }
            @Override
            public void onNext(T t) {
         
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
           
                if (!isDisposed()) {
                    //调用的其实是MapObserver对象中的onNext()方法
                    observer.onNext(t);
                }
            }
    }
    
    • 步骤四总结:
      当被观察者订阅观察者的时候,会调用被观察者Observable的subscribeActual()抽象方法,回调其子类重新的subscribeActual()方法。这方法里面有几个步骤:
    1. 通过ObservableMap调用父类Observable方法subscribe(),该方法回调父类抽象方法subscribeActual()的实现类ObservableMap中去。
    2. ObservableMap类中重写subscribeActual()方法中创建了MapObserver观察者对象,并通过调用父类Observable方法subscribe()MapObserver对象传递给上一个被观察者ObservableCreate中的subscribeActual()实现方法中。
    3. 创建1个CreateEmitter对象(封装成一个Disposable对象)。
    4. 调用观察者MapObserveronSubscribe(CreateEmitter parent )使其可以取消订阅。
    5. 调用source对象的subscribe(CreateEmitter parent)方法,通过 parent发送事件回调。

    4. 源码总结

    1. 创建被观察者,通过Observable.create()方法创建了ObservableCreate对象,然后通过ObservableCreate对象又创建了ObservableMap
    2. 订阅,ObservableMap通过调用父类方法subscribe()方法回调到ObservableMap重写父类subscribeActual()方法中,该方法创建MapObserver对象,并实现Observer中的onNext(),里面调用接口Function实现数据类型转换。
    3. ObservableMap.subscribeActual()方法中调用上一个被观察者ObservableCreate.subscribe(),回调到ObservableCreate.subscribeActual()中去,然后创建发射器调用MapObserver.onNext()

    相关文章

      网友评论

          本文标题:RxJava2框架源码分析三(map篇)

          本文链接:https://www.haomeiwen.com/subject/fgblgctx.html