RxJava Operators

Author: Charein | Published: 2019-02-12 17:15

create

public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
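The signature above takes an ObservableOnSubscribe, a callback that runs for each subscriber and pushes items through an emitter. As a rough sketch of that contract in plain Java, with no RxJava dependency: MiniEmitter, MiniSource and CreateSketch are hypothetical names made up for illustration, not RxJava types, and the model leaves out disposal, errors and threading.

```java
import java.util.ArrayList;
import java.util.List;

public class CreateSketch {
    /** Hypothetical stand-in for an emitter: receives items and a completion signal. */
    public interface MiniEmitter<T> {
        void onNext(T item);
        void onComplete();
    }

    /** Hypothetical stand-in for ObservableOnSubscribe: invoked once per subscriber. */
    public interface MiniSource<T> {
        void subscribe(MiniEmitter<T> emitter);
    }

    /** Subscribes to the source and collects every item pushed before completion. */
    public static <T> List<T> collect(MiniSource<T> source) {
        List<T> received = new ArrayList<>();
        boolean[] done = {false};
        source.subscribe(new MiniEmitter<T>() {
            @Override
            public void onNext(T item) {
                if (!done[0]) { // items pushed after completion are ignored
                    received.add(item);
                }
            }

            @Override
            public void onComplete() {
                done[0] = true;
            }
        });
        return received;
    }

    public static void main(String[] args) {
        List<String> items = collect(emitter -> {
            emitter.onNext("#0");
            emitter.onNext("#1");
            emitter.onComplete();
            emitter.onNext("#2"); // dropped: the stream has already completed
        });
        System.out.println(items); // [#0, #1]
    }
}
```

The point of the sketch is the direction of control: the source body decides when to call onNext and onComplete, and the subscriber only reacts.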

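Transformation operators

map applies the given function to each item the source emits and emits the result in its place.

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)

For example: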

Observable.just("#0").map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Exception {
                return "hello world" + s;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                // System.out: accept: hello world#0
                System.out.println("accept: " + s);
            }
        });
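To see the one-to-one nature of map without pulling in RxJava, here is a small analogy using java.util.stream. Stream is pull-based and synchronous, so this only mirrors the shape of the transformation, not RxJava's push model; MapSketch and greetAll are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MapSketch {
    /** Applies the same one-to-one transformation as the example above to every item. */
    public static List<String> greetAll(List<String> items) {
        return items.stream()
                .map(s -> "hello world" + s) // one input item -> exactly one output item
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (String s : greetAll(Arrays.asList("#0"))) {
            System.out.println("accept: " + s); // accept: hello world#0
        }
    }
}
```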

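flatMap applies the given function to each emitted item to turn it into another source (an ObservableSource), then merges the items of all those inner sources into one stream. Because the inner sources are merged as their items arrive, the output order is not guaranteed to match the source order.

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency)

For example (using the overload that also takes delayErrors and maxConcurrency):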

Observable.just("#0", "#1", "#2", "#3").flatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) throws Exception {
                long ts = 0;
                if (s.equals("#2")) {
                    ts = 1000;
                }
                return Observable.just("hello world" + s).delay(ts, TimeUnit.MILLISECONDS);
            }
        }, /* delayErrors */ false, /* maxConcurrency */ 3).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                // Typical output; "#2" arrives last because its inner source is delayed by 1000 ms:
                // System.out: accept: hello world#0
                // System.out: accept: hello world#1
                // System.out: accept: hello world#3
                // System.out: accept: hello world#2
                System.out.println("accept: " + s);
            }
        });
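The out-of-order result above comes from merging: whichever inner source produces a value first is forwarded first. A deterministic way to picture this in plain Java is to model each inner source as a CompletableFuture and complete the futures by hand in a chosen order. This is only a sketch of the ordering behaviour; MergeOrderSketch and mergedOutput are hypothetical names, and concurrency limits and error handling are ignored.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class MergeOrderSketch {
    /**
     * Maps each item to a pending inner result, completes the inner results in
     * completionOrder, and records the merged output in arrival order.
     */
    public static List<String> mergedOutput(List<String> items, List<String> completionOrder) {
        List<String> output = new ArrayList<>();
        Map<String, CompletableFuture<String>> inner = new LinkedHashMap<>();
        for (String item : items) {
            CompletableFuture<String> pending = new CompletableFuture<>();
            pending.thenAccept(output::add); // merge: forward a result as soon as it arrives
            inner.put(item, pending);
        }
        for (String item : completionOrder) {
            inner.get(item).complete("hello world" + item);
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> out = mergedOutput(
                Arrays.asList("#0", "#1", "#2", "#3"),
                Arrays.asList("#0", "#1", "#3", "#2")); // "#2" finishes last, as if delayed
        System.out.println(out);
        // [hello world#0, hello world#1, hello world#3, hello world#2]
    }
}
```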

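concatMap works like flatMap, except that it preserves order: it subscribes to the inner sources one at a time, in source order, and emits all items of one inner source before moving on to the next.

public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)

For example: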

Observable.just("#0", "#1", "#2", "#3").concatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) throws Exception {
                long ts = 0;
                if (s.equals("#2")) {
                    ts = 1000;
                }
                return Observable.just("hello world" + s).delay(ts, TimeUnit.MILLISECONDS);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                // System.out: accept: hello world#0
                // System.out: accept: hello world#1
                // System.out: accept: hello world#2
                // System.out: accept: hello world#3
                System.out.println("accept: " + s);
            }
        });
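The ordering guarantee can be pictured with a chain of CompletableFutures: each inner result is only looked at once every earlier one has been emitted, so the output follows source order even when a later result is ready first. This is a simplified model with hypothetical names (ConcatOrderSketch, concatOutput). One difference worth noting: in the model a result that finishes early simply waits, whereas concatMap does not even subscribe to the next inner source until the current one completes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class ConcatOrderSketch {
    /**
     * Completes the inner results in completionOrder, but emits them strictly
     * in the order of items by chaining each one behind its predecessor.
     */
    public static List<String> concatOutput(List<String> items, List<String> completionOrder) {
        List<String> output = new ArrayList<>();
        Map<String, CompletableFuture<String>> inner = new LinkedHashMap<>();
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (String item : items) {
            CompletableFuture<String> pending = new CompletableFuture<>();
            inner.put(item, pending);
            // concat: wait for everything earlier in the chain, then for this result
            chain = chain.thenCompose(ignored -> pending).thenAccept(output::add);
        }
        for (String item : completionOrder) {
            inner.get(item).complete("hello world" + item);
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> out = concatOutput(
                Arrays.asList("#0", "#1", "#2", "#3"),
                Arrays.asList("#0", "#1", "#3", "#2")); // "#3" is ready before "#2"
        System.out.println(out);
        // [hello world#0, hello world#1, hello world#2, hello world#3]
    }
}
```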

zip combines multiple sources: it takes the n-th item from each source, passes them to the zipper function, and emits the function's result.

public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)
public static <T, R> Observable<R> zip(ObservableSource<? extends ObservableSource<? extends T>> sources, final Function<? super Object[], ? extends R> zipper)
public static <T1, T2, R> Observable<R> zip(
            ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,
            BiFunction<? super T1, ? super T2, ? extends R> zipper)
// Overloads are available for up to 9 sources

For example:

Observable.zip(Observable.just("#1"), Observable.just("#2"), new BiFunction<String, String, String>() {
            @Override
            public String apply(String s, String s2) throws Exception {
                return s + "-" + s2;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                // System.out: accept: #1-#2
                System.out.println("accept: " + s);
            }
        });
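The example zips two single-item sources, which hides what happens when the sources differ in length. A plain-Java sketch over lists makes the pairing rule visible: items are matched by position and an item without a partner is never emitted. ZipSketch is a hypothetical name, and BiFunction here is java.util.function.BiFunction, not the RxJava interface of the same name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

public class ZipSketch {
    /** Pairs items by position and stops at the end of the shorter list. */
    public static <A, B, R> List<R> zip(List<A> first, List<B> second,
                                        BiFunction<? super A, ? super B, ? extends R> zipper) {
        int n = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            out.add(zipper.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> out = zip(
                Arrays.asList("#1", "#3", "#5"),
                Arrays.asList("#2", "#4"),
                (a, b) -> a + "-" + b);
        System.out.println(out); // [#1-#2, #3-#4]; "#5" has no partner and is dropped
    }
}
```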

amb/ambArray

amb takes an Iterable of ObservableSources and lets them race: whichever source signals first (an item, onError, or onComplete) wins, and only that source's signals are relayed from then on. Everything the other sources emit is discarded.

public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)

ambArray does the same for an array of ObservableSources.

public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources)

For example:

        // 1
        Observable.ambArray(
                Observable.just("#1").delay(1000, TimeUnit.MILLISECONDS),
                Observable.just("#2"))
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        // System.out: accept: #2
                        System.out.println("accept: " + s);
                    }
                });

        // 2
        // Observable.just emits its item synchronously on subscribe, so the first source in the list wins.
        List<ObservableSource<String>> sources = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            sources.add(Observable.just("#" + i));
        }
        Observable.amb(sources).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                // System.out: accept: #0
                System.out.println("accept: " + s);
            }
        });
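The "first to signal wins" rule can be sketched with a single atomic winner slot. The class below is a hypothetical, single-threaded-output model (AmbSketch is not an RxJava type) that only handles onNext; it does not dispose the losing sources or deal with onError and onComplete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class AmbSketch {
    private static final int NO_WINNER = -1;
    private final AtomicInteger winner = new AtomicInteger(NO_WINNER);
    private final List<String> output = new ArrayList<>();

    /** Called when source number index emits; only the first source to signal is forwarded. */
    public void onNext(int index, String item) {
        // compareAndSet succeeds for exactly one caller, which becomes the winner
        if (winner.get() == index || winner.compareAndSet(NO_WINNER, index)) {
            output.add(item);
        }
    }

    public List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        AmbSketch amb = new AmbSketch();
        amb.onNext(1, "#2");  // source 1 signals first and wins
        amb.onNext(0, "#1");  // source 0 is late: dropped
        amb.onNext(1, "#2b"); // later items from the winner still pass
        System.out.println(amb.output()); // [#2, #2b]
    }
}
```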

Article link: https://www.haomeiwen.com/subject/psyheqtx.html