美文网首页
RxJava2学习笔记

RxJava2学习笔记

作者: 星空下的萝卜 | 来源:发表于2020-02-23 14:03 被阅读0次

    我们为什么选择RxJava

    • Infinite Streams 无限流
    • Asynchronous executions in a sane way 以合理的方式执行异步。
    • Function composition 复合函数
    • Sync and Async functions should be friends 同步和异步功能很友好

    Rxjava - Events in RxJava

    1. Event can wrap a value(Integer, String, Object...)
    2. Event can wrap an exception
    3. An emitter can either produce events and complete or fail

    Rxjava - Event Emitters

    1. Observable
      • Emit zero, one or multiple events, or an error
      • It comes from Rxjava1
      • Flexible and generalist
      • Hot or Cold observable
        1.Cold emitter just produce events when someone subscribe to it. This is the default one.
        2.Hot emitter produce events regardless there is or not a consumer subscribed to it.
      • Observable 会不断的将数据推向有兴趣对数据进行处理的人
    2. Flowable

      • Emit zero, one or multiple events
      • Does not exist in Rxjava1
      • Should be used for infinite streams
      • support for Backpressure(支持背压)
    3. Single - One event or an error
    4. Maybe - Zero or one event, or an error

    RxJava - Operators

    告诉发射器如何操纵事件
    在RxJava中,Operators只是发射器类或对象内的方法

    1. Operators categories
      • Creating Observables
      • Transforming Observables
      • Filtering Observables
      • Combining Observables
      • Error handling
      • Observable Utility
      • Conditional and Boolean
      • Mathenatical and Aggregate
    2. Creating Operators

    -Create Observables:

    • Defer
    • Empty
    • From
    • Interval
    • Timer

    转换 Observables

    观察者的变换算子,数据转换

    1. Map
      • transform the event data, returning a new data (same or different type) 转换事件数据,返回新数据(相同或不同类型)
    2. FlatMap
      • transform the event data, returning another Observable/emitter(with same or different event type)转换事件数据,返回另一个Observable / emitter(具有相同或不同的事件类型)

    Exercise solution

    • From callback hell to ReactiveX

    RxJava 2依赖于4个基础接口,它们分别是

    • Publisher ---> 核心,可以发出一系列的事件
    • Subscriber ---> 核心,负责和处理这一系列的事件
    • Subscription
    • Processor

    背压(Backpressure)

    当数据流通过异步步骤运行时,每个步骤可能以不同的速度执行不同的操作。为避免过多的此类步骤(通常会因临时缓冲或需要跳过/删除数据而增加内存使用量),因此应用了所谓的背压,这是流控制的一种形式,其中的步骤可以表示多少个项目他们准备好处理了吗?在通常无法一步知道上游将发送给它多少项的情况下,这可以限制数据流的内存使用量。
    在RxJava中,专用的Flowable类被指定为支持背压,而Observable被指定为非背压操作(短序列,GUI交互等)。其他类型,Single,Maybe和Completable不支持背压,也不应该支持,总有空间暂时存储一件物品。

    观察者模式

    RxJava2 以观察者模式为骨架,两种观察者模式:

    • Observable (被观察者) / Observer(观察者)
    • Flowable (被观察者) / Subscriber(观察者)
    RxJava骨架.png

    Observable

    线程调度方式

    一、subScribeOn

    • subscribeOn 用于指定 subscribe()时所发生的线程,内部线程调度是通过 ObservableSubscribeOn来实现的。
        /**
         * Asynchronously subscribes Observers to this ObservableSource on the specified {@link Scheduler}.
         * <p>
         * <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
         * <dl>
         *  <dt><b>Scheduler:</b></dt>
         *  <dd>You specify which {@link Scheduler} this operator will use.</dd>
         * </dl>
         *
         * @param scheduler
         *            the {@link Scheduler} to perform subscription actions on
         * @return the source ObservableSource modified so that its subscriptions happen on the
         *         specified {@link Scheduler}
         * @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
         * @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
         * @see #observeOn
         */
        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    

    二、observeOn

    • observeOn 方法用于指定下游 Observer 回调时发生的线程
        /**
         * Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
         * asynchronously with an unbounded buffer of configurable "island size" and optionally delays onError notifications.
         * <p>
         * <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
         * <dl>
         *  <dt><b>Scheduler:</b></dt>
         *  <dd>You specify which {@link Scheduler} this operator will use.</dd>
         * </dl>
         * <p>"Island size" indicates how large chunks the unbounded buffer allocates to store the excess elements waiting to be consumed
         * on the other side of the asynchronous boundary. Values below 16 are not recommended in performance sensitive scenarios.
         *
         * @param scheduler
         *            the {@link Scheduler} to notify {@link Observer}s on
         * @param delayError
         *            indicates if the onError notification may not cut ahead of onNext notification on the other side of the
         *            scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received
         *            from upstream
         * @param bufferSize the size of the buffer.
         * @return the source ObservableSource modified so that its {@link Observer}s are notified on the specified
         *         {@link Scheduler}
         * @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
         * @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
         * @see #subscribeOn
         * @see #observeOn(Scheduler)
         * @see #observeOn(Scheduler, boolean)
         */
        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    

    线程切换需要注意事项

    • subscribeOn指定的是发射事件的线程,observerOn指定的是订阅者接收事件的线程。
    • 多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn只有第一次的有效,其余的会被忽略。
    • 但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn下游的线程就会切换一次。

    待续。。。

    参考文献:

    RxJava官方文档
    RxJava官方接口文档
    参考了这个外国大佬的视频教程
    简书作者 nanchen2251 的教程

    相关文章

      网友评论

          本文标题:RxJava2学习笔记

          本文链接:https://www.haomeiwen.com/subject/lgqmfhtx.html