前言
本文学习RxJava的创造类操作符
create
创建一个Observable
代码如下:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e("onNext",integer+"");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
Log.e("onComplete","onComplete");
}
});
测试结果:
07-02 14:43:07.785 22080-22080/com.zhqy.myrxjava E/onNext: 1
07-02 14:43:07.785 22080-22080/com.zhqy.myrxjava E/onNext: 2
07-02 14:43:07.785 22080-22080/com.zhqy.myrxjava E/onNext: 3
07-02 14:43:07.785 22080-22080/com.zhqy.myrxjava E/onComplete: onComplete
just
创建一个被观察者,可以发送多个事件,但是不能超过10个 。
代码如下:
Observable.just(1,2,3,4,5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integer+"");
}
});
测试结果:
07-02 14:55:05.103 29726-29726/com.zhqy.myrxjava E/accept: 1
07-02 14:55:05.103 29726-29726/com.zhqy.myrxjava E/accept: 2
07-02 14:55:05.103 29726-29726/com.zhqy.myrxjava E/accept: 3
07-02 14:55:05.103 29726-29726/com.zhqy.myrxjava E/accept: 4
07-02 14:55:05.103 29726-29726/com.zhqy.myrxjava E/accept: 5
fromArray
和just()方法类似,但是可以传入多个事件,也可以传入一个数组
代码如下:
Observable.fromArray(1,2,3,4,5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integer+"");
}
});
测试结果
07-02 15:00:36.583 30268-30268/com.zhqy.myrxjava E/accept: 1
07-02 15:00:36.583 30268-30268/com.zhqy.myrxjava E/accept: 2
07-02 15:00:36.583 30268-30268/com.zhqy.myrxjava E/accept: 3
07-02 15:00:36.583 30268-30268/com.zhqy.myrxjava E/accept: 4
07-02 15:00:36.583 30268-30268/com.zhqy.myrxjava E/accept: 5
fromCallable()
将Callable的返回值发送给观察者,Callable用法也Runnable用法一致,一个是 call() 方法,一个是 run() 方法,并且 call() 有返回值。
代码如下:
Observable.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 1;
}
}).subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integer+"");
}
});
测试结果
07-02 15:06:05.821 30786-30786/com.zhqy.myrxjava E/accept: 1
timer
再一定时间之后,发送一个0L的值给观察者
代码如下:
Observable.timer(3,TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("accept",aLong+"");
}
});
测试结果
07-02 15:39:02.085 6006-6049/com.zhqy.myrxjava E/accept: 0
interval
每隔一段时间发送一个事件,这个事件是从0开始,每次加1 。
代码如下:
Observable.interval(5,1,TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("accept",aLong+"");
}
});
测试结果:
07-02 15:44:26.179 6291-6470/com.zhqy.myrxjava E/accept: 0
07-02 15:44:27.178 6291-6470/com.zhqy.myrxjava E/accept: 1
07-02 15:44:28.178 6291-6470/com.zhqy.myrxjava E/accept: 2
07-02 15:44:29.179 6291-6470/com.zhqy.myrxjava E/accept: 3
intervalRange
和interval功能类似,但是可以制定开始数值以及发送事件数量
代码如下:
Observable.intervalRange(10,5,5,1,TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
Log.e("accept",aLong+"");
}
});
测试结果
07-02 15:50:21.538 6784-6840/com.zhqy.myrxjava E/accept: 10
07-02 15:50:22.537 6784-6840/com.zhqy.myrxjava E/accept: 11
07-02 15:50:23.537 6784-6840/com.zhqy.myrxjava E/accept: 12
07-02 15:50:24.537 6784-6840/com.zhqy.myrxjava E/accept: 13
07-02 15:50:25.538 6784-6840/com.zhqy.myrxjava E/accept: 14
range & rangeLong
同时发送多个事件序列
代码如下:
Observable.range(0,5)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e("accept",integer+"");
}
});
测试结果
07-02 15:55:04.883 7210-7210/com.zhqy.myrxjava E/accept: 0
07-02 15:55:04.884 7210-7210/com.zhqy.myrxjava E/accept: 1
07-02 15:55:04.884 7210-7210/com.zhqy.myrxjava E/accept: 2
07-02 15:55:04.884 7210-7210/com.zhqy.myrxjava E/accept: 3
07-02 15:55:04.884 7210-7210/com.zhqy.myrxjava E/accept: 4
empty & never & error
empty:调用之后直接发送完成事件,会调用 onComplete 方法
never:不发送事件
error:调用之后会执行 onError 方法
代码如下:
Observable.error(new Throwable("出错了"))
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
Log.e("onError",e.getMessage());
}
@Override
public void onComplete() {
}
});
测试结果
07-02 16:00:53.593 7641-7641/com.zhqy.myrxjava E/onError: 出错了
网友评论