美文网首页
RxJava 2.x实战

RxJava 2.x实战

作者: 啥也不说了 | 来源:发表于2018-11-12 11:34 被阅读9次

    零 书籍信息

    中信出版社出版,沈哲著。ISBN:9787121337222。
    当当电子书5块钱,不过当当电子书感觉用起来不怎么爽。

    第一章 RxJava简介

    1.1 你需要了解的函数响应式编程

    1. 响应式编程

    1. 响应式编程的特点:
      1)异步编程:提供了合适的异步编程模型,能够挖掘多核CPU的能力,提高效率,降低延迟和阻塞等。
      2)数据流:基于数据流模型,响应式编程提供了一套统一的Stream风格的数据处理接口。与Java 8中的Stream相比,响应式编程除了支持静态数据流,还支持动态数据流,并且支持复用和同时接入多个订阅者。
      3)变化传播:以一个输入流为输入,经过一连串操作转化为另一个数据流,然后分发给各个订阅者的过程。
    2. 响应式编程在用户界面及基于实时系统的动画方面有广泛的应用;也在处理嵌套回调的异步事件、负责的列表过滤和变换的时候也有良好的表现。
    3. 和前端的响应式设计是不同的概念。前端的响应式设计指的是自适应。

    2.函数式编程

    1. 函数式编程中,由于数据是不可变的,因此没有并发编程的问题,是线程安全的。它将计算机运算看做数学中的的函数计算,主要特点是将计算过程分解为多个可复用的函数,并且避免了状态及变量的概念。
    2. 函数式编程具有以下特点:
      1) 函数是“第一等公民” :所谓第一等公民是指函数与其他数据类型一样,处于平等地位,可以赋值给其他变量,也可以作为参数,传入另外一个函数,或者作为别的函数的返回值。
      2)闭包和高阶函数:闭包是起函数的作用并可以像对象一样操作的对象。
      3)递归:把递归作为控制流程的机制。
      4)惰性求值。
      5)没有“副作用”:指的是函数内部与外部互动,产生运算以为的其他结果。函数式编程强调没有“副作用”,意味着函数要保持独立,所有功能就是返回一个新值,没有其他行为,尤其是不能修改外部变量的值。

    1.2 RxJava简介

    1.RxJava的由来

    RxJava是rx的Java实现。

    2. Rx是什么

    RX是reactive Extension的缩写,是由微软架构师所开发的一种编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。

    3.ReactiveX的历史

    微软的定义是,rx是一个函数库。
    reactiveX.io给的定义是:Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。

    4.RX模式

    1. 使用观察者模式
    • 创建:Rx可以方便的创建事件流和数据流。
    • 组合:Rx使用查询式的操作符组合和变换数据流。
    • 监听:Rx可以订阅任何可观察的数据流并执行操作。
    1. 简化代码
    • 函数式风格:对可观察的数据流使用无副作用的输入/输出函数,避免程序里错综复杂的状态。
    • 简化代码:Rx操作符通常可以将复杂的难题简化为很少的几行代码。
    • 异步错误处理:传统的try/catch没办法处理异步寄宿,Rx提供了合适的错误处理机制。
    • 轻松使用并发:Rx的Observables和Schedulers可以让开发者摆脱底层的线程同步和各种并发问题。
      总之,RxJava是Rx在JVM平台上的一个实现,通过使用观察者序列来构建异步、基于事件的程序。

    1.3 为何选择RxJava

    • 可组合
      对于单层的异步操作来说,Java中的Future对象处理非常简单有效,但是一旦涉及嵌套,它们就开始变得非常繁琐和复杂。
    • 更灵活
      RxJava的Observable不仅支持单独的标量值(就想Future可以做的),还支持数据序列,甚至是无穷的数据流。
    • 无偏见
      Rx对于并发性或异步性没有任何特殊的偏好,Observable可以用任何方式(如线程池、事件循环、非阻塞I/O)来满足我们的需求。无论我们怎样实现它,无论底层实现时阻塞还是非阻塞的,客户端代码都将与Observable的全部交互当成是异步的。

    1.4 RxJava能做什么

    能做的很多,但是最合适的就是多线程处理,超方便。Android和服务端都可以。

    1.6 小结

    RxJava是结合了多种设计模式并优化而产生的结晶。
    RxJava的思维导图。


    思维导图

    第2章 RxJava基础知识

    2.1 Observable

    RxJava的使用通常需要三步。
    1)创建Observable
    Observable的字面意思就是被观察者,使用RxJava时需要创建一个被观察者,它会决定什么时候触发怎样的事件。可以看做是上游发送命令。
    2)创建Observer
    Observer即观察者,它可以在不同的线程中执行任务。这种模式极大的简化了并发操作,因为它创建了饿一个处于待命状态的观察者哨兵,可以在未来某个时刻响应Observable的通知,而不需要阻塞等待Observable发射数据。
    3)使用subscribe()进行订阅
    创建了Observable和Observer之后,需要使用subscribe()方法把它们连接起来。
    可以简化为:

    创建Observable对象,创建Observer对象,然后用subscribe()方法进行关联。

    可以看一段代码示例:

    Obserbale.just("hell world").subscribe(new Consume<String>(){
      @Override
      public void accept(@NonNull String s) throws Exception{
        System.out.println(s);
      }
    }
    

    subscribe()方法有多个重载的方法。
    在RxJava中,被观察者(Observable)、观察者(Observer)、subscribe()方法三者缺一不可,只有使用了subscribe()方法,被观察者才会开始发送数据。


    5中被观察者模式
    5种被观察者模式描述

    只有Flowable支持背压,所以在需要背压的情况下,则必须使用Flowable。

    do操作符

    do操作可以给Observable的生命周期的各个阶段加上一系列的回调监听,当Observable执行到这个阶段时,这些回调就会被触发。在RxJava中包含了很多doXXX操作符。各个do操作符的用途如下:

    操作符 用途
    doOnSubscribe 一旦观察者订阅了Observable,它就会被调用
    doOnLifecycle 可以在观察者订阅之后,设置是否取消订阅
    doOnNext 它产生的Observable每发射一项数据就会调用它一次,它的Consumer接受发射的数据项。一般用于在subscribe之前对数据进行处理
    doOnEach 它产生的Observable每发射一项数据就会调用它一次,不仅包括onNext,还包括onError和onCompleted
    doAfterNext 在onNext之后执行,而doOnNext()是在onNext之前执行
    doOnCompolete 当它产生Observable在正常终止调用onComplete时会被调用
    doFinally 在当产生的Observable终止之后被调用,无论是否正常终止还是异常终止。doFinally优先于doAfterTerminate的调用
    doAfterTerminate 注册一个Action,当Observable调用onComplete或onError时触发

    2.2 Hot Observable和Cold Observable

    1. Observable的分类

    Observable有Hot和Cold之分。Hot Observable无论有没有订阅者订阅都会发送事件。当Hot Observable有多个订阅者时,Hot Observable与订阅者的关系时一对多的关系,可以与多个订阅者共享信息。
    Cold Observalbe是只有观察者订阅了,才开始进行发射数据流代码,并且是一对一的关系。当有多个不同的订阅者时,消息是重新完整发送的。也就是说对于多个Observer,他们各自的事件是独立的。

    Hot Obseravle是一个电台,大家听到的是同一首歌;Cold Observable是一张音乐CD,人们可以独立购买并听取它。

    2.Cold Observable

    Observable的just、create、range、fromXXX等操作符都能生成Cold Observable。尽管Cold Observable很好,但是对于某些事件不确定何时发生不确定Observable发射的元素数量的情况,还需要Hot Observable。

    3.Cold Observable如何转换成Hot Observable

    1)使用publish,生成ConnectableObservable。

    ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
                @Override
                public void subscribe(ObservableEmitter<Long> e) throws Exception {
                    Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation()).take(Integer.MAX_VALUE).subscribe(e::onNext);
                }
            }).observeOn(Schedulers.newThread()).publish();
    
            //ConnectableObservable需要调用connect()方法才能真正执行
            observable.connect();
            Consumer<Long> subscribe1 = new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("subscriber1:" + aLong);
                }
            };
    
            Consumer<Long> subscribe2 = new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("subscriber1:" + aLong);
                }
            };
    
            observable.subscribe(subscribe1);
            observable.subscribe(subscribe2);
    
            try {
                Thread.sleep(20L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    

    输出结果为:

    subscriber1:0
    subscriber2:0
    subscriber1:1
    subscriber2:1
    subscriber1:2
    subscriber2:2

    ConnectableObservable是线程安全的,需要调用connect()方法才会执行。
    2)使用Subject/Processor
    Subject和Processor的作用相同。Processor是RxJava 2.x新增的类,继承自Flowable,支持背压控制,而Subject则不支持背压控制。
    Subject既是Observable,又是Observer(Subscriber)。这一点可以从Subject的源码上看到,继承自Observable,实现Observer。
    Subject作为观察者,可以订阅目标Cold Observable,使对方开始发送事件。同是它又作为Observable转发或者发送新的事件,让Cold Observable借助Subject转换为Hot Observable。
    Subject并不是线程安全的,如果想要其线程安全,则需要调用toSerialized()方法。

    4. Hot Observable如何转换成Cold Observable

    1)ConnectableObservable的refCount操作符

    RefCount
    RefCount操作符把一个可连接的Observable连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者/观察者订阅这个Observable时,RefCount连接到下层的可连接Observable。RefCount跟踪有多少个观察者订阅它,知道最后一个观察者完成,才断开与下层可连接Observable的连接。
    示例代码
    package chapter2;
    
    import io.reactivex.Observable;
    import io.reactivex.ObservableEmitter;
    import io.reactivex.ObservableOnSubscribe;
    import io.reactivex.disposables.Disposable;
    import io.reactivex.functions.Consumer;
    import io.reactivex.observables.ConnectableObservable;
    import io.reactivex.schedulers.Schedulers;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author shaopeng.wei
     * @since 2019-01-14 14:45
     * Purpose
     */
    public class ConnectObservable2ColdObservable {
        public static void main(String[] args) throws InterruptedException {
            Consumer<Long> subscriber1 = new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("subscriber1:" + aLong);
                }
            };
    
            Consumer<Long> subscriber2 = new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    System.out.println("subscriber2:" + aLong);
                }
            };
    
    
            Observable<Long> longObservable = Observable.create(new ObservableOnSubscribe<Long>() {
                @Override
                public void subscribe(ObservableEmitter<Long> e) throws Exception {
                    Observable.interval(5, TimeUnit.MILLISECONDS, Schedulers.io()).take(Integer.MAX_VALUE).subscribe(e::onNext);
                }
            }).observeOn(Schedulers.newThread());
            System.out.println("默认cold Observable,subscriber1先发送10ms,预期subscriber2不受干扰从0开始");
            Disposable subscribe1 = longObservable.subscribe(subscriber1);
            Thread.sleep(10L);
            Disposable subscribe2 = longObservable.subscribe(subscriber2);
    
    
            Thread.sleep(20L);
            subscribe1.dispose();
            subscribe2.dispose();
    
            System.out.println("和预期一直,并没有因为subscriber1先执行而导致subscriber2跟着subscriber1开始");
            System.out.println("————————————————————————————————————————————————————————————————————");
            System.out.println("Cold Observable转化为Hot 不会重新开始,所有subscriber2会跟着subscriber1的数,不会从0开始。");
            ConnectableObservable<Long> publish = longObservable.publish();
            publish.connect();
    
            Disposable disposable1 = publish.subscribe(subscriber1);
            Thread.sleep(10L);
            Disposable disposable2 = publish.subscribe(subscriber2);
    
            Thread.sleep(20L);
            disposable1.dispose();
            disposable2.dispose();
            System.out.println("发现和预期一致");
            System.out.println("————————————————————————————————————————————————————————————————————");
            System.out.println("预期和cold Observable一样");
            Observable<Long> observable = publish.refCount();
            disposable1 = observable.subscribe(subscriber1);
            Thread.sleep(20L);
            disposable2 = observable.subscribe(subscriber2);
    
            Thread.sleep(20L);
            disposable1.dispose();
            disposable2.dispose();
            System.out.println("发现预期并不一样。因为如果不是所有的订阅者/观察者取消了订阅,而只是部分取消,则部分的订阅者/观察者重新开始订阅时,不会从头开始数据流");
    
        }
    }
    

    当不是所有的订阅者/观察者取消订阅,而只是部分取消时,部分订阅者/观察者重新订阅时,数据流并不会从头开始
    2)Observable的share操作符
    share操作符封装了publish的.refCount()调用。

    2.3 Flowable

    Flowable是RxJava 2.x引入的新的实现,支持非阻塞式的背压。与FLowable相比,使用Observable较好的场景如下:

    • 一般处理不超过1000条数据,并且几乎不会出现内存溢出;
    • GUI鼠标事件,基本不会背压
    • 处理同步流
      使用Flowable较好的场景如下:
    • 处理以某种方式产生超过10KB的元素;
    • 文件读取与分析;
    • 读取数据库记录,也是一个阻塞和基于拉取模式;
    • 网络I/O流
    • 创建一个响应式非阻塞接口

    2.5 Single、Completable和Maybe

    1.SIngle

    从SingleEmitter源码可以看出,Single的只有onSuccess和onError事件。其中onSuccess()用于发射数据(在Observable/Flowable中使用onNext()来发射数据),而且只能发射一个数据,后面即使再发射数据也不会做任何处理。
    Single的SingleObservable中只有onSuccess和onError,并没有onComplete,这也是Single与其他4中观察者之间最大的区别。
    Single可以通过toXXX方法转化成Observable、Flowable、Completable以及Maybe。

    2.Completable

    Completable在创建后,不会发射任何数据。只有onComplete和onError事件,同事Completable并没有map、flatMap等操作符,它的操作符比Observable/Flowable要少很多。
    Completable经常结合andThen操作符使用。

    3.Maybe

    Maybe可以看做Single和Completable的结合体。

    2.5 Subject和Processor

    2.5.1 Subject是一种特殊的存在

    Subject及时Observable也是Observer。官网称可以看做一个桥梁或者代理。

    1.Subject的分类

    Subject包含4中类型,分别是AsyncSubject、BehaviorSubject、ReplaySubject和PublishSubject。
    1) AsyncSubject
    Observer会接受AsyncSubject的onComple()之前的最后一个数据。即在前面有多个onNext,只有在onComplete前的一个生效。

    注意:可以看做必须调用subject.onComplete才会开始发送数据,否则观察者将不接受任何数据。

    2). BehaviorSubject
    Observer会先接受到BehaviorSubject被订阅之前的最后一个数据,再接受订阅之后发射过来的数据,如果BehaviorSubject被订阅之前没有发送任何数据,则会发送一个默认数据。
    3)ReplaySubject
    ReplaySubject会发射所有来自原始Observable的数据给观察者,无论他们是合适订阅的。
    ReplaySubject除了可以通过createWithSize来限制缓存数据的数量外,还可以通过createWithTime()来限制缓存时间。
    4)PublishSubject
    Observer只接受PublishSubject被订阅之后发送的数据。
    通过前面的介绍,总结4个Subject的特性为:

    Subject 发射行为
    AsnycSubject 不论订阅在什么时候发生,只发射最后一个数据
    BehaviorSubject 发送订阅前的一个数据和订阅后的全部数据
    ReplaySubject 不论订阅在什么时候,都全部发射
    PublishSubject 发送订阅之后的全部数据

    第3章 创建操作符

    RxJava常用操作符
    1. jush():将一个或多个对象转换成发射这个或这些对象的一个Observable。
    2. from():将一个Iterable、一个Future或者一个数组转换成一个Observable。
    3. create():使用一个函数从头创建一个Observable。
    4. defer():只有当订阅者订阅才创建Observable,为每个订阅者创建一个新的Observable。
    5. range():创建一个发射指定范围的整数序列的Observable。
    6. interval():创建一个按照给定的时间间隔发射整数序列的Observable。
    7. timer():创建一个在给定的延时之后发射耽搁数据的Observable。
    8. empty():创建一个什么都不做直接通知完成的Observable。
    9. error():创建一个什么都不做直接通知错误的Observable。
    10. never():创建一个不发射任何数据的Observable。

    3.1 create、just和from

    1. create

    用函数从头创建一个Observable。RxJava建议我们在传递给create方法的函数时,先检查一下观察者的isDisposed状态,以便在没有观察者的时候,让我们的Observable停止发射数据,防止运行昂贵的运算。

    2. just

    just类似于from,但是from会将数组或Iterable数据取出然后逐个发射,而just只是简单的原样发射,将数组或iterable当做单个数据。
    它可以接受一致十个参数,返回一个按参数列表顺序发射这些数据的Observable。

    传入null会报npe

    3.from

    from有一个两个参数的方法,可以设置超时时间和超时单位。

    Observable.fromFuture(future,3,TimeUnit.SECONDS)
    

    3.2 repeat

    创建一个发射特定数据重复多次的Observable。repeat会重复地发射数据,某些实现允许我们重复发射某个数据序列,还有一些允许我们限制重复的次数。

    1. repeatWhen

    repeatWhen不是缓存或重放原始Observable的数据序列,而是有条件重新订阅和发射原来的Observable。

    2.repeatUntil

    表示直到某个条件就不再重复发射数据,即当条件为true时,表示中止重复发射上游的Observable。

    3.3 defer、interval和timer

    1. defer

    直到有观察者订阅时才创建Observable,并且为每个观察者创建一个全新的Observable,如图:


    defer操作符

    2.interval

    创建一个按照固定时间间隔发射整数序列的Observable。

    3.timer

    创建一个Observable,它在一个特定的延迟后发射一个特殊的值。timer操作符创建一个在特定的时间段之后返回一个特殊值的Observable。

    第4章 RxJava的线程操作

    4.1 调度器(Scheduler)种类

    1. RxJava线程介绍

    RxJava是为了异步编程实现的一个框架,异步是其重要的特性。

    2.Scheduler

    Scheduler是RxJava对线程控制器的一个抽象,RxJava内置了多个Scheduler的实现,他们基本满足绝大多数使用场景。

    Scheduler 作用
    single 使用定长为1的线程池,重复利用这个线程
    newThread 每次启用新线程,并在新线程中执行操作
    computation 使用固定的线程池,大小为CPU核数,适用于CPU密集型计算
    io 适合I/O操作。行为模式和newTread()差不多,区别在于io()内部是一个无数量上限的线程池,可以重用空闲的线程。
    trampoline 直接在当前线程运行,如果当前线程中有其他任务正在之慈宁宫,则会先暂停其他任务
    Scheduler.from 将java.util.comcurrent.Executor转换成一个调度器实例,即可以自定义一个Executor来作为调度器

    4.2 RxJava线程模型

    2. 线程调度

    默认情况下不做任何线程处理,Observable和Observer处于同一线程中。如果想要切换线程,则可以使用subscribeOn()和ObserverOn()
    1)subscribeOn
    subscribeOn()改变上游发射数据的线程池,只有第一次有效。
    2)observeOn
    observeOn用来指定下游操作运行在特定的线程调度器Scheduler上。
    多次执行,每次执行都会切换。

    4.3 Scheduler的测试

    TestScheduler是专门用于测试的调度器,与其他调度器的区别是:TestScheduler只有被调用了时间才会继续,是一种特殊的、非线程安全的调度器,用于测试一些不引入真是并发性、允许手动推进虚拟时间的调度器。
    TestScheduler用来测试一些需要精确时间的任务是非常合适的,减少了等待时间。

    第5章 变换操作符和过滤操作符

    RxJava的变换操作符主要包括以下几种。

    • map:对序列的每一项都用一个函数来变换Observable发射的数据序列
    • flapMap、concatMap、flatMapIterable:将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable
    • switchMap:将Observable发射的数据集合变换为Observables集合,然后值发射这些Observables

    第6、7章

    这两章都是一些操作符,没太多深入信息快速浏览。

    第8章 RxJava的背压

    8.1 背压

    指在异步场景下,被观察者发送时间速度远快于观察者处理的速度,从而导致buffer溢出,这种现象叫做背压。

    1. 背压必须是在异步的场景下才会出现,即被观察者和观察者处于不同的线程中。
    2. RxJava是基于push模型的,生产者有数据就发送给消费者。
    3. 在RxJava 2.x中,只有新增的Flowable是支持背压的。

    8.2 RxJava 2.x的背压策略

    RxJava 2.x中,默认队列大小为128,并且要求所有操作符强制支持背压。一共有5种背压策略。

    1. MISSING

    此策略表示创建的Flowable没有指定背压策略,不会做缓存或丢弃处理,需要下游通过背压操作符指定背压策略

    2. ERROR

    表示如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常。

    3. BUFFER

    此策略表示异步患处吃同Observable的一样,没有固定大小,可以无限制添加数据,不会抛出MissingBackpressureException异常,但是会导致OOM。

    4. DROP

    此策略表示,如果FLowable的异步缓冲池满了,则会丢掉将要放入缓存池中的数据。

    5.LATEST

    此策略表示,如果缓存池满了,会丢掉将要放入缓存池中的数据,但是不管缓存池状态如何,LASTEST策略会将最后一条数据强行放入缓存池中。

    第9章 Disposable和Transformer的使用

    9.1 Disposable

    Observable.subscribe()方法会返回一个Disposable的对象,可以通过调用dispose方法来切断数据流。

    CompositeDisposable

    可以使用CompositeDisposable.add()方法把多个Disposable对象添加到容器中,然后通过CompositeDisposable.clear()即可切断所有的事件。

    第10章 RxJava的并行编程

    10.1 RxJava并行操作

    并发是指一个处理器同事处理多个任务。并行是多个处理器或者多核处理器同时处理多个不同的任务。

    总结

    这本书是一本流水账式的操作手册,感觉写的不是很好。对于初学者没有写如何上手,对于资深者又没有太深入的东西。全程快速的扫描了一遍,主要是弥补部分遗漏的知识点,看完收获不是很大。总体来说不值得卖纸质版,当当的5块钱电子版的定价就不错。

    相关文章

      网友评论

          本文标题:RxJava 2.x实战

          本文链接:https://www.haomeiwen.com/subject/xutaxqtx.html