我们为什么选择RxJava
- Infinite Streams 无限流
- Asynchronous executions in a sane way 以合理的方式执行异步。
- Function composition 复合函数
- Sync and Async functions should be friends 同步和异步功能很友好
Rxjava - Events in RxJava
- Event can wrap a value(Integer, String, Object...)
- Event can wrap an exception
- An emitter can either produce events and complete or fail
Rxjava - Event Emitters
-
Observable
- Emit zero, one or multiple events, or an error
- It comes from Rxjava1
- Flexible and generalist
- Hot or Cold observable
1.Cold emitter just produce events when someone subscribe to it. This is the default one.
2.Hot emitter produce events regardless there is or not a consumer subscribed to it. - Observable 会不断的将数据推向有兴趣对数据进行处理的人
-
Flowable
- Emit zero, one or multiple events
- Does not exist in Rxjava1
- Should be used for infinite streams
- support for Backpressure(支持背压)
-
Single - One event or an error
-
Maybe - Zero or one event, or an error
RxJava - Operators
告诉发射器如何操纵事件
在RxJava中,Operators
只是发射器类或对象内的方法
- Operators categories
- Creating Observables
- Transforming Observables
- Filtering Observables
- Combining Observables
- Error handling
- Observable Utility
- Conditional and Boolean
- Mathenatical and Aggregate
- Creating Operators
-Create Observables:
- Defer
- Empty
- From
- Interval
- Timer
转换 Observables
观察者的变换算子,数据转换
- Map
- transform the event data, returning a new data (same or different type) 转换事件数据,返回新数据(相同或不同类型)
- FlatMap
- transform the event data, returning another Observable/emitter(with same or different event type)转换事件数据,返回另一个Observable / emitter(具有相同或不同的事件类型)
Exercise solution
- From callback hell to ReactiveX
RxJava 2依赖于4个基础接口,它们分别是
-
Publisher
---> 核心,可以发出一系列的事件 -
Subscriber
---> 核心,负责和处理这一系列的事件 Subscription
Processor
背压(Backpressure)
当数据流通过异步步骤运行时,每个步骤可能以不同的速度执行不同的操作。为避免过多的此类步骤(通常会因临时缓冲或需要跳过/删除数据而增加内存使用量),因此应用了所谓的背压,这是流控制的一种形式,其中的步骤可以表示多少个项目他们准备好处理了吗?在通常无法一步知道上游将发送给它多少项的情况下,这可以限制数据流的内存使用量。
在RxJava中,专用的Flowable
类被指定为支持背压,而Observable
被指定为非背压操作(短序列,GUI交互等)。其他类型,Single,Maybe和Completable不支持背压,也不应该支持,总有空间暂时存储一件物品。
观察者模式
RxJava2 以观察者模式为骨架,两种观察者模式:
-
Observable
(被观察者) /Observer
(观察者) -
Flowable
(被观察者) /Subscriber
(观察者)
Observable
线程调度方式
一、subScribeOn
-
subscribeOn
用于指定subscribe()
时所发生的线程,内部线程调度是通过ObservableSubscribeOn
来实现的。
/**
* Asynchronously subscribes Observers to this ObservableSource on the specified {@link Scheduler}.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/subscribeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
*
* @param scheduler
* the {@link Scheduler} to perform subscription actions on
* @return the source ObservableSource modified so that its subscriptions happen on the
* specified {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/subscribeon.html">ReactiveX operators documentation: SubscribeOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #observeOn
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
二、observeOn
-
observeOn
方法用于指定下游Observer
回调时发生的线程
/**
* Modifies an ObservableSource to perform its emissions and notifications on a specified {@link Scheduler},
* asynchronously with an unbounded buffer of configurable "island size" and optionally delays onError notifications.
* <p>
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>You specify which {@link Scheduler} this operator will use.</dd>
* </dl>
* <p>"Island size" indicates how large chunks the unbounded buffer allocates to store the excess elements waiting to be consumed
* on the other side of the asynchronous boundary. Values below 16 are not recommended in performance sensitive scenarios.
*
* @param scheduler
* the {@link Scheduler} to notify {@link Observer}s on
* @param delayError
* indicates if the onError notification may not cut ahead of onNext notification on the other side of the
* scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received
* from upstream
* @param bufferSize the size of the buffer.
* @return the source ObservableSource modified so that its {@link Observer}s are notified on the specified
* {@link Scheduler}
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
* @see #subscribeOn
* @see #observeOn(Scheduler)
* @see #observeOn(Scheduler, boolean)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
线程切换需要注意事项
-
subscribeOn
指定的是发射事件的线程,observerOn
指定的是订阅者接收事件的线程。 - 多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用
subscribeOn
只有第一次的有效,其余的会被忽略。 - 但多次指定订阅者接收线程是可以的,也就是说每调用一次
observerOn
下游的线程就会切换一次。
待续。。。
网友评论