美文网首页
RxJava浅析二——map操作原理

RxJava浅析二——map操作原理

作者: 青象 | 来源:发表于2017-11-30 14:45 被阅读7次

    我之前写过RxJava1.x中Map操作的原理,然而时隔半年,我已经全然忘记RxJava1.x是怎么实现的了,唯一的印象是挺复杂的。

    当时可能也是浅尝辄止,对其理解不是那么深,过一段时间决定就忘记了。所以把这些现在理解的东西记录下来是很有必要的。

    所以,蹬蹬瞪蹬……,RxJava2版本的Map原理闪亮登场。

    本文是基于RxJava浅析——事件如何从上游传递到下游。建议先看下这篇。

    先上代码,没错,本文就是要分析下这段代码的执行过程。跟上一篇分析区别就在与create()subscribe()之间多了一个map操作。

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                }
            }).map(new Function<Integer, String>() {
                @Override
                public String apply(@NonNull Integer integer) throws Exception {
                    return String.valueOf(integer);
                }
            }).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    
                }
    
                @Override
                public void onNext(String s) {
                    Log.i(TAG, "onNext: " + s);
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError:", e);
                }
    
                @Override
                public void onComplete() {
                    Log.i(TAG, "onComplete");
                }
            });
    

    我们知道Observable.create()会返回一个ObservableCreate对象。ObservableCreateObservable的子类。记住这是我们第一个碰到的具体的Observable实现类。

    那么核心代码就在这里了:

    //Observable的map方法。ObservableCreate.map使用的是父类(Observable)的实现
    //不是源码,去掉了一些空检查等
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
            return new ObservableMap<T, R>(this, mapper);
    }
    //附上Observable.create()方法对比一下
    //代码位置:io.reactivex.Observable.java
    //不是源码,是我简化后的代码。源码还有空检查,Hook方法的回调等。这些不影响整个逻辑,所以可以先忽略。
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            return new ObservableCreate<T>(source);
    }
    

    呀呀呀,这段代码是不是很熟悉,跟create()方法一样的套路啊。只是这个返回的是一个ObservableMap,这个ObservableMap也是Observable的子类,这是我们碰到的第二个具体的Observable实现类。

    来看看构造方法中看了啥。

    final Function<? super T, ? extends U> function;
    
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);//其实就是在父类中保存source,不用在乎这个super。
        this.function = function;//我们外部实现的Function接口。
    }
    

    呀呀呀,也只是保存了一下sourcefunction。套路还是一样的。注意注意,这个source应该是create()方法返回的ObservableCreate对象哦~

    奥,解释一下这个Function也是一个接口。

    public interface Function<T, R> {
        /**
         * Apply some calculation to the input value and return some other value.
         * @param t the input value
         * @return the output value
         * @throws Exception on error
         */
        R apply(@NonNull T t) throws Exception;
    }
    
    

    这个接口中只有一个apply()方法,接收T类型参数,返回R类型参数。在我们的例子里就是接收Integer类型的参数,返回String类型的参数。妥妥的,就是一个map操作嘛。把Integer类型转换成了String类型。怎么转换?当然是我们自己实现Function接口。在例子里只是简单地把Integer转换成了String而已。

    所以到此为止,啥也没发生。

    关键当然还是把上下游连接起来的subscribe(Observer)方法。

    map操作时候返回的是ObservableMap对象,所以subscribe(Observer)方法最后会调到ObservableMap.subscribeActual()方法。这个还不清楚的,参见“RxJava浅析——事件如何从上游传递到下游”

    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function)); //Point 0
    }
    

    这货干了两件事情:

    1. 创建了MapObserver
    2. 调用了source.subscribe()方法,并把创建出来的MapObserver传入。

    先来讲第一件事情,这是我们除了自己创建的Observer之外,第一次遇到RxJava2内部创建的Observer对象。MapObserverObservableMap的静态内部类。

    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
                super(actual);//Point 1 
                this.mapper = mapper;//Point 2
         }
    }
    

    可以看到这个MapObserver继承BasicFuseableObserver, 可以猜到其父类肯定是继承Observer.

    public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R>
    

    这个QueueDisposable可以就理解成Disposable。我们暂时不扯那么远。

    回到Point 1,这句话就是把我们外部的Observer给保存下来。

    Point 2,这句话就是把我们的function保存下来。

    重点来了!

    看Point 0处的代码,这个source是我们的ObservableCreate,这里的入参是刚刚创建的ObserverMap。所以RxJava内部又发生了我们在“RxJava浅析——事件如何从上游传递到下游”所分析的事情。

    还记得么?这个subscribe()方法最终调到ObservableCreate.subscribeActual()方法。再来看一遍:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);//Point 3
        try {
            source.subscribe(parent); //Point 4
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    

    这里就很熟悉啦。只是这里的observer是RxJava内部帮我们创建的ObserverMap

    Point 3 会调到ObserverMap.onSubscribe()

    //代码在BasicFuseableObserver中,非源码。去掉其他的一些代码
    @Override
    public final void onSubscribe(Disposable s) {
        this.s = s;
        actual.onSubscribe(this);
     }
    

    这个actual就是我们在创建ObserverMap时传入的,就是我们外部实现的Observer。可以把this传出去,是因为ObserverMap本身也是Disposable接口的实现类。当然入参s(实际上是CreateEmitter)也会保存下来。

    Point 4中会调到我们外部实现的ObservableOnSubscribe接口的subscribe()。即执行事件发送。再提一下,这个source是在创建ObservableCreate时保存的。

    所以呢,当我们使用CreateEmitter发送数据时: e.onNext(1)

    实际上也是先调到CreateEmitteronNext(1),他调到他自己保存的ObserveronNext(1),在我们的例子里这个就是ObserverMap。所以我们来看ObserverMaponNext方法。

    //代码位置:ObserverMap的onNext方法。
    final Function<? super T, ? extends U> mapper;
    U v;
    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
             return;
        }
       actual.onNext(v);
    

    关键点在于mapper.apply(t),这句话把T类型的对象转换成了U类型的对象,也就是我们例子中把Integer类型的对象转换成了String类型。requireNonNull只是对转换后的对象进行了空检查。因为这个转换是我们外部提供的实现,并不能保证非空。

    最后再调用actual.onNext(v)就是调到了我们提供的ObserveronNext方法,此时类型已经发生了转换。

    以上就是map操作的原理啦。对onErroronComplete事件其实没有起什么作用。

    最后用一个图来解释:

    map操作原理图.png

    没有Map操作的时候

    • 事件流向:CreateEmitter->Observer。
    • 创建流的过程如图中数字所示。

    增加Map操作的时候事件流向

    • 事件流向:CreateEmitter->ObserverMap->Observer。
    • 创建流的过程如图中数字所示。

    相关文章

      网友评论

          本文标题:RxJava浅析二——map操作原理

          本文链接:https://www.haomeiwen.com/subject/abuvbxtx.html