create
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
Transformation operators
map
Applies a function to each emitted item and emits the result.
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
For example:
Observable.just("#0").map(new Function<String, String>() {
    @Override
    public String apply(String s) throws Exception {
        return "hello world" + s;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // System.out: accept: hello world#0
        System.out.println("accept: " + s);
    }
});
flatMap
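Applies a function to each emitted item that turns it into another source, then merges the items of those sources into a single stream. Items from different inner sources may interleave, so the output order is not guaranteed to match the source order.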
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
For example:
// This call uses the overload that also takes delayErrors (false) and maxConcurrency (3).
Observable.just("#0", "#1", "#2", "#3").flatMap(new Function<String, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(String s) throws Exception {
        // Delay only "#2" so that it arrives after the other items.
        long ts = 0;
        if (s.equals("#2")) {
            ts = 1000;
        }
        return Observable.just("hello world" + s).delay(ts, TimeUnit.MILLISECONDS);
    }
}, false, 3).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // System.out: accept: hello world#0
        // System.out: accept: hello world#1
        // System.out: accept: hello world#3
        // System.out: accept: hello world#2
        System.out.println("accept: " + s);
    }
});
concatMap
Works like flatMap, except that concatMap preserves order: the inner sources are subscribed to one at a time, in source order.
public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
For example:
Observable.just("#0", "#1", "#2", "#3").concatMap(new Function<String, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(String s) throws Exception {
        // Delay only "#2"; "#3" still has to wait for it.
        long ts = 0;
        if (s.equals("#2")) {
            ts = 1000;
        }
        return Observable.just("hello world" + s).delay(ts, TimeUnit.MILLISECONDS);
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // System.out: accept: hello world#0
        // System.out: accept: hello world#1
        // System.out: accept: hello world#2
        // System.out: accept: hello world#3
        System.out.println("accept: " + s);
    }
});
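The ordering difference between the flatMap and concatMap examples above can be reasoned about without RxJava. The following is only a rough plain-JDK timing model, not how the library is implemented: OrderSketch, mergedOrder and concatOrder are hypothetical names, and maxConcurrency and scheduler behaviour are ignored. Each result is written as item@arrivalTimeMs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Simplified timing model (assumption: every inner source emits exactly one
// item after its delay). Not RxJava API.
final class OrderSketch {

    // flatMap-like: all inner sources run at once, so items arrive ordered by delay.
    // List.sort is stable, so items with equal delays keep their source order.
    static List<String> mergedOrder(List<String> items, List<Long> delaysMs) {
        List<Integer> order = new ArrayList<>();
        for (int i = 0; i < items.size(); i++) {
            order.add(i);
        }
        order.sort(Comparator.comparingLong(i -> delaysMs.get(i)));
        List<String> out = new ArrayList<>();
        for (int i : order) {
            out.add(items.get(i) + "@" + delaysMs.get(i));
        }
        return out;
    }

    // concatMap-like: inner sources run one after another, so source order is
    // kept and each delay pushes back everything behind it.
    static List<String> concatOrder(List<String> items, List<Long> delaysMs) {
        List<String> out = new ArrayList<>();
        long clock = 0;
        for (int i = 0; i < items.size(); i++) {
            clock += delaysMs.get(i);
            out.add(items.get(i) + "@" + clock);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("#0", "#1", "#2", "#3");
        List<Long> delays = Arrays.asList(0L, 0L, 1000L, 0L);
        System.out.println(mergedOrder(items, delays)); // prints [#0@0, #1@0, #3@0, #2@1000]
        System.out.println(concatOrder(items, delays)); // prints [#0@0, #1@0, #2@1000, #3@1000]
    }
}
```

In this model "#3" is ready immediately under the flatMap-like rule, but under the concatMap-like rule it cannot start until "#2" has finished, which is the price paid for ordered output.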
zip
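Combines several sources: the zipper function receives the n-th item of every source, and its result is emitted as the n-th item of the output.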
public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)
public static <T, R> Observable<R> zip(ObservableSource<? extends ObservableSource<? extends T>> sources, final Function<? super Object[], ? extends R> zipper)
public static <T1, T2, R> Observable<R> zip(
        ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,
        BiFunction<? super T1, ? super T2, ? extends R> zipper)
// Similar overloads exist for up to 9 sources
For example:
Observable.zip(Observable.just("#1"), Observable.just("#2"), new BiFunction<String, String, String>() {
    @Override
    public String apply(String s, String s2) throws Exception {
        return s + "-" + s2;
    }
}).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // System.out: accept: #1-#2
        System.out.println("accept: " + s);
    }
});
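The example above zips two single-item sources, so it does not show what happens when the sources have different lengths. As a rough illustration of the pairing rule, here is a plain-JDK sketch over lists. ZipSketch and zipLists are hypothetical names, not RxJava API, and lists stand in for sources that have already emitted all their items.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-JDK model of zip's pairing rule (assumption: lists stand in for sources).
final class ZipSketch {

    // Combines the i-th element of each list and stops at the shorter list.
    static <A, B, R> List<R> zipLists(List<A> first, List<B> second,
                                      BiFunction<? super A, ? super B, ? extends R> zipper) {
        int pairs = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>(pairs);
        for (int i = 0; i < pairs; i++) {
            out.add(zipper.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> zipped = zipLists(
                Arrays.asList("#1", "#2", "#3"),
                Arrays.asList("a", "b"),
                (s, s2) -> s + "-" + s2);
        System.out.println(zipped); // prints [#1-a, #2-b]; "#3" has no partner
    }
}
```

An item without a partner is never passed to the zipper, so the output is only as long as the shortest input.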
amb/ambArray
amb
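Races the ObservableSources in an Iterable against each other. Whichever Observable signals first (an item, onError, or onComplete) wins, and only its events are relayed from then on; everything the other Observables emit is discarded.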
public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
ambArray
The same as amb, except that the competing ObservableSources are passed as an array (varargs).
public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources)
For example:
// 1
Observable.ambArray(
        Observable.just("#1").delay(1000, TimeUnit.MILLISECONDS),
        Observable.just("#2"))
        .subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                // System.out: accept: #2
                System.out.println("accept: " + s);
            }
        });
// 2
// A List is already an Iterable, so it can be passed to amb directly.
List<ObservableSource<String>> sources = new ArrayList<>();
for (int i = 0; i < 5; i++) {
    // Each source emits synchronously on subscription, so the first one subscribed wins.
    sources.add(Observable.just("#" + i));
}
Observable.amb(sources).subscribe(new Consumer<String>() {
    @Override
    public void accept(String s) throws Exception {
        // System.out: accept: #0
        System.out.println("accept: " + s);
    }
});
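For readers who know the JDK better than RxJava, the closest standard-library analogue to amb is probably CompletableFuture.anyOf, which also yields the result of whichever competitor finishes first. The sketch below is only an analogy under that assumption: AmbSketch and firstOf are hypothetical names, a future carries a single value rather than a stream, and anyOf does not cancel the losers the way amb disposes the losing sources.

```java
import java.util.concurrent.CompletableFuture;

// JDK analogy for amb (assumption: one value per competitor). Not RxJava API.
final class AmbSketch {

    // Returns the value of whichever future completes first.
    static String firstOf(CompletableFuture<String> a, CompletableFuture<String> b) {
        // anyOf is typed as CompletableFuture<Object>, hence the cast.
        return (String) CompletableFuture.anyOf(a, b).join();
    }

    public static void main(String[] args) {
        CompletableFuture<String> slow = new CompletableFuture<>(); // not completed yet
        CompletableFuture<String> fast = CompletableFuture.completedFuture("#2");
        String won = firstOf(slow, fast);
        slow.complete("#1"); // arrives too late to change the result
        System.out.println("accept: " + won); // prints accept: #2
    }
}
```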