美文网首页
RxJava2学习笔记

RxJava2学习笔记

作者: 星空下的萝卜 | 来源:发表于2020-02-23 14:03 被阅读0次

我们为什么选择RxJava

  • Infinite Streams 无限流
  • Asynchronous executions in a sane way 以合理的方式执行异步。
  • Function composition 复合函数
  • Sync and Async functions should be friends 同步和异步功能很友好

Rxjava - Events in RxJava

  1. Event can wrap a value(Integer, String, Object...)
  2. Event can wrap an exception
  3. An emitter can either produce events and complete or fail

Rxjava - Event Emitters

  1. Observable
    • Emit zero, one or multiple events, or an error
    • It comes from Rxjava1
    • Flexible and generalist
    • Hot or Cold observable
      1.Cold emitter just produce events when someone subscribe to it. This is the default one.
      2.Hot emitter produce events regardless there is or not a consumer subscribed to it.
    • Observable 会不断的将数据推向有兴趣对数据进行处理的人
  2. Flowable

    • Emit zero, one or multiple events
    • Does not exist in Rxjava1
    • Should be used for infinite streams
    • support for Backpressure(支持背压)
  3. Single - One event or an error
  4. Maybe - Zero or one event, or an error

RxJava - Operators

告诉发射器如何操纵事件
在RxJava中,Operators只是发射器类或对象内的方法

  1. Operators categories
    • Creating Observables
    • Transforming Observables
    • Filtering Observables
    • Combining Observables
    • Error handling
    • Observable Utility
    • Conditional and Boolean
    • Mathenatical and Aggregate
  2. Creating Operators

-Create Observables:

  • Defer
  • Empty
  • From
  • Interval
  • Timer

转换 Observables

观察者的变换算子,数据转换

  1. Map
    • transform the event data, returning a new data (same or different type) 转换事件数据,返回新数据(相同或不同类型)
  2. FlatMap
    • transform the event data, returning another Observable/emitter(with same or different event type)转换事件数据,返回另一个Observable / emitter(具有相同或不同的事件类型)

Exercise solution

  • From callback hell to ReactiveX

RxJava 2依赖于4个基础接口,它们分别是

  • Publisher ---> 核心,可以发出一系列的事件
  • Subscriber ---> 核心,负责和处理这一系列的事件
  • Subscription
  • Processor

背压(Backpressure)

当数据流通过异步步骤运行时,每个步骤可能以不同的速度执行不同的操作。为避免过多的此类步骤(通常会因临时缓冲或需要跳过/删除数据而增加内存使用量),因此应用了所谓的背压,这是流控制的一种形式,其中的步骤可以表示多少个项目他们准备好处理了吗?在通常无法一步知道上游将发送给它多少项的情况下,这可以限制数据流的内存使用量。
在RxJava中,专用的Flowable类被指定为支持背压,而Observable被指定为非背压操作(短序列,GUI交互等)。其他类型,Single,Maybe和Completable不支持背压,也不应该支持,总有空间暂时存储一件物品。

观察者模式

RxJava2 以观察者模式为骨架,两种观察者模式:

  • Observable (被观察者) / Observer(观察者)
  • Flowable (被观察者) / Subscriber(观察者)
RxJava骨架.png

Observable

线程调度方式

一、subScribeOn

  • subscribeOn 用于指定 subscribe()时所发生的线程,内部线程调度是通过 ObservableSubscribeOn来实现的。
    /**
     * Asynchronously subscribes Observers to this ObservableSource on the specified {@link Scheduler}.
     * <p>
     * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>You specify which {@link Scheduler} this operator will use.</dd>
     * </dl>
     *
     * @param scheduler
     *            the {@link Scheduler} to perform subscription actions on
     * @return the source ObservableSource modified so that its subscriptions happen on the
     *         specified {@link Scheduler}
     * @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
     * @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
     * @see #observeOn
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

二、observeOn

  • observeOn 方法用于指定下游 Observer 回调时发生的线程
    /**
     * Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
     * asynchronously with an unbounded buffer of configurable "island size" and optionally delays onError notifications.
     * <p>
     * <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
     * <dl>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>You specify which {@link Scheduler} this operator will use.</dd>
     * </dl>
     * <p>"Island size" indicates how large chunks the unbounded buffer allocates to store the excess elements waiting to be consumed
     * on the other side of the asynchronous boundary. Values below 16 are not recommended in performance sensitive scenarios.
     *
     * @param scheduler
     *            the {@link Scheduler} to notify {@link Observer}s on
     * @param delayError
     *            indicates if the onError notification may not cut ahead of onNext notification on the other side of the
     *            scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received
     *            from upstream
     * @param bufferSize the size of the buffer.
     * @return the source ObservableSource modified so that its {@link Observer}s are notified on the specified
     *         {@link Scheduler}
     * @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
     * @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
     * @see #subscribeOn
     * @see #observeOn(Scheduler)
     * @see #observeOn(Scheduler, boolean)
     */
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

线程切换需要注意事项

  • subscribeOn指定的是发射事件的线程,observerOn指定的是订阅者接收事件的线程。
  • 多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn只有第一次的有效,其余的会被忽略。
  • 但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn下游的线程就会切换一次。

待续。。。

参考文献:

RxJava官方文档
RxJava官方接口文档
参考了这个外国大佬的视频教程
简书作者 nanchen2251 的教程

相关文章

  • RxJava2笔记(五、订阅流程梳理以及线程切换次数有效性)

    在前面的几篇文章RxJava2笔记(一、事件订阅流程)RxJava2笔记(二、事件取消流程)RxJava2笔记(三...

  • Android Develop——RxJava2(二) RxJa

    在RxJava2(一)教程中,已经跟着大神们学习了RxJava2的基本使用,现在我们来学习一下RxJava2很强大...

  • RxJava2笔记(三、订阅线程切换)

    在前面两篇文章RxJava2笔记(一、事件订阅流程)和RxJava2笔记(二、事件取消流程)中,我们分别了解了事件...

  • RxJava2 学习笔记

    函数响应式编程 函数响应式编程的思维是将问题抽象为数据加工,一切问题都是数据源发出数据的问题,所以用函数响应式编程...

  • RxJava2学习笔记

    intro "森林里的一棵树倒下来,如果周围没有人听见,那么就等于说树的倒下是寂静无声的." 随着产品功能的增加,...

  • rxjava2 学习笔记

    特点 链式调用 线程切换 操作符 创建操作符 转换操作符 过滤操作符 组合操作符 错误处理操作符 辅助性操作符 条...

  • RxJava2学习笔记

    本文参考RxJava2 只看这一篇文章就够了,强烈推荐大家去看一下。 RxJava的组成 被观察者-------O...

  • RxJava2学习笔记

    我们为什么选择RxJava Infinite Streams 无限流 Asynchronous execution...

  • Rxjava2学习笔记(一)

    网上大佬们都整理得很好了,只是老看,总觉得差点啥,所以,就想着自己再捋一遍,加深印象。给 Android 开发者的...

  • RxJava2 学习笔记(二)

    线程调度源码分析1:subscribeOn subscribeOn(Schedulers.io())这句其实就是创...

网友评论

      本文标题:RxJava2学习笔记

      本文链接:https://www.haomeiwen.com/subject/lgqmfhtx.html