image.png在ReactiveX中,一个观察者(Observer)订阅一个可观察对象(Observable)。观察者对Observable发射的数据或数据序列作出响应。这种模式可以极大地简化并发操作,因为它创建了一个处于待命状态的观察者哨兵,在未来某个时刻响应Observable的通知,不需要阻塞等待Observable发射数据。
- 这是Observable的时间轴,从左到右。
- 这些是Obserable发出的事件
- 此垂直线表示Observable已成功发送处事件,事件发送完毕
- these dotted lines and this box indicate that a transformation is being applied to the Observable The text inside the box shows the nature of the transformation
这些虚线和此框表示正在对Observable数据转换。
框内的文本显示转换的性质 - this Observable is the result of the transformation
对Observable转换的结果 - is for some reason the Observable terinates abnormally,with an error the vertical line is replaced by an x
由于某种原因,Observable异常中断,垂直线x代替错误。
创建Observables
-
Create
— create an Observable from scratch by calling observer methods programmatically
创建一个默认的Observable
-
Defer
— do not create the Observable until the observer subscribes, and create a fresh Observable for each observer
直到有观察者订阅时才创建Observable,并且为每个观察者创建一个新的Observable
Defer操作符会一直等待直到有观察者订阅它,然后它使用Observable工厂方法生成一个Observable。它对每个观察者都这样做,因此尽管每个订阅者都以为自己订阅的是同一个Observable,事实上每个订阅者获取的是它们自己的单独的数据序列。
-
Empty
/Never
/Throw
— create Observables that have very precise and limited behavior
Empty 创建一个不发射任何数据但是正常终止的Observable
never 创建一个不发射数据也不终止的Observable
Throw 创建一个不发射数据以一个错误终止的Observable
-
From
— convert some other object or data structure into an Observable
image.png
一对多的关系
将一个Iterable, 一个Future, 或者一个数组转换成一个Observable
产生的Observable会发射Iterable或数组的每一项数据。
-
Interval
— create an Observable that emits a sequence of integers spaced by a particular time interval
创建一个按固定时间间隔发射整数序列的Observable
-
Just
— convert an object or a set of objects into an Observable that emits that or those objects
创建一个发射指定值的Observable
Just类似于From,但是From会将数组或Iterable的数据取出然后逐个发射,而Just只是简单的原样发射,将数组或Iterable当做单个数据。
注意:如果你传递null
给Just,它会返回一个发射null
值的Observable。不要误认为它会返回一个空Observable(完全不发射任何数据的Observable),如果需要空Observable你应该使用Empty操作符。
-
Range
— create an Observable that emits a range of sequential integers
创建一个发射指定范围的整数序列的Observable
Range操作符发射一个范围内的有序整数序列,你可以指定范围的起始和长度。
RxJava将这个操作符实现为range函数,它接受两个参数,一个是范围的起始值,一个是范围的数据的数目。如果你将第二个参数设为0,将导致Observable不发射任何数据(如果设置为负数,会抛异常)。
-
Repeat
— create an Observable that emits a particular item or sequence of items repeatedly
创建一个重复发射指定数据或数据序列的Observable
RxJava将这个操作符实现为repeat方法。它不是创建一个Observable,而是重复发射原始Observable的数据序列,这个序列或者是无限的,或者通过repeat(n)指定重复次数。
-
Start
— create an Observable that emits the return value of a function
创建一个Observable,它发出类似函数的指令的返回值
-
Timer
— create an Observable that emits a single item after a given delay
image.png创建一个Observable,它在一个给定的延迟后发射一个特殊的值。
最基本使用
- 创建 被观察者
- 创建 观察者
- 订阅事件 连接 被观察者 和观察者
//创建 被观察者
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("1");
Log.d(TAG, "subscribe: 1");
emitter.onNext("2");
Log.d(TAG, "subscribe: 2");
emitter.onNext("3");
Log.d(TAG, "subscribe: 3");
emitter.onNext("4");
Log.d(TAG, "subscribe: 4");
emitter.onComplete();
}
}).subscribe(//订阅
new Observer<String>() {//观察者
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: " + d.toString());
this.disposable = d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
Log.d(TAG, "onNext: disposed 前" + disposable.isDisposed());
if (s.equals("2")) {
disposable.dispose();
}
Log.d(TAG, "onNext: disposed 后" + disposable.isDisposed());
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
在 Observer观察者 中,多了一个回调方法:onSubscribe,传递参数为Disposable,Disposable 相当于 RxJava 1.x 中的 Subscription, 用于解除订阅。当s等于2时,解除订阅,观察者不会在接收到被观察者发出的消息,也包裹 onComplete
事件。
在请求的过程中Activity已经退出了, 这个时候如果回到主线程去更新UI, 那么APP肯定就崩溃了, 怎么办呢, 上一节我们说到了Disposable , 说它是个开关, 调用它的dispose()方法时就会切断水管, 使得下游收不到事件, 既然收不到事件, 那么也就不会再去更新UI了. 因此我们可以在Activity中将这个Disposable 保存起来, 当Activity退出时, 切断它即可.
那如果有多个Disposable 该怎么办呢, RxJava中已经内置了一个容器CompositeDisposable, 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候, 调用CompositeDisposable.clear() 即可切断所有的水管.
上游可以发送无限个onNext, 下游也可以接收无限个onNext.
当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.
当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
上游可以不发送onComplete或onError.
最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然
下面是log信息。
onNext: 1
onNext: disposed 前false
onNext: disposed 后false
subscribe: 1
onNext: 2
onNext: disposed 前false
onNext: disposed 后true
subscribe: 2
subscribe: 3
subscribe: 4
form
just
输出一样
Observable<Integer> form = Observable.fromArray(items);
Observable<Integer> just = Observable.just(1, 2, 3, 4, 5);
Observable<Integer> range = Observable.range(1, 10);
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onNext: 7
onNext: 8
onNext: 9
onNext: 10
onComplete:
subscribe 观察者
public final Disposable subscribe() {}
// 表示观察者不对被观察者发送的事件作出任何响应(但被观察者还是可以继续发送事件)
public final Disposable subscribe(Consumer<? super T> onNext) {}
// 表示观察者只对被观察者发送的Next事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
// 表示观察者只对被观察者发送的Next事件 & Error事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 & Complete事件作出响应
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
// 表示观察者只对被观察者发送的Next事件、Error事件 、Complete事件 & onSubscribe事件作出响应
public final void subscribe(Observer<? super T> observer) {}
// 表示观察者对被观察者发送的任何事件都作出响应
调度器 Scheduler
-
Schedulers.computation( )
用于计算任务,如事件循环或和回调处理,不要用于IO操作(IO操作请使用Schedulers.io());默认线程数等于处理器的数量 -
Schedulers.from(executor)
使用指定的Executor作为调度器 -
Schedulers.immediate( )
在当前线程立即开始执行任务 -
Schedulers.io( )
用于IO密集型任务,如异步阻塞IO操作,这个调度器的线程池会根据需要增长;对于普通的计算任务,请使用Schedulers.computation();Schedulers.io( )默认是一个CachedThreadScheduler,很像一个有线程缓存的新线程调度器 -
Schedulers.newThread( )
为每个任务创建一个新线程 -
Schedulers.trampoline( )
当其它排队的任务完成后,在当前线程排队开始执行
简单的来说, subscribeOn() 指定的是上游发送事件的线程, observeOn() 指定的是下游接收事件的线程.
多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
多次指定下游的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.
网友评论