美文网首页
RxJava使用详解

RxJava使用详解

作者: _好好学习 | 来源:发表于2019-08-25 15:42 被阅读0次

    official define:
    Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
    翻译过来就是:
    Java虚拟机的响应式扩展——一个用于使用Java虚拟机的可观察序列来编写异步和基于事件的程序库。
    简单来说就是:一个用于处理异步事件的框架。

    有同学不仅感到疑惑了,Android的异步处理机制又不是只有这么一个,学习了其他的异步处理机制已经够用了,为什么还要学习这个呢?有这样想法的同学怕不是没遇到异步处理逻辑复杂时代码何其混乱的情况。而RxJava随着程序逻辑变得越来越复杂,它依然能够保持简洁。

    言归正传,学习过后,有了对比,你就知道为什么这个框架会这么火了。(本文介绍RxJava 3.x版本)

    [toc]

    设置依赖

    implementation 'io.reactivex.rxjava3:rxjava:3.x.y'
    

    写一个HelloWrold

    package rxjava.examples;
    
    import io.reactivex.rxjava3.core.*;
    
    public class HelloWorld {
        public static void main(String[] args) {
            Flowable.just("Hello world").subscribe(System.out::println);
        }
    }
    

    如果你的平台不支持Java 8 lambdas表达式,那么你必须手动创建消费者的内部类:

    import io.reactivex.rxjava3.functions.Consumer;
    
    Flowable.just("Hello world")
      .subscribe(new Consumer<String>() {
          @Override public void accept(String s) {
              System.out.println(s);
          }
      });
    

    请注意,RxJava 3组件现在位于io.reactivex.rxjava3下,而基类和接口则位于io.reactivex.rxjava3.core下。

    认识一下基类

    io.reactivex.rxjava3.core.Flowable: 0..N 流, 支持响应流和背压(下文有解析)
    io.reactivex.rxjava3.core.Observable: 0..N 流,,无背压
    io.reactivex.rxjava3.core.Single: 只有一项正确或否的流,
    io.reactivex.rxjava3.core.Completable: 没有项目但只有完成或错误信号的流程,
    io.reactivex.rxjava3.core.Maybe: 没有项,只有一项,或错误的流

    一些术语

    上行数据流,下行数据流

    RxJava中的数据流由源,零个或多个中间步骤组成,后跟数据消费者或组合器步骤(其中步骤负责通过某种方式消费数据流):

    source.operator1().operator2().operator3().subscribe(consumer);
    
    source.flatMap(value -> source.operator1().operator2().operator3());
    

    在这里,如果我们想象自己在操作器2上,朝着左边向源头看,被称为上游。向右侧看用户/消费者,称为下游。当每个元素都写在单独的行上时,这一点会更容易理解:

    source
      .operator1()
      .operator2()
      .operator3()
      .subscribe(consumer)
    

    运动中的对象

    在RxJava的文档中,发射,发射,项目,事件,信号,数据和消息被视为同义词,并表示沿数据流传播的对象。

    解释背压

    当数据流通过异步步骤运行时,每个步骤可能以不同的速度执行不同的事情。为了避免压倒性的这些步骤(通常表现为由于临时缓冲或需要跳过/删除数据而导致内存使用量增加),应用了所谓的背压,这是一种流控制形式,步骤可以表示它们准备处理的项目数。这允许在一个步骤通常无法知道上游将向其发送多少项的情况下限制数据流的内存使用。

    大概就是指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。感兴趣的小伙伴可以模拟这种情况,在差距太大的时候,我们的内存会猛增,直到OOM。

    在RxJava中,专用的可流类被指定为支持背压,而Observable则专门用于非背压操作(短序列、GUI交互等)。其他类型,单一的,可能的和复杂的,不支持也不应该支持背压。

    装配时间

    通过应用各种中间运算符来准备数据流发生在所谓的组装时间中:

    Flowable<Integer> flow = Flowable.range(1, 5)
    .map(v -> v * v)
    .filter(v -> v % 3 == 0);
    

    此时,数据还没有流动,也没有副作用发生。

    订阅时间

    当在内部建立处理步骤链的流上调用subscribe()时,这是一种临时状态:

    flow.subscribe(System.out::println)
    

    这时会触发订阅副作用(请参阅doOnsubscribe)。在这种状态下,某些源会立即阻止或开始发出项目。

    运行

    这是数据流主动发出项目,错误或完成信号时的状态:

    Observable.create(emitter -> {
         while (!emitter.isDisposed()) {
             long time = System.currentTimeMillis();
             emitter.onNext(time);
             if (time % 2 != 0) {
                 emitter.onError(new IllegalStateException("Odd millisecond!"));
                 break;
             }
         }
    })
    .subscribe(System.out::println, Throwable::printStackTrace);
    

    也就是在上面给出的示例的主体执行时。

    简单后台计算

    RxJava的一个常见用例是在后台线程上运行一些计算、网络请求并在UI线程上显示结果(或报错):

    import io.reactivex.rxjava3.schedulers.Schedulers;
    
    Flowable.fromCallable(() -> {
        Thread.sleep(1000); //  imitate expensive computation
        return "Done";
    })
      .subscribeOn(Schedulers.io())
      .observeOn(Schedulers.single())
      .subscribe(System.out::println, Throwable::printStackTrace);
    
    Thread.sleep(2000); // <--- wait for the flow to finish
    

    这种类型的链接方法被称为类似于生成器模式的FluentAPI。然而,RxJava的反应类型是不可变的;每个方法调用都返回一个新的可流性,并添加了行为。举例来说,这个例子可以改写如下:

    Flowable<String> source = Flowable.fromCallable(() -> {
        Thread.sleep(1000); //  imitate expensive computation
        return "Done";
    });
    
    Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
    
    Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
    
    showForeground.subscribe(System.out::println, Throwable::printStackTrace);
    
    Thread.sleep(2000);
    

    通常,你可以通过subscribeOn将计算或阻塞IO移动到其他线程。一旦数据准备好了,您就可以确保通过observeOn在前台或GUI线程上对它们进行处理。

    调度者/订阅者(Scheduler)

    RxJava运算符不直接与线程或执行器服务一起工作,而是与所谓的调度程序一起工作,后者抽象出统一API背后的并发源。RxJava3有几个标准的调度程序,可以通过调度程序实用程序类访问。

    Schedulers.computation():在后台固定数量的专用线程上运行计算密集型工作。大多数异步运算符都将其用作默认调度程序

    Schedulers.io():在一组动态变化的线程上运行I/O类或阻塞操作。

    Schedulers.single():以顺序和先进先出的方式在单个线程上运行工作。

    Schedulers.trampoline():在一个参与线程中按顺序和FIFO方式运行工作,通常用于测试目的。

    在所有的JVM平台上都可以使用这些工具,但是某些特定的平台(如Android)定义了它们自己的典型调度程序:AndroidSchedulers.mainThread(), SwingScheduler.instance() or JavaFXSchedulers.gui()
    此外,还可以选择包装现有的Executor(及其子类型如ExecutorService)通过Schedulers.from(Executor)方法到调度者Scheduler。例如,这可以用来拥有一个更大但仍然固定的线程池(分别与calculation()io()不同)。

    可以看到上文中Thread.sleep(2000);运行到最后都不会出现意外。在RxJava,默认调度程序Scheduler运行在守护进程线程上,这意味着一旦Java主线程退出,它们都会停止,后台计算可能永远不会发生。在这个例子中,睡眠一段时间可以让您在控制台上看到流的输出,并有足够的空闲时间。

    流中的并发性

    RxJava中的流本质上是连续的,分为几个处理阶段,这些处理阶段可能同时运行:

    Flowable.range(1, 10)
      .observeOn(Schedulers.computation())
      .map(v -> v * v)
      .blockingSubscribe(System.out::println);
    

    在这个示例中,流在计算调度程序上对从1到10的数字进行平方,并使用“主”线程上的结果(更准确地说,是blockingSubscribe的调用线程)。但是,lambda v->v*v 并没有为此流并行运行;它在同一个计算线程上一个接一个地接收值1到10。

    并行处理

    并行处理数字1到10要复杂一些:

    Flowable.range(1, 10)
     .flatMap(v ->
         Flowable.just(v)
           .subscribeOn(Schedulers.computation())
           .map(w -> w * w)
     )
     .blockingSubscribe(System.out::println);
    

    实际上,RxJava中的并行性意味着运行独立的流,并将结果合并回单个流。操作符flatMap首先将每个数字从1到10映射到它自己的单独的流中,运行它们并合并计算结果。

    但请注意,flatMap不保证任何顺序,内部流的最终结果可能最终交错。

    这里还有可供替代的操作:

    • concatMap 一次映射并运行一个内部流
    • concatMapEager 它会“同时”运行所有内部流程,但输出流程将按创建内部流程的顺序排列。

    或者,Flowable.parallel()运算符和parallelFlowable类型有助于实现相同的并行处理模式:

    Flowable.range(1, 10)
      .parallel()
      .runOn(Schedulers.computation())
      .map(v -> v * v)
      .sequential()
      .blockingSubscribe(System.out::println);
    

    依赖子流程

    flatMap是一个强大的操作者,在很多情况下都能提供帮助。例如,给定一个返回流的服务,我们想调用另一个具有第一个服务发出的值的服务:

    Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
    
    inventorySource
        .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
                .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
        .subscribe(System.out::println);
    

    延续

    有时,当一个项目变得可用时,人们希望对它执行一些依赖的计算。这有时被称为延续,根据应该发生什么以及涉及到什么类型,可能需要使用各种运算符来完成。

    • 依赖

    The most typical scenario is to given a value, invoke another service, await and continue with its result:

    service.apiCall()
    .flatMap(value -> service.anotherApiCall(value))
    .flatMap(next -> service.finalCall(next))
    )
    

    通常情况下,后面的序列也需要来自早期映射的值。这可以通过将外部平面图移动到上一个平面图的内部来实现,例如:

    service.apiCall()
    .flatMap(value ->
        service.anotherApiCall(value)
        .flatMap(next -> service.finalCallBoth(value, next))
    )
    

    这里,原始值将在内部flatMap中可用,并由lambda变量捕获提供。

    • 非依赖
      在其他场景中,第一个源/数据流的结果是不相关的,我们希望继续使用准独立的其他源。
    Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
    continued.map(v -> v.toString())
    .subscribe(System.out::println, Throwable::printStackTrace);
    

    然而,这种情况下的延续保持可观察性Observable,而不是更合适的单一Single。(这是可以理解的,因为从flatMapsingle的角度来看,sourceObservable是一个多值源,因此映射也可能产生多个值)。

    通常通过这种方法可以通过使用Completeable作为中介和它的运算符andThen,并使用其他方法来恢复,从而更具表现力(同时也降低了开销):

    sourceObservable
      .ignoreElements()           // returns Completable
      .andThen(someSingleSource)
      .map(v -> v.toString())
    

    sourceObservablesomesingleSource之间的唯一依赖关系是前者应该正常完成,以便后者被使用。

    • 延迟依赖
      有时,前一个序列和新序列之间存在隐式的数据依赖关系,由于某种原因,这些数据依赖关系不是通过“常规通道”流动的。人们倾向于写这样的延续:
    AtomicInteger count = new AtomicInteger();
    
    Observable.range(1, 10)
      .doOnNext(ignored -> count.incrementAndGet())
      .ignoreElements()
      .andThen(Single.just(count.get()))
      .subscribe(System.out::println);
    

    不幸的是,这将打印0,因为当数据流尚未运行时,Single.just(count.get())将在组装时进行计算。我们需要一些东西来延迟这个单一源的计算,直到主源完成运行时为止:

    AtomicInteger count = new AtomicInteger();
    
    Observable.range(1, 10)
      .doOnNext(ignored -> count.incrementAndGet())
      .ignoreElements()
      .andThen(Single.defer(() -> Single.just(count.get())))
      .subscribe(System.out::println);
    

    or

    AtomicInteger count = new AtomicInteger();
    
    Observable.range(1, 10)
      .doOnNext(ignored -> count.incrementAndGet())
      .ignoreElements()
      .andThen(Single.fromCallable(() -> count.get()))
      .subscribe(System.out::println);
    

    类型转换

    有时,源或服务返回的类型与使用它的流不同。例如,在上面的清单示例中,getDemandasync可以返回Single<DemandRecord>。如果代码示例保持不变,这将导致编译时错误。

    在这种情况下,通常有两个选项来修复转换:1)转换为所需类型,或2)查找并使用支持不同类型的特定运算符的重载。

    • 转换为所需类型
      每个响应性基类都具有可以执行此类转换(包括协议转换)以匹配其他类型的运算符。下表显示了可用的转换选项:
    Flowable Observable Single Maybe Completable
    Flowable toObservable first, firstOrError, single, singleError, last, lastError firstElement, singleElement, lastElement ignoreElements
    Observable toFlowable first, firstOrError, single, singleError, last, lastError firstElement, singleElement, lastElement ignoreElements
    Single toFlowable toObservable toMaybe ignoreElement
    Maybe toFlowable toObservable toSingle ignoreElement
    Completable toFlowable toObservable toSingle toMaybe
    • 使用具有所需类型的重载
      许多常用的运算符具有可以处理其他类型的重载。这些通常以目标类型的后缀命名:
    Operator Overloads
    flatMap flatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable
    concatMap concatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable
    switchMap switchMapSingle, switchMapMaybe, switchMapCompletable

    这些运算符使用后缀而不是简单地使用相同的名称和不同的签名的原因是类型擦除。

    Java不考虑操作符operator(Function<T, Single<R>>)和操作符operator(Function<T, Maybe<R>>)等不同的签名(不同于c#),并且由于擦除,这两个操作符最终会以相同的签名作为重复的方法。

    运算符命名约定

    相关文章

      网友评论

          本文标题:RxJava使用详解

          本文链接:https://www.haomeiwen.com/subject/veahectx.html