美文网首页
RxAndroid常用操作符

RxAndroid常用操作符

作者: Sean1225 | 来源:发表于2019-03-18 18:00 被阅读0次

    RxAndroid入门一文中,我们可以知道,RxJava主要构建了一个主要应用于异步场景、通过观察者模式及使用响应式编程和函数式编程规范实现的生产者-消费者模型:

    1. 事件生产:生产者是ObservableObservable对象的创建除了最基础的create方法之外,还可以使用justfromzip等其他多个方法。RxJava中提供了大量创建Observable的工厂方法,按需取用。
    2. 事件消费:消费者是Observer及其拓展,使用subcribe关联生产者。
    3. 事件加工:这是我们在RxAndroid入门一文中没有提到的,RxJava提供了大量的方法(如mapfilter)对事件进行加工。

    事件生产和加工的方法一般也被称为操作符,RxJava提供的大量操作符不可能全部记住,因此只需要记住它们的作用,具体应用时再查阅代码或相关资料。接着,我们来看下一些常见的操作符。

    创建Observable

    create

    方法原型如下:

    static <T> Observable<T> create(ObservableOnSubscribe<T> source);
    

    这个我们在入门时已经使用过了,它是创建Observable对象最基础的方法,需要实现ObservableOnSubscribe接口,然后在ObservableOnSubscribe.subscribe中生产和交付事件。

    just

    justcreate的简化操作,它直接将需要由Emitter发射出去的数据在初始化时传入,有Observer订阅后,Observable会自动依次发射这些数据并最后调用onComplete。示例如下:

    Observable.just(1, 2, 3).subscribe(
            (Integer i) -> log("onNext:" + i),
            (Throwable e) -> e.printStackTrace(),
            () -> log("onComplete"));
    

    输出日志如下:

    onNext:1
    onNext:2
    onNext:3
    onComplete
    

    amb

    方法原型如下:

    static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources);
    static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources);
    

    所有Observable中,只有最早生产出事件的那个Observable的事件能够被Observer消费。即消费者选定最优的生产者,抛弃其他生产者,判定条件是事件的生产顺序。

    Observable.ambArray(
            Observable.create((ObservableEmitter<String> emitter) -> emitter.onNext("1")).delay(1, TimeUnit.SECONDS),
            Observable.create((ObservableEmitter<String> emitter) -> emitter.onNext("2"))
    ).subscribe((String s) -> log("onNext:" + s));
    

    上述例子中的日志输出结果是onNext:2,如果删除第一个Observabledelay调用,那么输出就是onNext:1。此外,还有一个成员方法abmWith可以在创建出Observable之后再使用,用来动态添加source。

    使用场景:

    同时向多个ip发起请求,最快响应的那个服务器将作为后续访问的节点。

    concat

    方法原型如下(方法有点多,参数数量不一样但功能完全一样的只列出其中一个):

    static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources);
    static <T> Observable<T> concat(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch);
    static <T> Observable<T> concat(ObservableSource<? extends T> source1, ObservableSource<? extends T> source2, ObservableSource<? extends T> source3, ObservableSource<? extends T> source4)
    static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources);
    static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources);
    static <T> Observable<T> concatArrayEager(int maxConcurrency, int prefetch, ObservableSource<? extends T>... sources);
    static <T> Observable<T> concatArrayEagerDelayError(int maxConcurrency, int prefetch, ObservableSource<? extends T>... sources);
    static <T> Observable<T> concatDelayError(ObservableSource<? extends ObservableSource<? extends T>> sources, int prefetch, boolean tillTheEnd);
    static <T> Observable<T> concatEager(Iterable<? extends ObservableSource<? extends T>> sources, int maxConcurrency, int prefetch);
    

    使用concat连接的所有Observable会串行执行,当上一个Observable执行结束后,即触发了onCompleteonError(如果可能触发onError记得设置异常处理器)后,下一个Observable才开始执行(是否立即执行取决于是否设置了delay)。还记得入门时讲解响应式编程中提到的例子吗(不记得的话再翻一下),用RxJava实现如下:

    Observable.concat(
            Observable.create((ObservableEmitter<Runnable> emitter) -> {
                emitter.onNext(new TaskA());
                emitter.onComplete();
            }),
            Observable.create((ObservableEmitter<Runnable> emitter) -> {
                emitter.onNext(new TaskB());
                emitter.onComplete();
            }).delay(1, TimeUnit.SECONDS),
            Observable.create((ObservableEmitter<Runnable> emitter) -> {
                emitter.onNext(new TaskC());
                emitter.onComplete();
            }).delay(1, TimeUnit.SECONDS)
    ).subscribe((Runnable runnable) -> runnable.run(), (Throwable e) -> e.printStackTrace());
    

    应用场景:
    concat的应用场景非常广,实际项目中,任务(或业务)之间的线性依赖关系很普遍。

    merge

    mergeconcat用法类似(方法原型这里就不贴了),差别在于concat是串行的,而merge是并行的。所有Observable并行运行,直到它们全部触发onComplete后,Observer才会触发onComplete,示例如下:

    Observable.merge(
            Observable.create((ObservableEmitter<String> emitter) -> {
                emitter.onNext("A");
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                }
                emitter.onNext("B");
                emitter.onComplete();
            }).subscribeOn(Schedulers.newThread()),
            Observable.create((ObservableEmitter<String> emitter) -> {
                try {
                    Thread.sleep(500);
                } catch (Exception e) {
                }
                emitter.onNext("5");
                emitter.onComplete();
            }).subscribeOn(Schedulers.newThread())
    ).subscribe(
            (String s) -> log("onNext:" + s),
            (Throwable e) -> e.printStackTrace(),
            () -> log("onComplete"));
    

    日志输出如下:

    onNext:A
    onNext:5
    onNext:B
    onComplete
    

    从日志中可以看到2个Observable是并行运行的(需要手动设置不同的调度线程),并且Observer只回调了一次onComplete,且是在2个Observable都触发了onComplete之后回调。

    使用场景:

    mergeconcat应该是使用场景最广泛的两种操作了。当一个任务(假设为C)依赖于其它多个任务时(假设为A、B),而A、B之间又没有相互依赖关系,为了保证效率,A、B显然需要并行运行,等到A、B都运行结束了就运行C。

    zip

    方法原型如下(方法有点多,参数数量不一样但功能完全一样的只列出其中一个):

    static <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> Observable<R> zip(
                ObservableSource<? extends T1> source1, ObservableSource<? extends T2> source2, ObservableSource<? extends T3> source3,
                ObservableSource<? extends T4> source4, ObservableSource<? extends T5> source5, ObservableSource<? extends T6> source6,
                ObservableSource<? extends T7> source7, ObservableSource<? extends T8> source8, ObservableSource<? extends T9> source9,
                Function9<? super T1, ? super T2, ? super T3, ? super T4, ? super T5, ? super T6, ? super T7, ? super T8, ? super T9, ? extends R> zipper);
    static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper,
                boolean delayError, int bufferSize, ObservableSource<? extends T>... sources);
    static <T, R> Observable<R> zipIterable(Iterable<? extends ObservableSource<? extends T>> sources,
                Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize);     
    

    zip将多个Observable的事件组合在一起,然后再依次发射。这话不是很容易理解,我们先来看下例子:

    Observable.zip(
            Observable.create((ObservableEmitter<String> emitter) -> {
                emitter.onNext("A");
                emitter.onNext("B");
                emitter.onNext("C");
            }),
            Observable.create((ObservableEmitter<Integer> emitter) -> {
                emitter.onNext(1);
                emitter.onNext(2);
            }),
            (String t1, Integer t2) -> new Pair<String, Integer>(t1, t2)
    ).subscribe((Pair<String, Integer> result) -> log("onNext:" + result.first + result.second));
    

    日志输出如下:

    onNext:A1
    onNext:B2
    

    这里我们使用的是3个参数的zip方法,前面2个参数传入Observable实例,以下简称O1和O2,第3个参数传入BiFunction实例。BiFunction只有一个apply方法,apply方法拦截O1、O2的事件发射,参数1表示O1发射的事件,参数2表示O2发射的事件,返回值是两个事件的打包(打包类型自已定),apply方法的作用就是进行事件打包。打包后的事件会依次发射给Observer,且打包必须依次一一对应,如果一方发射了m个事件,另一个方只发射了n个事件(n<m),那么最终发射给Observer的事件将只有n个。

    事件加工

    上面列举了创建Observable的几种方式,事实上远不止以上几种,但以上几种是比较常见,接下来我们来看下常见的事件加工。

    filter

    filter是事件加工中最简易懂的操作了,它拦截Observable发射出来的事件,并将其中不符合要求的事件滤掉,只发射满足要求的事件给Observer。看个例子:

    Observable.create((ObservableEmitter<Integer> emitter) -> {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
    }).filter((Integer i) -> i > 1)
            .subscribe((Integer i) -> log("onNext:" + i));
    

    日志输出如下:

    onNext:2
    onNext:3
    

    map

    map拦截Observable发射出来的事件,将事件转化为其他数据后再发射给Observer,示例如下:

    Observable.create((ObservableEmitter<Integer> emitter) -> {
        emitter.onNext(1);
        emitter.onNext(2);
    }).map((Integer i) -> "this is " + i)
            .subscribe((String s) -> log("onNext:" + s));
    

    日志输出如下:

    onNext:this is 1
    onNext:this is 2
    

    flatMap

    flatMapmap类似,都是进行数据的转化,差别在于flatMap将数据转化为Observable对象,这些Observable取代原有的Observable作为生产者提供数据,使用map中的示例修改后代码如下:

    Observable.just(1, 2)
            .flatMap((Integer i) -> Observable.just("this is " + i))
            .subscribe((String s) -> log("onNext:" + s));
    

    日志输出如下:

    onNext:this is 1
    onNext:this is 2
    

    仅通过上面的例子很难了解flatMap的用处,因为map也可以做到,其实flatMap适合处理更为复杂的数据,如多重列表。假设有一群小朋友聚在一起玩耍,每个人都需要拿出自己的玩具,然后我们需要统计下这些玩具,代码如下:

    class Toy {
        String name;
        Toy(String name) {
            this.name = name;
        }
    }
    
    class Kid {
        List<Toy> toys;
    }
    
    List<Kid> kids() {
        List<Kid> kids = new LinkedList<>();
        Kid kid = new Kid();
        kid.toys = new LinkedList<>();
        kid.toys.add(new Toy("熊大"));
        kid.toys.add(new Toy("熊二"));
        kids.add(kid);
        kid = new Kid();
        kid.toys = new LinkedList<>();
        kid.toys.add(new Toy("水枪"));
        kids.add(kid);
        kid = new Kid();
        kid.toys = new LinkedList<>();
        kid.toys.add(new Toy("足球"));
        kid.toys.add(new Toy("扭扭车"));
        kids.add(kid);
        return kids;
    }
    
    void test() {
        Observable.fromIterable(kids())
                .flatMap((Kid kid) -> Observable.fromIterable(kid.toys))
                .subscribe((Toy toy) -> log("onNext:" + toy.name));
    }
    

    执行test方法输出日志如下:

    onNext:熊大
    onNext:熊二
    onNext:水枪
    onNext:足球
    onNext:扭扭车
    

    concatMap

    concatMapflatMap的作用类似,差别在于前者输出的数据顺序与原始数据保持一致(使用concat),而后者不保证(使用merge),我们来看个例子:

    Observable<Integer> o1 = Observable.just(1, 2, 3, 4, 5)
            .flatMap((Integer i) -> {
                if(i == 3) {
                    return Observable.just(i).delay(1, TimeUnit.SECONDS);
                } else {
                    return Observable.just(i);
                }
            });
    Observable<Integer> o2 = Observable.just(-1, -2, -3, -4, -5)
            .concatMap((Integer i) -> {
                if(i == -3) {
                    return Observable.just(i).delay(1, TimeUnit.SECONDS);
                } else {
                    return Observable.just(i);
                }
            });
    Observable.concat(o1, o2).subscribe((Integer i) -> log("onNext:" + i));
    

    日志输出如下:

    onNext:1
    onNext:2
    onNext:4
    onNext:5
    onNext:3
    onNext:-1
    onNext:-2
    onNext:-3
    onNext:-4
    onNext:-5
    

    相关文章

      网友评论

          本文标题:RxAndroid常用操作符

          本文链接:https://www.haomeiwen.com/subject/pembmqtx.html