美文网首页Android
RxJava——Map操作符原理

RxJava——Map操作符原理

作者: 青象 | 来源:发表于2017-04-18 10:49 被阅读137次

    RxJava Map操作原理

    最近看了一些RxJava的文章,被他好多操作符的原理包括线程切换之类的搞得云里雾里。现在整理了一份最基础的Map操作符原理,加强一下理解!!!

    本文的原理分析基于RxJava 1.0.14。对Map操作进行了仔细的分析,阅读本文前要求知道RxJava观察者模式的基本原理,即只有观察者注册的时候被观察者的事件序列才会被发送。

    先来看一个例子

    Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("123456");
                    subscriber.onComplete();
                }
            }).map(new Func1<String, Integer>() {
                @Override
                public Integer call(String s) {
                    return Integer.parseInt(s);
                }
            }).subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.i(TAG,"Integer=" + integer);
                }
            });
    

    这个例子先创建了一个Observable对象,它持有一个OnSubscribe<String>的对象,如果没有map方法进行转换,那么这个Observable对应的Subscriber应该是Subscriber<String>,并且Subscriber的回调方法onNext中的参数类型应该是String。通过map方法的映射,最终我们注册的是Subscriber<Integer>

    接下来,我们先介绍一下map方法里做了哪些事。然后我们从发生注册事件开始说起,因为RxJava的所有事件都是观察者在注册的时候才开始发送(或者说激活)的。

    那么map方法究竟是怎么做这个转换的呢?

    可以看到,map方法的入参是一个Func1对象。Func1类的声明如下,先不管它泛型的参数是T,还是R。只要记住它只有一个call方法,该方法以左边的参数为入参,右边的参数为返回值。即T为入参,R为返回值。

    public interface Func1<T, R> extends Function {
        R call(T t);
    }
    

    map方法的实现如下,我们看到它将我们传入的Func1对象封装成了OperatorMap对象。最终调用了lift方法。

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
            return lift(new OperatorMap<T, R>(func));
    }
    

    再来看看,OperatorMap是个什么玩意儿?

    public final class OperatorMap<T, R> implements Operator<R, T>;
    public OperatorMap(Func1<? super T, ? extends R> transformer) {
            this.transformer = transformer;
    }
    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
            // cover for generics insanity
    }
    

    Operator其实就是Fun1,它继承了Fun1,也没做什么扩展。OperatorMap的构造方法里把我们new出来的Fun1对象赋值给了transformer

    接着来看lift方法里做了什么。

    lift方法的实现,代码位于Observable中。

    // 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
    // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
    public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
        return Observable.create(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber subscriber) {
                Subscriber newSubscriber = operator.call(subscriber);
                newSubscriber.onStart();
                //注意!!!这个onSubscribe是原来的Observable对象持有的onSubscriber
                onSubscribe.call(newSubscriber);
            }
        });
    }
    

    可以看到lift方法返回了一个Observable对象,我们把它叫做Observable$2,它持有的OnSubscriber对象叫做OnSubscriber$2,注册这个。我们本来有的那个Observable对象,叫做Observable$1,同样它持有的OnSubscriber对象叫做OnSubscriber$1

    这个lift方法做了三件事:

    1、利用传入的OperationMap创建了一个新的Subscriber

    2、调用了新的SubscriberonStart()方法。这只是一个可选的准备方法,可以暂时忽略

    3、调用了OnSubscriber$1call方法。

    我们来看这个Subscriber$2是怎么被创建出来的,看我们传入的OperationMapcall方法。为了不被TR混淆,我们这里可以理解成T就是StringR就是Integer。因为我们调用map方法是创建的Fun1对象是Fun1<String, Integer>

        @Override
        public Subscriber<? super T> call(final Subscriber<? super R> o) {
            return new Subscriber<T>(o) {
                 @Override
                public void onCompleted() {
                    o.onCompleted();
                }
                @Override
                public void onError(Throwable e) {
                    o.onError(e);
                }
                @Override
                public void onNext(T t) {
                    try {
                        o.onNext(transformer.call(t));
                    } catch (Throwable e) {
                        Exceptions.throwIfFatal(e);
                        onError(OnErrorThrowable.addValueAsLastCause(e, t));
                    }
                }
    
            };
        }
    

    到这里我们基本清楚map方法里面做了哪些事,一是保存了我们传入的Fun1对象,二是创建了一个Observable$2对象,并返回。我们拿到了这个Observable$2并为之注册了一个Subscriber

    现在开始,我们要从注册事件发生开始说起,看看上面讲的各个方法是什么时候被触发调用。

    要明确一件事情,我们在外部调用map之后只创建了一个Subscriber,并且它的参数是Integer。那么当我们调用Observable$2.subscribe(Subscriber)时,发生了什么?(这里之所以是Observable$2,是因为流式调用,注册的应该是lift方法返回的Observable

    public Subscription subscribe(Subscriber subscriber) {
        subscriber.onStart();
        onSubscribe.call(subscriber);
        return subscriber;
    }
    

    注意!这个onSubscribe是我们上文所说的onSubscribe$2,这个subscriber应该是Subscriber<Integer>onSubscribe.call(subscriber)就会调用到我们讲过的lift方法里做的三件事情。

    这里再重复一遍,并转换一种说法。

    1、利用Subscriber<Integer>创建Subscriber<String>,这个部分的实现就在OperationMap里面。

    2、调用Subscriber<String>的onStart()

    3、调用OnSubscriber.call(Subscriber<String>),这个OnSubscriber已经说过了是OnSubscriber$1,所以是我们在外部创建的。这个call方法里面,我们调用了subscriber.onNext("123456"); subscriber.onComplete();

    最后的关键点在于我们通过Subscriber<Integer>创建Subscriber<String>之后,调用到了subscriber<String>.onNext("123456")subscriber<String>.onComplete()(注意这个subscriber<String>的写法是不对的,我这边是为了能看的清逻辑,所以这样标注)。那么这个Subscriber<String>onNextonComplete里面做了什么呢? 还记得我们是怎么创建的吧?

    OperationMap里面,这里再贴一次代码。

      @Override
        public Subscriber<? super T> call(final Subscriber<? super R> o) {
            return new Subscriber<T>(o) {
                 @Override
                public void onCompleted() {
                    o.onCompleted();
                }
                @Override
                public void onError(Throwable e) {
                    o.onError(e);
                }
                @Override
                public void onNext(T t) {
                    try {
                        o.onNext(transformer.call(t));
                    } catch (Throwable e) {
                        Exceptions.throwIfFatal(e);
                        onError(OnErrorThrowable.addValueAsLastCause(e, t));
                    }
                }
    
            };
        }
    

    关键的地方就在于o.onNext(transformer.call(t)); 这个o应该是我们外部创建的Subscriber<Integer>,这个transformer是我们也是我们外部传入的new Fun1<String,Integer>。看明白了吧!!!Subscriber<String>只是个代理,最终是通过我们定义的Fun1对象的规则把String转换成Integer,再调用我们创建的Subscriber<Integer>onNext方法,其他方法同理。

    最后的最后,盗用一个抛物线大侠的图再加上我的一些文字说明再来回忆一下整个过程。

    map原理.png

    可能语言比较啰嗦,但也确实是自己学习和思考的过程。如有错误,请大家指正~~~

    参考文章

    给Android开发者的RxJava详解

    相关文章

      网友评论

      • W_BinaryTree:蛮不错 RxJava2 其实原理类似。只不过不再通过lift() 而是直接生成新的Observable。

      本文标题:RxJava——Map操作符原理

      本文链接:https://www.haomeiwen.com/subject/rwdfzttx.html