RxJava Operators

Author: Charein | Published: 2019-02-12 17:15

    create

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
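
The signature above can be exercised with a minimal sketch, assuming RxJava 2.x (the `io.reactivex` package) and Java 8 lambdas. The class name `CreateSketch` and the method `greetings` are hypothetical, chosen only for illustration. The lambda plays the role of the `ObservableOnSubscribe` and runs once for each subscriber.

```java
import io.reactivex.Observable;
import java.util.List;

public class CreateSketch {
    // Builds a source by hand: the lambda is the ObservableOnSubscribe.
    public static Observable<String> greetings() {
        return Observable.create(emitter -> {
            emitter.onNext("#0");
            emitter.onNext("#1");
            emitter.onComplete(); // terminal event: nothing is emitted after this
        });
    }

    public static void main(String[] args) {
        // toList() collects every item; blockingGet() waits for completion.
        List<String> items = greetings().toList().blockingGet();
        System.out.println(items); // [#0, #1]
    }
}
```

Collecting with `toList().blockingGet()` is only a convenience for printing the result; production code would normally subscribe instead of blocking.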
    

    Transformation operators

    map applies a function to each emitted item and emits the result.

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
    

    For example:

    Observable.just("#0").map(new Function<String, String>() {
                @Override
                public String apply(String s) throws Exception {
                    return "hello world" + s;
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    // System.out: accept: hello world#0
                    System.out.println("accept: " + s);
                }
            });
    

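    flatMap applies a function that turns each emitted item into another ObservableSource, then merges the items from those sources into a single stream. The merged output is not guaranteed to follow the order of the source items.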

    public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
    

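    For example, using the overload flatMap(mapper, delayErrors, maxConcurrency) with delayErrors = false and maxConcurrency = 3: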

    Observable.just("#0", "#1", "#2", "#3").flatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(String s) throws Exception {
                    long ts = 0;
                    if (s.equals("#2")) {
                        ts = 1000;
                    }
                    return Observable.just("hello world" + s).delay(ts, TimeUnit.MILLISECONDS);
                }
            }, false, 3).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    // System.out: accept: hello world#0
                    // System.out: accept: hello world#1
                    // System.out: accept: hello world#3
                    // System.out: accept: hello world#2
                    System.out.println("accept: " + s);
                }
            });
    

    concatMap works like flatMap, but it subscribes to the inner sources one at a time, so the output preserves the order of the source items.

    public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
    

    For example:

    Observable.just("#0", "#1", "#2", "#3").concatMap(new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(String s) throws Exception {
                    long ts = 0;
                    if (s.equals("#2")) {
                        ts = 1000;
                    }
                    return Observable.just("hello world" + s).delay(ts, TimeUnit.MILLISECONDS);
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    // System.out: accept: hello world#0
                    // System.out: accept: hello world#1
                    // System.out: accept: hello world#2
                    // System.out: accept: hello world#3
                    System.out.println("accept: " + s);
                }
            });
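
The ordering difference between flatMap and concatMap can also be seen side by side. The following is a minimal sketch, assuming RxJava 2.x (`io.reactivex`) and Java 8; the class `OrderingSketch` and its methods are hypothetical names used only for illustration. Each item n is mapped to an inner source that emits n after n * 100 ms, so the inner sources finish in a different order from the one in which they were created.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class OrderingSketch {
    // Each item n becomes an inner source that emits n after n * 100 ms.
    static Observable<Integer> delayed(int n) {
        return Observable.just(n).delay(n * 100L, TimeUnit.MILLISECONDS);
    }

    // flatMap subscribes to all inner sources at once and merges them,
    // so items arrive in the order the inner sources emit.
    public static List<Integer> viaFlatMap() {
        return Observable.just(3, 1, 2)
                .flatMap(OrderingSketch::delayed)
                .toList()
                .blockingGet();
    }

    // concatMap subscribes to one inner source at a time,
    // so items arrive in source order.
    public static List<Integer> viaConcatMap() {
        return Observable.just(3, 1, 2)
                .concatMap(OrderingSketch::delayed)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(viaFlatMap());   // [1, 2, 3]
        System.out.println(viaConcatMap()); // [3, 1, 2]
    }
}
```

The concatMap variant also takes longer overall (about 600 ms here instead of about 300 ms), because each inner source starts only after the previous one completes.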
    

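    zip combines multiple sources: it takes the n-th item from each source, passes them to the zipper function, and emits the result.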

    public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)
    public static <T, R> Observable<R> zip(ObservableSource<? extends ObservableSource<? extends T>> sources, final Function<? super Object[], ? extends R> zipper)
    public static <T1, T2, R> Observable<R> zip(
                ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2,
                BiFunction<? super T1, ? super T2, ? extends R> zipper)
    // Overloads exist for up to 9 sources
    

    For example:

    Observable.zip(Observable.just("#1"), Observable.just("#2"), new BiFunction<String, String, String>() {
                @Override
                public String apply(String s, String s2) throws Exception {
                    return s + "-" + s2;
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    // System.out: accept: #1-#2
                    System.out.println("accept: " + s);
                }
            });
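
Because zip pairs items by position, sources of different lengths raise the question of what happens to the leftover items. The following is a minimal sketch, assuming RxJava 2.x (`io.reactivex`) and Java 8; `ZipSketch` and `pairs` are hypothetical names. It suggests that the zipped stream ends once the shorter source is exhausted, and the unmatched item "c" is dropped.

```java
import io.reactivex.Observable;
import java.util.List;

public class ZipSketch {
    public static List<String> pairs() {
        Observable<String> letters = Observable.just("a", "b", "c");
        Observable<Integer> numbers = Observable.just(1, 2);
        // The zipper receives the n-th item of each source together.
        return Observable.zip(letters, numbers, (s, n) -> s + "-" + n)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(pairs()); // [a-1, b-2]
    }
}
```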
    

    amb/ambArray

    amb races the ObservableSources supplied by an Iterable. Whichever source signals first (an item, onError, or onComplete) wins: amb mirrors that source from then on, and the items emitted by the other sources are discarded.

    public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources) 
    

    ambArray races an array of ObservableSources in the same way: the first source to signal (an item, onError, or onComplete) is mirrored, and the items emitted by the other sources are discarded.

    public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources)
    

    For example:

            // 1
            Observable.ambArray(
                    Observable.just("#1").delay(1000, TimeUnit.MILLISECONDS),
                    Observable.just("#2"))
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            // System.out: accept: #2
                            System.out.println("accept: " + s);
                        }
                    });
    
            // 2
            // Build the candidate sources up front; a List is already an Iterable.
            // Observable.just is used so that each source follows the Observer
            // protocol (onSubscribe before onNext).
            List<ObservableSource<String>> sources = new ArrayList<>();
            for (int i = 0; i < 5; i++) {
                sources.add(Observable.just("#" + i));
            }
            Observable.amb(sources).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    // System.out: accept: #0
                    System.out.println("accept: " + s);
                }
            });
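
The note that onError and onComplete also count as "first" is easy to overlook. The following is a minimal sketch, assuming RxJava 2.x (`io.reactivex`) and Java 8; `AmbSketch` and `race` are hypothetical names. An empty source completes immediately and therefore wins the race, so the item from the slower source never reaches the subscriber.

```java
import io.reactivex.Observable;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class AmbSketch {
    public static List<String> race() {
        return Observable.ambArray(
                        // Completes at once without emitting any item.
                        Observable.<String>empty(),
                        // Would emit "#1" after 100 ms, but has already lost.
                        Observable.just("#1").delay(100, TimeUnit.MILLISECONDS))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(race()); // []
    }
}
```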
    

