RXJava学习笔记(4)

作者: 皮球二二 | 来源:发表于2016-06-22 16:28 被阅读242次

    组合操作符

    组合操作符一共主要有以下几个:

    1. CombineLatest
    2. Join
    3. Merge
    4. StartWith
    5. Switch
    6. Zip

    zip

    zip操作符一般可以用在如下场景,

    1. 一个页面有多个接口,我希望等数据全部返回出来之后一并显示
    2. 用户发图片,可能有多张,我希望用户上传完图片之后一并提示
      ......
    zip

    看图说话,当我们有2个或多个Observables发射的数据项时,zip操作符将严格的按照发射的顺序去将结合这些数据项,并且最终他发射出的数据数量与发射数据项最少的那个Observable的数据数量一样多。

    zip操作符可以接受一到九个参数:一个Observable序列,或者一些发射Observable的Observables

    例子:

    我这边有三个数据源,他们分别在2s\4s\6s吐数据,然后我打算将他们整合在一起

    Observable o1=Observable.create(new Observable.OnSubscribe<Object>() {
        @Override
        public void call(Subscriber<? super Object> subscriber) {
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            subscriber.onNext("first");
        }
    });
    
    Observable o2=Observable.create(new Observable.OnSubscribe<Object>() {
        @Override
        public void call(Subscriber<? super Object> subscriber) {
            try {
                Thread.sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            subscriber.onNext("second");
        }
    });
    
    Observable o3=Observable.create(new Observable.OnSubscribe<Object>() {
        @Override
        public void call(Subscriber<? super Object> subscriber) {
            try {
                Thread.sleep(6000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            subscriber.onNext("third");
        }
    }).subscribeOn(Schedulers.io());
    
    Observable.zip(o1, o2, o3, new Func3() {
        @Override
        public Object call(Object o, Object o2, Object o3) {
            Log.d("MainActivity", o.toString());
            Log.d("MainActivity", o2.toString());
            Log.d("MainActivity", o3.toString());
            return o.toString()+o2.toString()+o3.toString();
        }}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
        @Override
        public void call(Object o) {
            Log.d("MainActivity", o.toString());
        }
    });
    

    最终,这个call将会在三条数据都返回过来后再执行

    06-21 05:19:04.276 6306-6416/com.example.clevo.rxjavademo D/MainActivity: first
    06-21 05:19:04.276 6306-6416/com.example.clevo.rxjavademo D/MainActivity: second
    06-21 05:19:04.276 6306-6416/com.example.clevo.rxjavademo D/MainActivity: third
    06-21 05:19:04.276 6306-6306/com.example.clevo.rxjavademo D/MainActivity: firstsecondthird
    

    请看时间点是一致的

    如果有多对匹配,那么则顺序打印出匹配项

    Observable o4=Observable.create(new Observable.OnSubscribe<Object>() {
        @Override
        public void call(Subscriber<? super Object> subscriber) {
            for (int i=0;i<3;i++) {
                try {
                    Thread.sleep(i*3000);
                    subscriber.onNext("four");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }}).subscribeOn(Schedulers.io());
    
    Observable.zip(Observable.just("1", "2", "3", "4"), o4, new Func2() {
        @Override
        public Object call(Object o, Object o2) {
            //每结合成功一组就触发
            Log.d("MainActivity", o.toString());
            Log.d("MainActivity", o2.toString());
            return o.toString()+o2.toString();
        }}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
        @Override
        public void call(Object o) {
            Log.d("MainActivity", o.toString());
        }
    });
    

    看看结果

    06-21 05:33:11.689 19242-19264/com.example.clevo.rxjavademo D/MainActivity: 1
    06-21 05:33:11.689 19242-19264/com.example.clevo.rxjavademo D/MainActivity: four
    06-21 05:33:11.690 19242-19242/com.example.clevo.rxjavademo D/MainActivity: 1four
    06-21 05:33:17.690 19242-19264/com.example.clevo.rxjavademo D/MainActivity: 2
    06-21 05:33:17.690 19242-19264/com.example.clevo.rxjavademo D/MainActivity: four
    06-21 05:33:17.690 19242-19242/com.example.clevo.rxjavademo D/MainActivity: 2four
    06-21 05:33:26.691 19242-19264/com.example.clevo.rxjavademo D/MainActivity: 3
    06-21 05:33:26.691 19242-19264/com.example.clevo.rxjavademo D/MainActivity: four
    06-21 05:33:26.691 19242-19242/com.example.clevo.rxjavademo D/MainActivity: 3four
    

    与zip操作符功能差不多的还有zipWith操作符,他与zip操作符的区别在于zipWith操作符总是接受两个参数,第一个参数是一个Observable或者一个Iterable

    o4.zipWith(Observable.just("1", "2", "3", "4"), new Func2() {
        @Override
        public Object call(Object o, Object o2) {
            Log.d("MainActivity", o.toString());
            Log.d("MainActivity", o2.toString());
            return o.toString()+o2.toString();
        }}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
        @Override
        public void call(Object o) {
            Log.d("MainActivity", o.toString());
        }
    });
    

    结果与之前一致

    CombineLatest

    CombineLatest操作符一般可以用在如下场景

    1. 登录页面用户名密码需要同时校验,两个EditText的TextWatcher分别触发不同的Observal,以达到共同校验的效果
    CombineLatest

    看图说话,CombineLatest操作符与Zip操作符类似,但是区别很明显,ZIp操作符是每一个Observal都发射数据了,才会被结合成一个新的Observal,而CombineLatest是只要之前的Observal被发射过了,那么他会用这条Observal最后的那条数据,重新结合成一个新的Observal。这个从图中应该能够清晰的展现出来。

    我把之前的代码改动一下

    Observable.combineLatest(o1, Observable.just("1", "2", "3", "4"), new Func2() {
        @Override
        public Object call(Object o, Object o2) {
            Log.d("MainActivity", o.toString());
            Log.d("MainActivity", o2.toString());
            return o.toString()+o2.toString();
        }}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
        @Override
        public void call(Object o) {
            Log.d("MainActivity", o.toString());
        }
    });
    

    来看看结果

    06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: first
    06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: 1
    06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: first
    06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: 2
    06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: first
    06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: 3
    06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: first
    06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: 4
    06-21 20:44:32.084 2068-2068/com.example.clevo.rxjavademo D/MainActivity: first1
    06-21 20:44:32.084 2068-2068/com.example.clevo.rxjavademo D/MainActivity: first2
    06-21 20:44:32.084 2068-2068/com.example.clevo.rxjavademo D/MainActivity: first3
    06-21 20:44:32.084 2068-2068/com.example.clevo.rxjavademo D/MainActivity: first4
    

    虽然o1只发射了一条数据,但是另外一个Observal有4条数据出来,最终得到4条结果

    Join

    Join

    恕我个人愚笨,这张图我看了好久才知道是什么意思。这个跟之前所说的结合是一个很大的区别,它带上了有效期。你可以简单的理解是,我输入了手机号,然后等待验证码,验证码有效期是1分钟,如果在1分钟内你发射数据,咱俩就可以结合了,1分钟后,你照死发,我都不会理你。
    看图说话,源Observal就是第一行数据,目标Observal是第二行数据。源Observal的有效期是蓝色箭头跟黑色箭头之间部分,目标Observal有效期是黑色部分跟粉红色部分之间。所以我们看,第一个菱形过来之后,粉红色球还处于有效期内,他们结合了;土黄色球来了,菱形还在他有效期内,他们也结合了;但是青色球来了之后,菱形的有效期过了,就不能结合了。

    o1.join(o3, new Func1() {
        @Override
        public Object call(Object o) {
            return Observable.timer(2, TimeUnit.SECONDS);
        }}, new Func1() {
        @Override
        public Object call(Object o) {
            return Observable.timer(2, TimeUnit.SECONDS);
        }}, new Func2() {
        @Override
        public Object call(Object o, Object o2) {
            return o.toString()+o2.toString();
        }}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
        @Override
        public void onNext(Object o) {
            Log.d("MainActivity", o.toString());
        }
    });
    

    四个参数是什么意思,o3是目标Observal,第二个参数是源Observal的有效时间,第三个参数是目标Observal的有效时间,第四个参数是如果有合并,自行处理合并后的逻辑。这里,第二个参数的有效期只有2秒,那么算上之前的延时时间,源Observal过期2s之后目标Observal才发射数据,那么这里就不会有结合操作了

    跟他类似的还有GroupJoin

    GroupJoin

    大体上是一致的,区别在与他是等到有效期到了,才响应,而且第四个参数入参有区别,join是你对象Observal发射出来的那个值,而GroupJoin是对象Observal发射出的Observal

    o1.groupJoin(o3, new Func1() {
        @Override
        public Object call(Object o) {
            return Observable.timer(7, TimeUnit.SECONDS);
        }}, new Func1() {
        @Override
        public Object call(Object o) {
            return Observable.timer(2, TimeUnit.SECONDS);
        }}, new Func2() {
        @Override
        public Object call(final Object o, Object o2) {
            return ((Observable<String>) o2).flatMap(new Func1<String, Observable<?>>() {
                @Override
                public Observable<?> call(String s) {
                    return Observable.just(s+o.toString());
                }
            });
        }}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
        @Override
        public void onNext(Object o) {
            ((Observable) o).subscribe(new Subscriber() {
                @Override
                public void onCompleted() {
                }
                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }
                @Override
                public void onNext(Object o) {
                    Log.d("MainActivity", o.toString());
                }
            });
        }
    });
    

    算上之前的启动信息

    06-22 02:25:39.927 730-20553/system_process I/ActivityManager: START u0 {act=android.intent.action.MAIN cat=[android.intent.category.LAUNCHER] flg=0x10200000 cmp=com.example.clevo.rxjavademo/.MainActivity (has extras)} from uid 10039 on display 0
    06-22 02:25:39.927 730-20553/system_process V/WindowManager: addAppToken: AppWindowToken{1b4d4776 token=Token{ad42a11 ActivityRecord{3912da38 u0 com.example.clevo.rxjavademo/.MainActivity t236}}} to stack=1 task=236 at 0
    06-22 02:25:39.954 730-1035/system_process V/WindowManager: Adding window Window{cb3386f u0 com.example.clevo.rxjavademo/com.example.clevo.rxjavademo.MainActivity} at 3 of 9 (before Window{12c90a49 u0 Starting com.example.clevo.rxjavademo})
    06-22 02:25:40.117 730-756/system_process I/ActivityManager: Displayed com.example.clevo.rxjavademo/.MainActivity: +186ms
    06-22 02:25:47.938 29412-30378/com.example.clevo.rxjavademo D/MainActivity: thirdfirst
    

    可以看到到了有效期结束的时候,顺利打印

    Merge

    我们在请求列表数据的时候,一般有这个需求,将本地保存的数据与网络请求到的数据合并成一条数据显示出来,这个时候,我们就需要用到Merge

    Merge

    从图中可以看出来,Merge只是单纯的把多条Observal合并成1条

    Observable.merge(o4, o3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
        @Override
        public void call(Object o) {
            Log.d("MainActivity", o.toString());
        }
    });
    

    看看结果

    06-22 02:42:18.161 13441-13441/com.example.clevo.rxjavademo D/MainActivity: four
    06-22 02:42:21.161 13441-13441/com.example.clevo.rxjavademo D/MainActivity: third
    06-22 02:42:24.162 13441-13441/com.example.clevo.rxjavademo D/MainActivity: four
    06-22 02:42:33.162 13441-13441/com.example.clevo.rxjavademo D/MainActivity: four
    

    还有一个操作符与他类似,MergeDelayError,他与Merge的区别在于,Merge在数据发射过程中如果遇到错误,会立即终止,MergeDelayError则会继续发射,直到数据发射完,才将Error传递给观察者

    Observable o5=Observable.create(new Observable.OnSubscribe<Object>() {
        @Override
        public void call(Subscriber<? super Object> subscriber) {
            for (int i=0;i<3;i++) {
                try {
                    Thread.sleep((i+1)*3000);
                    if (i==1) {
                        subscriber.onError(new Exception("ERROR"));
                    }
                    subscriber.onNext("five");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }}).subscribeOn(Schedulers.io());
    
    Observable.mergeDelayError(o5, o3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
        @Override
        public void onNext(Object o) {
            Log.d("MainActivity", o.toString());
        }
    });
    

    对比下结果

    06-22 02:52:15.416 22830-22830/com.example.clevo.rxjavademo D/MainActivity: five
    06-22 02:52:18.416 22830-22830/com.example.clevo.rxjavademo D/MainActivity: third
    06-22 02:52:21.418 22830-22830/com.example.clevo.rxjavademo D/MainActivity: five
    06-22 02:52:30.418 22830-22830/com.example.clevo.rxjavademo D/MainActivity: five
    
    06-22 02:57:21.805 27254-27254/com.example.clevo.rxjavademo D/MainActivity: five
    06-22 02:57:22.295 27254-27254/com.example.clevo.rxjavademo D/MainActivity: third
    

    StartWith

    这个其实没啥好说的,就是往既定的Observal里面直接加入一个Observal的内容

    startWith.c.png
    Observable.just("1", "2", "3").startWith("5", "6", "7").subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<String>() {
        @Override
        public void call(String s) {
            Log.d("MainActivity", s);
        }
    });
    

    看看结果

    06-22 03:36:55.080 31825-31825/? D/MainActivity: 5
    06-22 03:36:55.080 31825-31825/? D/MainActivity: 6
    06-22 03:36:55.080 31825-31825/? D/MainActivity: 7
    06-22 03:36:55.080 31825-31825/? D/MainActivity: 1
    06-22 03:36:55.080 31825-31825/? D/MainActivity: 2
    06-22 03:36:55.080 31825-31825/? D/MainActivity: 3
    

    SwitchOnNext

    SwitchOnNext

    如图,有一个Observable,他可以自己狂发射不同的Observal,这个时候,如果在同一个时间内存在两个或多个Observable提交结果,那么只取最后一个Observable提交的结果给观察者,这里面第二个Observal发射数据了,那么第一个Observal从黄球开始就被关闭了

    private Observable<String> createObserver(final int index) {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 1; i < index; i++) {
                    subscriber.onNext(index + "-" + i);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).subscribeOn(Schedulers.newThread());
    }
    

    每隔1s发一条数据

    Observable.switchOnNext(Observable.create(
            new Observable.OnSubscribe<Observable<String>>() {
                @Override
                public void call(Subscriber<? super Observable<String>> subscriber) {
                    for (int i = 1; i < 3; i++) {
                        if (i==1) {
                            subscriber.onNext(createObserver(10));
                        }
                        else if (i==2) {
                            subscriber.onNext(createObserver(5));
                        }
                        try {
                            Thread.sleep(2100);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            })).subscribe(new Subscriber<String>() {
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable e) {
        }
        @Override
        public void onNext(String s) {
            Log.d("MainActivity", s);
        }
    });
    

    这样,分析中我们可知,在过了4200s之后,第一条Observal就会被关闭

    06-22 04:25:01.155 11678-11709/com.example.clevo.rxjavademo D/MainActivity: 10-1
    06-22 04:25:02.156 11678-11709/com.example.clevo.rxjavademo D/MainActivity: 10-2
    06-22 04:25:03.157 11678-11709/com.example.clevo.rxjavademo D/MainActivity: 10-3
    06-22 04:25:03.255 11678-11744/com.example.clevo.rxjavademo D/MainActivity: 5-1
    06-22 04:25:04.259 11678-11744/com.example.clevo.rxjavademo D/MainActivity: 5-2
    06-22 04:25:05.263 11678-11744/com.example.clevo.rxjavademo D/MainActivity: 5-3
    06-22 04:25:06.263 11678-11744/com.example.clevo.rxjavademo D/MainActivity: 5-4
    

    组合操作符我们就学习到这里了,下一篇我们学一下简单的错误处理

    主要参考文章

    相关文章

      网友评论

      本文标题:RXJava学习笔记(4)

      本文链接:https://www.haomeiwen.com/subject/daegdttx.html