1.Create 创建操作
create( ) — 使用一个函数从头创建一个 Observable。
defer( ) — 只有当订阅者订阅才创建 Observable;为每个订阅创建一个新的 Observable。
empty( ) — 创建一个什么都不做直接通知完成的 Observable。
error( ) — 创建一个什么都不做直接通知错误的 Observable。
from( ) — 将一个 Iterable, 一个 Future, 或者一个数组转换成一个 Observable。
interval( ) — 创建一个按照给定的时间间隔发射整数序列的 Observable。
just( ) — 将一个或多个对象转换成发射这个或这些对象的一个 Observable。
range( ) — 创建一个发射指定范围的整数序列的 Observable。
repeat( ) — 创建一个重复发射指定数据或数据序列的 Observable。
repeatWhen( ) — 创建一个重复发射指定数据或数据序列的 Observable,它依赖于另一个 Observable 发射的数据。
never( ) — 创建一个不发射任何数据的 Observable。
timer( ) — 创建一个在给定的延时之后发射单个数据的 Observable。
1.1 create
使用一个函数从头创建一个 Observable。
rxjava_create
示例代码:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
for (int i = 1; i < 5; i++) {
emitter.onNext(i+"");
}
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
System.out.println("Next: " + s);
}
@Override
public void onError(@NonNull Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("Sequence complete.");
}
输出:
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.
其他
ObservableOnSubscribe, ObservableEmitter, Cancellable
1.2 defer
只有当订阅者订阅才创建 Observable;才会为每个订阅创建一个新的 Observable。
defer
示例代码:
Observable<String> observable = Observable.defer(new Callable<ObservableSource<? extends String>>() {
@Override
public ObservableSource<? extends String> call() throws Exception {
return Observable.just("String");
}
});
observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
});
输出:
String
1.3 empty
创建一个什么都不做直接通知完成的 Observable。
empty
示例代码:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
for (int i = 1; i < 5; i++) {
emitter.onNext(i+"");
}
emitter.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
System.out.println("Next: " + s);
}
@Override
public void onError(@NonNull Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("Sequence complete.");
}
});
输出结果:
onComplete
1.3 error
创建一个什么都不做直接通知错误的 Observable。
throw
示例代码:
Observable<String> observable = Observable.error(new Callable<Throwable>() {
@Override
public Throwable call() throws Exception {
return new NullPointerException();
}
});
observable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String s) {
System.out.println("Next: " + s);
}
@Override
public void onError(@NonNull Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出结果:
Error: null
1.4 from()
将一个 Iterable, 一个 Future, 或者一个数组转换成一个 Observable。
from
示例代码1:
//1.遍历集合
List<String> items = new ArrayList<>();
for (int i = 0; i < 3; i++) {
items.add(i + "");
}
Observable<String> observable = Observable.fromIterable(items);
//Observable<String> observable = Observable.fromArray(new String[]{"Hello", "world"});
observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("result:" + s);
}
});
输出结果:
result:0
result:1
result:2
1.5 interval()
创建一个按照给定的时间间隔发射整数序列的 Observable。
interval
示例代码:
final CompositeDisposable disposable = new CompositeDisposable();
disposable.add(Observable.interval(1, TimeUnit.SECONDS).subscribeWith(new DisposableObserver<Long>() {
@Override
public void onNext(@NonNull Long aLong) {
System.out.println("Next: " + aLong);
}
@Override
public void onError(@NonNull Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
}));
//5秒后取消订阅
try {
Thread.sleep(4000);
//取消订阅
disposable.dispose();
} catch (InterruptedException e) {
e.printStackTrace();
}
输出结果:
Next: 0
Next: 1
Next: 2
Next: 3
1.6 just()
将一个或多个对象转换成发射这个或这些对象的一个 Observable。
just
示例代码:
Observable.just(1, 2, 3).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
System.out.println("Next: " + s);
}
@Override
public void onError(@NonNull Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出结果:
Next: 1
Next: 2
Next: 3
onComplete
1.7 range()
创建一个发射指定范围的整数序列的 Observable。
RxJava将这个操作符实现为 range 函数,它接受两个参数,一个是范围的起始值,一个是范围的数据的数目。如果你将第二个参数设为 0,将导致 Observable 不发射任何数据(如果设置为负数,会抛异常)
range
示例代码:
// 依次发射 10、11、12
Observable.range(10, 2).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer s) {
System.out.println("Next: " + s);
}
@Override
public void onError(@NonNull Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出结果:
Next: 10
Next: 11
Next: 12
onComplete
1.8 repeat()
创建一个重复发射指定数据或数据序列的 Observable。
repeat
示例代码:
//重复三次,repeat()就是无限次
Observable.just("hello", "world").repeat(3).subscribe(new Observer<Object>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Object o) {
System.out.println("onNext:" + o.toString());
}
@Override
public void onError(@NonNull Throwable error) {
System.err.println("Error: " + error.getMessage());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});
输出结果:
onNext:hello
onNext:world
onNext:hello
onNext:world
onNext:hello
onNext:world
onComplete
1.9 repeatWhen()
创建一个重复发射指定数据或数据序列的 Observable,它依赖于另一个 Observable 发射的数据。
1.10 never()
创建一个不发射任何数据的 Observable。
never
(ps:不太懂有何意义)
1.11 timer()
创建一个在给定的延时之后发射单个数据的 Observable。
在 RxJava 1.0.0 及其之后的版本,官方已不再提倡使用.timer() 操作符,因为.interval() 具有同样的功能。
timer
示例代码:
Observable.timer(1, TimeUnit.SECONDS)
.subscribe(new Consumer<Long>() {
@Override
public void accept(@NonNull Long aLong) throws Exception {
System.out.println("result:" + aLong);
}
});
输出结果:
result:0
网友评论