美文网首页RX+Retrofit
RxJava 系列 (三)RxJava lift原理

RxJava 系列 (三)RxJava lift原理

作者: 嘎啦果安卓兽 | 来源:发表于2017-11-21 17:18 被阅读99次

    前言

    理解lift原理有什么意义?

    • 可以理解Rxjava最核心的原理,看懂了lift你就看懂了Rxjava

    lift是Rxjava操作符的基础原理,操作符是Rxjava功能如此丰富和好用的核心,理解了lift也就理解了Rxjava最核心的原理

    • 可以理解线程切换的原理,有助于灵活运用线程切换和调试线程相关的问题

    线程切换也是用的操作符,所以原理也是lift

    RxJava基本用法

    1. 创建 Observer
    Observer<String> observer = new Observer<String>() {
        @Override
        public void onNext(String s) {
            Log.d(tag, "Item: " + s);
        }
    
        @Override
        public void onCompleted() {
            Log.d(tag, "Completed!");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(tag, "Error!");
        }
    };
    

    Observer 需要实现三个方法,相当于定义了三种类型的事件

    • Subscriber 与 Observer的关系

    Subscriber相当于增加了Subscription(订阅关系管理)功能的Observer
    Subscription 包含两个方法:
    unsubscribe();(取消订阅)
    isUnsubscribed();(查询订阅关系)

    Subscriber 还增加了一个onStart()方法:它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作

    实质上,在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用,所以为了统一,我们就统一以Subscriber来作为观察者,就不再提Observer了。

    1. 创建 Observable
    Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("Hi");
            subscriber.onNext("Aloha");
            subscriber.onCompleted();
        }
    });
    

    创建Observable对象时,会传入一个 OnSubscribe 对象,OnSubscribe对象会被存储在生成的 Observable 对象中。

    OnSubscribe 对象的作用,就是拿到 subscriber对象,向subscriber 对象发送事件。
    这里拿到观察者Subscriber对象,并调用Subscriber实现的三个方法,就是在向观察者发送事件

    1. Subscribe (订阅)
    observable.subscribe(observer);
    // 或者:
    observable.subscribe(subscriber);
    

    Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):

    // 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
    // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
    public Subscription subscribe(Subscriber subscriber) {
        subscriber.onStart();
        onSubscribe.call(subscriber);
        return subscriber;
    }
    

    可以看到,subscriber() 做了3件事:

    1. 调用 Subscriber.onStart() 。这个方法在前面已经介绍过,是一个可选的准备方法。
    2. 调用 Observable 中的 OnSubscribe.call(Subscriber) 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。
    3. 将传入的 Subscriber 作为 Subscription 返回。这是为了方便进行订阅关系管理,比如 unsubscribe().

    操作符-对事件序列进行变换

    所谓变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。概念说着总是模糊难懂的,来看 例子。

    举个例子:map()

    Observable.just("images/logo.png") // 输入类型 String
        .map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String filePath) { // 参数类型 String
                return getBitmapFromPath(filePath); // 返回类型 Bitmap
            }
        })
        .subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) { // 参数类型 Bitmap
                showBitmap(bitmap);
            }
        });
    

    map(): 对事件对象的直接变换。

    变换的原理:lift()

    这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):

    // 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
    // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
    public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
        return Observable.create(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber subscriber) {
                Subscriber newSubscriber = operator.call(subscriber);
                newSubscriber.onStart();
                onSubscribe.call(newSubscriber);
            }
        });
    }
    

    先不用关心细节,我们先知道 lift是接受一个operator参数,返回一个新的Observable对象。
    在分析 lift() 的内部实现之前,我们先看一下加上操作符的一次调用的完整过程

    一次包含操作符的调用的完整过程

    Observable.just("images/logo.png") // 输入类型 String
        .map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String filePath) { // 参数类型 String
                return getBitmapFromPath(filePath); // 返回类型 Bitmap
            }
        })
        .subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) { // 参数类型 Bitmap
                showBitmap(bitmap);
            }
        });
    

    做了三步:
    1.生成observable对象
    2.在observable对象上调用了map方法
    3.在map方法的返回值上调用了.subscribe方法

    问题:1.map方法内部做了什么? 2.map方法的返回值是个什么?

    回答两个问题

    因为map()内部不是直接调用的lift方法(跟lift的原理一样,只是没有直接使用lift方法),所以我们以take()操作符的源码为例,来看方法调用。


    回答两个问题:

    1. take方法内部很简单,就是调用了lift方法
      2.返回值就是lift方法的返回值,是个新new的observable对象。

    将示例稍作修改

            Observable observable1 = Observable.just("images/logo.png"); // 输入类型 String
            
            Observable observable2 = observable1.map(new Func1<String, Bitmap>() {
                        @Override
                        public Bitmap call(String filePath) { // 参数类型 String
                            return getBitmapFromPath(filePath); // 返回类型 Bitmap
                        }
                    });
            
            observable2.subscribe(new Action1<Bitmap>() {
                        @Override
                        public void call(Bitmap bitmap) { // 参数类型 Bitmap
                            showBitmap(bitmap);
                        }
                    });
    
    observable2.subscribe 分析

    回顾前面subscribe()的内部实现,我们发现observable2里的OnSubscribe对象的call方法会被调用。而observable2就是lift方法返回的Observable对象, observable2里的onSubscribe对象就是lift的核心重点。

    observable2里的OnSubscribe对象 —lift的核心重点

    observable2对象里面通过observable1对象的onSubscribe.call(newSubscriber)达到通知observable1目的。
    因为observable1对象是我们初始的Observable对象,它的onSubscribe.call会发送事件到newSubscriber。
    newSubscriber是operator.call(subscriber)返回的,newSubscriber做了两件事:
    1.进行事件的变换操作。newSubscriber能拿到初始的事件,可以进行转换操作,这也是操作符发生效力的地方,不同的操作符的作用就是对事件进行不同的转换。
    2.转发给subscriber,newSubscriber有subscriber的引用,可以将转换后的事件转发给subscriber,也就是最终的订阅者。
    lift方法返回的observable2对象在调用链的中间起到了一个中转的作用,这就是lift原理的核心。

    operator.call内部通过传入一个subscriber返回一个newSubscriber,newSubscriber能达到事件转换和转发的目的。
    是如何做到的?
    我们来举个例子

    Operator的一个例子

    多个操作符的情况

    多个操作符相当于中间经过了多层中转,原理都一样

    关于事件发送的触发

    结合上面多个操作符的图,强调一下:

    事件发送的触发是从调用subscribe()方法后开始的,前面哪怕调用了N多的操作符方法只要还没有调用subscribe()方法其实并没有触发事件的发送。

    事件发送之前有一个从下往上通知的过程,当subscribe()方法被调用之后,先是通过observable2对象里调用observable1对象的onSubscribe.call通知observable1对象,如果observable1对象不是最开始发送事件的Observable对象(多个操作符的情况),那么同样的还会继续往上通知,直到通知到初始的Observable对象,才会开始事件的发送。

    所以RxJava是 先从下往上通知,然后再从上往下发送事件

    参考文献

    给 Android 开发者的 RxJava 详解 --扔物线
    http://gank.io/post/560e15be2dca930e00da1083

    相关文章

      网友评论

        本文标题:RxJava 系列 (三)RxJava lift原理

        本文链接:https://www.haomeiwen.com/subject/xnuwvxtx.html