引入
要在Android中使用RxJava2, 先添加Gradle配置:
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
Observable 创建发送消息
Observable mobservable= Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
}
})
Emitter:是一个发送者,它发送onNext,onComplete,onErr,三种事件
需要满足一定的规则:
- Emitter可以发送无限个onNext, observer 也可以接收无限个onNext.
- 当上Emitter发送了一个onComplete后, onComplete之后的事件将会继续发送, 而Observe收到onComplete事件之后将不再继续接收事件.
- 当Emitter发送了一个onError后, onError之后的事件将继续发送, 而observer 收到onError事件之后将不再继续接收事件
- Emitter可以不发送onComplete或onError
- 最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然
Observer 创建消息接收者
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "subscribe");
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "" + value);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "error");
}
@Override
public void onComplete() {
Log.d(TAG, "complete");
}
};
- onNext、onComplete、onComplete 分别接受发送来的事件
- onSubscribe 建立连接时候调用 ,Disposable 可以切断连接,obsever不再接送发来的事件
建立联系
mobservable.subscribe(observer);`
- Observable
创建一个observable (常用):
public static <T> Observable<T> create(ObservableOnSubscribe<T> source);
public static <T> Observable<T> just(T item)
public static <T> Observable<T> fromArray(T... items)
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit)
//等
- Create,通过调用观察者的方法从头创建一个Observable 上面已经有列子;
- Just,创建一个发射指定值的Observable,下面例子就是指定发送(1,2,3)
Observable.just(1, 2, 3).
subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});
-
Interval,创建一个按固定时间间隔发射整数序列的Observable,就相当于我们定时器
-
Range,创建发射指定范围的整数序列的Observable,在实际开发中可以拿来替代for循环,发射一个范围内的有序整数序列,第一个参数是起始值必须不小于0,第二个参数为终值,左闭右开。
-
From,将其它的对象或数据结构转换为Observable,将相同类型的数据放到Future、Iterable或者数组里。
Integer[] items = { 1, 2, 3 };
Observable.fromArray(items).
subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});
订阅方法
public final Disposable subscribe() {}
public final Disposable subscribe(Consumer<? super T> onNext) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
public final void subscribe(Observer<? super T> observer) {}
- 带有一个Consumer参数的方法表示下游只关心onNext事件
- subscribe()什么都不管,observable只管发你的
- 带有Consumer参数的几个重载方法是根据 ,根据继承参数不一样,关系的事件也不一样,如Consumer<? super Throwable> onError 关心错误事件;Action onComplete接收完成事件
网友评论