美文网首页
Android中RxJava解析

Android中RxJava解析

作者: RmondJone | 来源:发表于2021-01-12 14:14 被阅读0次

    一、什么是RxJava?

    RxJava 在 GitHub 主页上的自我介绍是 "a library for composing asynchronous and event-based programs using observable sequences for the Java VM"(一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库)。这就是 RxJava ,概括得非常精准。

    二、为什么要用RxJava?

    1、异步操作很关键的一点是程序的简洁性,因为在调度过程比较复杂的情况下,异步代码经常会既难写也难被读懂。 Android 创造的 AsyncTask 和Handler ,其实都是为了让异步代码更加简洁。RxJava 的优势也是简洁,但它的简洁的与众不同之处在于,随着程序逻辑变得越来越复杂,它依然能够保持简洁,增加了工程的可维护性。

    2、轻松实现线程的切换工作,只需调用 subscribeOn() 和 observeOn() 两个方法。

    三、RxJava原理概括

    这里以RxJava1来阐述原理,RxJava2引入了最新的背压概念,用于解决上流事件发送太快,下流事件处理不及时问题。我们在使用过程中,只需了解RxJava的基本原理即可。最重要的就是理解RxJava为什么可以实现切换线程以及RxJava中流的转换过程。

    订阅的实现:

    1、RxJava的异步实现,是通过一种扩展的观察者模式来实现的。
    RxJava 的观察者模式大致如下图:

    2、 RxJava 使用 create() 方法来创建一个被观察者即Observable ,并为它定义事件触发规则

    首先创建一个OnSubscribe对象,在这个对象中定义了一些触发规则,然后调用create()方法来新建一个Observable对象。

    Observable observable = Observable.create(new Observable.OnSubscribe() {
        @Override
        public void call(String subscriber) {
            subscriber.onNext("Hello");
            subscriber.onNext("Hi");
            subscriber.onNext("Aloha");
            subscriber.onCompleted();
        }
    });
    

    3、创建观察者Subscriber或者Observer,Subscriber 对 Observer 接口进行了一些扩展,但他们的基本使用方式是完全一样的。不仅基本使用方式一样,实质上,在 RxJava 的 subscribe(订阅) 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用。

    注意点:这里的概念仅限于RxJava1,在RxJava2中这个概念是不同的,关于不同点可以查看文章最后的RxJava2的相关文章。

    Subscriber subscriber = new Subscriber() {
        @Override
        public void onNext(String s) {
            Log.d(tag, "Item: " + s);
        }
    
        @Override
        public void onCompleted() {
            Log.d(tag, "Completed!");
        }
    
        @Override
        public void onError(Throwable e) {
            Log.d(tag, "Error!");
        }
    };
    

    4、订阅

    创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了。代码形式很简单

    observable.subscribe(observer);
    // 或者:
    observable.subscribe(subscriber);
    

    我们来看一下subscribe()方法中做了哪些事情?

    // 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
    // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
    public Subscription subscribe(Subscriber subscriber) {
        subscriber.onStart();
        onSubscribe.call(subscriber);
        return subscriber;
    }
    

    可以看到,subscriber() 做了3件事:

    • 调用 Subscriber.onStart() 。这个方法在前面已经介绍过,是一个可选的准备方法。
    • 调用 Observable 中的 OnSubscribe.call(Subscriber) 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中, Observable 并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe() 方法执行的时候。
    • 将传入的 Subscriber 作为 Subscription 返回。这是为了方便 unsubscribe()
    线程切换、流的变换实现:

    1、在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

    在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

    • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

    • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

    • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

    • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

    • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

    2、在了解RxJava为什么可以切换线程之前,需要了解一下RxJava的变换

    • 首先看一个 map() 的例子
    Observable.just("images/logo.png") // 输入类型 String
        .map(new Func1() {
            @Override
            public Bitmap call(String filePath) { // 参数类型 String
                return getBitmapFromPath(filePath); // 返回类型 Bitmap
            }
        })
        .subscribe(new Action1() {
            @Override
            public void call(Bitmap bitmap) { // 参数类型 Bitmap
                showBitmap(bitmap);
            }
        });
    

    这里通过map()函数实现了String->Bitmap的流的转换。


    • 如果要实现一对多的转换该怎么办?假设有多个学生,一个学生有多门课程,我要打印每个学生的每门课程该怎么办?使用flatMap()!
    Student[] students = ...;
    Subscriber subscriber = new Subscriber() {
        @Override
        public void onNext(Course course) {
            Log.d(tag, course.getName());
        }
        ...
    };
    Observable.from(students)
        .flatMap(new Func1() {
            @Override
            public Observable call(Student student) {
                return Observable.from(student.getCourses());
            }
        })
        .subscribe(subscriber);
    

    flatMap() 的原理是这样的:

    • 使用传入的事件对象创建一个 Observable 对象。

    • 并不发送这个 Observable, 而是将它激活,于是它开始发送事件。

    • 每一个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法

    这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。

    3、变换的原理:lift()
    这些变换虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在 RxJava 的内部,它们是基于同一个基础的变换方法: lift(Operator)。首先看一下 lift() 的内部实现(仅核心代码):

    // 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
    // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
    public Observable lift(Operator operator) {
        return Observable.create(new OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                Subscriber newSubscriber = operator.call(subscriber);
                newSubscriber.onStart();
                onSubscribe.call(newSubscriber);
            }
        });
    }
    

    这段代码很有意思:它生成了一个新的 Observable 并返回,而且创建新 Observable 所用的参数 OnSubscribe 的回调方法 call() 中的实现竟然看起来和前面讲过的 Observable.subscribe() 一样!然而它们并不一样哟~不一样的地方关键就在于第二行 onSubscribe.call(subscriber) 中的 onSubscribe 所指代的对象不同(高能预警:接下来的几句话可能会导致身体的严重不适)

    • subscribe() 中这句话的 onSubscribe 指的是 Observable 中的 onSubscribe 对象,这个没有问题,但是 lift() 之后的情况就复杂了点。

    当含有 lift() 时:

    • lift() 创建了一个 Observable 后,加上之前的原始 Observable,已经有两个 Observable 了;
    • 而同样地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了两个 OnSubscribe;
    • 当用户调用经过 lift() 后的 Observable 的 subscribe() 的时候,使用的是 lift() 所返回的新的 Observable ,于是它所触发的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那个 OnSubscribe;
    • 而这个新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在这个 call() 方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一个新的 Subscriber(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新 Subscriber 向原始 Observable 进行订阅。

    4、如果经过多次变换也就是多次lift()之后是什么样的?

    如下图所示:

    5、变换和线程切换有什么关系?

    我们知道subscribeOn() 和 observeOn() 可以实现线程的切换,其实subscribeOn() 和 observeOn() 内部的实现也是lift()。

    subscribeOn() 原理图:


    observeOn() 原理图:

    从图中可以看出,subscribeOn() 和 observeOn() 都做了线程切换的工作(图中的 "schedule..." 部位)。不同的是, subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程。

    6、最后,我用一张图来解释当多个 subscribeOn() 和 observeOn() 混合使用时,线程调度是怎么发生的(由于图中对象较多,相对于上面的图对结构做了一些简化调整)

    图中共有 5 处含有对事件的操作。由图中可以看出,①和②两处受第一个 subscribeOn() 影响,运行在红色线程;③和④处受第一个 observeOn() 的影响,运行在绿色线程;⑤处受第二个 observeOn() 影响,运行在紫色线程;而第二个 subscribeOn() ,由于在通知过程中线程就被第一个 subscribeOn() 截断,因此对整个流程并没有任何影响。

    所以:当使用了多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用,作用于 subscribe()函数。每指定一次observeOn()线程则实现了一次线程的切换工作,在observeOn()之后的转换工作或者Subscriber都运行在observeOn()指定的线程中。

    关于RxJava相关的拓展

    1、原文链接:http://gank.io/post/560e15be2dca930e00da1083
    2、Rxjava2:https://www.jianshu.com/p/0cd258eecf60
    3、关于背压:https://www.jianshu.com/p/2c4799fa91a4
    4、GitHub:https://github.com/ReactiveX/RxJava

    相关文章

      网友评论

          本文标题:Android中RxJava解析

          本文链接:https://www.haomeiwen.com/subject/dcasaktx.html