给初学者的RxJava2.0教程(四)

作者: Season_zlc | 来源:发表于2016-12-09 16:35 被阅读33573次

Outline

[TOC]

前言

在上一节中, 我们提到了FlowableBackpressure背压, 本来这一节的确是想讲这两个东西的,可是写到一半感觉还是差点火候,感觉时机未到, 因此,这里先来做个准备工作, 先带大家学习zip这个操作符, 这个操作符也是比较牛逼的东西了, 涉及到的东西也比较多, 主要是一些细节上的东西太多, 通过学习这个操作符,可以为我们下一节的Backpressure 做个铺垫.

正题

照惯例我们还是先贴上一下比较正式的解释吧.

Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。

我们再用通俗易懂的图片来解释一下:

zip.png

从这个图中可以看见, 这次上游和以往不同的是, 我们有两根水管了.

其中一根水管负责发送圆形事件 , 另外一根水管负责发送三角形事件 , 通过Zip操作符, 使得圆形事件三角形事件 合并为了一个矩形事件 .

下面我们再来看看分解动作:

zip1.png

通过分解动作我们可以看出:

  • 组合的过程是分别从 两根水管里各取出一个事件 来进行组合, 并且一个事件只能被使用一次, 组合的顺序是严格按照事件发送的顺利 来进行的, 也就是说不会出现圆形1 事件和三角形B 事件进行合并, 也不可能出现圆形2三角形A 进行合并的情况.
  • 最终下游收到的事件数量 是和上游中发送事件最少的那一根水管的事件数量 相同. 这个也很好理解, 因为是从每一根水管 里取一个事件来进行合并, 最少的 那个肯定就最先取完 , 这个时候其他的水管尽管还有事件 , 但是已经没有足够的事件来组合了, 因此下游就不会收到剩余的事件了.

分析了大概的原理, 我们还是劳逸结合, 先来看看实际中的代码怎么写吧:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {            
    @Override                                                                                         
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {                      
        Log.d(TAG, "emit 1");                                                                         
        emitter.onNext(1);                                                                            
        Log.d(TAG, "emit 2");                                                                         
        emitter.onNext(2);                                                                            
        Log.d(TAG, "emit 3");                                                                         
        emitter.onNext(3);                                                                            
        Log.d(TAG, "emit 4");                                                                         
        emitter.onNext(4);                                                                            
        Log.d(TAG, "emit complete1");                                                                 
        emitter.onComplete();                                                                         
    }                                                                                                 
});                                                                   
                                                                                                      
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {              
    @Override                                                                                         
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {                       
        Log.d(TAG, "emit A");                                                                         
        emitter.onNext("A");                                                                          
        Log.d(TAG, "emit B");                                                                         
        emitter.onNext("B");                                                                          
        Log.d(TAG, "emit C");                                                                         
        emitter.onNext("C");                                                                          
        Log.d(TAG, "emit complete2");                                                                 
        emitter.onComplete();                                                                         
    }                                                                                                 
});                                                                     
                                                                                                      
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {                  
    @Override                                                                                         
    public String apply(Integer integer, String s) throws Exception {                                 
        return integer + s;                                                                           
    }                                                                                                 
}).subscribe(new Observer<String>() {                       
    @Override                                                                                         
    public void onSubscribe(Disposable d) {                                                           
        Log.d(TAG, "onSubscribe");                                                                    
    }                                                                                                 
                                                                                                      
    @Override                                                                                         
    public void onNext(String value) {                                                                
        Log.d(TAG, "onNext: " + value);                                                               
    }                                                                                                 
                                                                                                      
    @Override                                                                                         
    public void onError(Throwable e) {                                                                
        Log.d(TAG, "onError");                                                                        
    }                                                                                                 
                                                                                                      
    @Override                                                                                         
    public void onComplete() {                                                                        
        Log.d(TAG, "onComplete");                                                                     
    }                                                                                                 
});                                                                                                   

我们分别创建了两个上游水管, 一个发送1,2,3,4,Complete, 另一个发送A,B,C,Complete, 接着用Zip把发出的事件组合, 来看看运行结果吧:

D/TAG: onSubscribe     
D/TAG: emit 1          
D/TAG: emit 2          
D/TAG: emit 3          
D/TAG: emit 4          
D/TAG: emit complete1  
D/TAG: emit A          
D/TAG: onNext: 1A      
D/TAG: emit B          
D/TAG: onNext: 2B      
D/TAG: emit C          
D/TAG: onNext: 3C      
D/TAG: emit complete2  
D/TAG: onComplete      

结果似乎是对的... 但是总感觉什么地方不对劲...

哪儿不对劲呢, 为什么感觉是水管一发送完了之后, 水管二才开始发送啊? 到底是不是呢, 我们来验证一下:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {           
    @Override                                                                                        
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {                     
        Log.d(TAG, "emit 1");                                                                        
        emitter.onNext(1);                                                                           
        Thread.sleep(1000);                                                                          

        Log.d(TAG, "emit 2");                                                                        
        emitter.onNext(2);                                                                           
        Thread.sleep(1000);                                                                          

        Log.d(TAG, "emit 3");                                                                        
        emitter.onNext(3);                                                                           
        Thread.sleep(1000);                                                                          

        Log.d(TAG, "emit 4");                                                                        
        emitter.onNext(4);                                                                           
        Thread.sleep(1000);                                                                          

        Log.d(TAG, "emit complete1");                                                                
        emitter.onComplete();                                                                        
    }                                                                                                
});                                                                                                  

Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {             
    @Override                                                                                        
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {                      
        Log.d(TAG, "emit A");                                                                        
        emitter.onNext("A");                                                                         
        Thread.sleep(1000);                                                                          
                                                                                                     
        Log.d(TAG, "emit B");                                                                        
        emitter.onNext("B");                                                                         
        Thread.sleep(1000);                                                                          
                                                                                                     
        Log.d(TAG, "emit C");                                                                        
        emitter.onNext("C");                                                                         
        Thread.sleep(1000);                                                                          
                                                                                                     
        Log.d(TAG, "emit complete2");                                                                
        emitter.onComplete();                                                                        
    }                                                                                                
});                                                                                                  

Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {                 
    @Override                                                                                        
    public String apply(Integer integer, String s) throws Exception {                                
        return integer + s;                                                                          
    }                                                                                                
}).subscribe(new Observer<String>() {                                                                
    @Override                                                                                        
    public void onSubscribe(Disposable d) {                                                          
        Log.d(TAG, "onSubscribe");                                                                   
    }                                                                                                

    @Override                                                                                        
    public void onNext(String value) {                                                               
        Log.d(TAG, "onNext: " + value);                                                              
    }                                                                                                

    @Override                                                                                        
    public void onError(Throwable e) {                                                               
        Log.d(TAG, "onError");                                                                       
    }                                                                                                

    @Override                                                                                        
    public void onComplete() {                                                                       
        Log.d(TAG, "onComplete");                                                                    
    }                                                                                                
});                                                                                                  

这次我们在每发送一个事件之后加入了一秒钟的延时, 来看看运行结果吧, 注意这是个GIF图:

zip.gif

(贴心的我怕大家看不清楚, 特意调成了老年字体呢)

阿西吧, 好像真的是先发送的水管一再发送的水管二呢, 为什么会有这种情况呢? 因为我们两根水管都是运行在同一个线程里, 同一个线程里执行代码肯定有先后顺序呀.

因此我们来稍微改一下, 不让他们在同一个线程, 不知道怎么切换线程的, 请掉头看前面几节.

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {         
    @Override                                                                                      
    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {                   
        Log.d(TAG, "emit 1");                                                                      
        emitter.onNext(1);                                                                         
        Thread.sleep(1000);                                                                        
                                                                                                   
        Log.d(TAG, "emit 2");                                                                      
        emitter.onNext(2);                                                                         
        Thread.sleep(1000);                                                                        
                                                                                                   
        Log.d(TAG, "emit 3");                                                                      
        emitter.onNext(3);                                                                         
        Thread.sleep(1000);                                                                        
                                                                                                   
        Log.d(TAG, "emit 4");                                                                      
        emitter.onNext(4);                                                                         
        Thread.sleep(1000);                                                                        
                                                                                                   
        Log.d(TAG, "emit complete1");                                                              
        emitter.onComplete();                                                                      
    }                                                                                              
}).subscribeOn(Schedulers.io());                                                                   
                                                                                                   
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {           
    @Override                                                                                      
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {                    
        Log.d(TAG, "emit A");                                                                      
        emitter.onNext("A");                                                                       
        Thread.sleep(1000);                                                                        
                                                                                                   
        Log.d(TAG, "emit B");                                                                      
        emitter.onNext("B");                                                                       
        Thread.sleep(1000);                                                                        
                                                                                                   
        Log.d(TAG, "emit C");                                                                      
        emitter.onNext("C");                                                                       
        Thread.sleep(1000);                                                                        
                                                                                                   
        Log.d(TAG, "emit complete2");                                                              
        emitter.onComplete();                                                                      
    }                                                                                              
}).subscribeOn(Schedulers.io());                                                                   
                                                                                                   
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {               
    @Override                                                                                      
    public String apply(Integer integer, String s) throws Exception {                              
        return integer + s;                                                                        
    }                                                                                              
}).subscribe(new Observer<String>() {                    
    @Override                                                                                      
    public void onSubscribe(Disposable d) {                                                        
        Log.d(TAG, "onSubscribe");                                                                 
    }                                                                                              
                                                                                                   
    @Override                                                                                      
    public void onNext(String value) {                                                             
        Log.d(TAG, "onNext: " + value);                                                            
    }                                                                                              
                                                                                                   
    @Override                                                                                      
    public void onError(Throwable e) {                                                             
        Log.d(TAG, "onError");                                                                     
    }                                                                                              
                                                                                                   
    @Override                                                                                      
    public void onComplete() {                                                                     
        Log.d(TAG, "onComplete");                                                                  
    }                                                                                              
});                                                                                                

好了, 这次我们让水管都在IO线程里发送事件, 再来看看运行结果:

D/TAG: onSubscribe    
D/TAG: emit A         
D/TAG: emit 1         
D/TAG: onNext: 1A     
D/TAG: emit B         
D/TAG: emit 2         
D/TAG: onNext: 2B     
D/TAG: emit C         
D/TAG: emit 3         
D/TAG: onNext: 3C     
D/TAG: emit complete2 
D/TAG: onComplete     

GIF图:

zip_io.gif

诶! 这下就对了嘛, 两根水管同时开始发送, 每发送一个, Zip就组合一个, 再将组合结果发送给下游.

不对呀! 可能细心点的朋友又看出端倪了, 第一根水管明明发送了四个数据+一个Complete, 之前明明还有的, 为啥到这里没了呢?

这是因为我们之前说了, zip发送的事件数量跟上游中发送事件最少的那一根水管的事件数量是有关的, 在这个例子里我们第二根水管只发送了三个事件然后就发送了Complete, 这个时候尽管第一根水管还有事件4 和事件Complete 没有发送, 但是它们发不发送还有什么意义呢? 所以本着节约是美德的思想, 就干脆打断它的狗腿, 不让它发了.

至于前面的例子为什么会发送, 刚才不是已经说了是!在!同!一!个!线!程!里!吗!!!!再问老子打死你!

有好事的程序员可能又要问了, 那我不发送Complete呢? 答案是显然的, 上游会继续发送事件, 但是下游仍然收不到那些多余的事件. 不信你可以试试.

实践

学习了Zip的基本用法, 那么它在Android有什么用呢, 其实很多场景都可以用到Zip. 举个例子.

比如一个界面需要展示用户的一些信息, 而这些信息分别要从两个服务器接口中获取, 而只有当两个都获取到了之后才能进行展示, 这个时候就可以用Zip了:

首先分别定义这两个请求接口:

public interface Api {
    @GET
    Observable<UserBaseInfoResponse> getUserBaseInfo(@Body UserBaseInfoRequest request);

    @GET
    Observable<UserExtraInfoResponse> getUserExtraInfo(@Body UserExtraInfoRequest request);

}

接着用Zip来打包请求:

Observable<UserBaseInfoResponse> observable1 =                                            
        api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());      
                                                                                          
Observable<UserExtraInfoResponse> observable2 =                                           
        api.getUserExtraInfo(new UserExtraInfoRequest()).subscribeOn(Schedulers.io());    
                                                                                          
Observable.zip(observable1, observable2,                                                  
        new BiFunction<UserBaseInfoResponse, UserExtraInfoResponse, UserInfo>() {         
            @Override                                                                     
            public UserInfo apply(UserBaseInfoResponse baseInfo,                          
                                  UserExtraInfoResponse extraInfo) throws Exception {     
                return new UserInfo(baseInfo, extraInfo);                                 
            }                                                                             
        }).observeOn(AndroidSchedulers.mainThread())                                      
        .subscribe(new Consumer<UserInfo>() {                                             
            @Override                                                                     
            public void accept(UserInfo userInfo) throws Exception {                      
                //do something;                                                           
            }                                                                             
        });                                                                               

好了, 本次的教程就到这里吧. 又到周末鸟, 下周见.

相关文章

  • RxJava

    教程 给初学者的RxJava2.0教程(一) 给初学者的RxJava2.0教程(二) 给初学者的RxJava2.0...

  • RXjave总结

    文章 给初学者的RxJava2.0教程(一)给初学者的RxJava2.0教程(二)

  • RxJava2.0的使用

    这里的讲解比较简单,易懂 给初学者的RxJava2.0教程(一) :基本工作原理给初学者的RxJava2.0教程(...

  • Rxjava2

    Season_zl给初学者的RxJava2.0教程 ObservableEmitter emitter 1....

  • rx - 收藏集 - 掘金

    给初学者的 RxJava2.0 教程 (二) - Android - 掘金作者博客 http://www.jian...

  • RxJava整理

    给初学者的RxJava2.0教程 ObservableEmitter 上游可以发送无限个onNext, 下游也可以...

  • test RxJava

    参考自: 给初学者的RxJava2.0教程(一) http://www.jianshu.com/p/464fa02...

  • Rxjava介绍<1>

    Rxjava github地址给初学者的RxJava2.0教程------水管系列手把手教你使用 RxJava 2...

  • Android

    大佬们的传送门: Season_zlc RxJava2 : 1.给初学者的RxJava2.0教程(一)

  • RxJava2使用

    学习入门RxJava2,推荐一位简书作者Season_zlc,他写的教程很详细。 给初学者的RxJava2.0教程...

网友评论

  • 永远改不完的bug的Coder:图示为什么不是4com 而是绿色正方形com呢
    永远改不完的bug的Coder:@lixy_e2c9 谢谢大佬。
    4b3f75c38020:com 指 complete 事件
  • 4c91a85bb11b:有点没懂,因为在一个线程里所以会先后执行,然后说要在不同线程运行,但又都是在IO线程里,不仍然是一个线程吗。。。求解
    Gxinyu:晕,哈哈,真可爱
    4c91a85bb11b:@jesse920524 原来是这个意思,多谢
    jesse920524:两个observable分别在两个不同的io线程执行的.
  • 红颜疯子:我用的2.1.12版本, 发现zip 后面指定io线程, 第一个水管的第4个事件和complete1 是可以发出去的, 不过下游不会接收, 而如果指定不同的线程, 则会报错,UndeliverableException
  • ZzzBj:楼主很棒棒哦 如此深入浅出
  • 木溪bo:写的真的五体投地,通俗易懂
  • sakurekid:08-07 09:30:39.945 25656-25656/com.example.hasee.rxjavademo1 D/233: onSubscribe
    08-07 09:30:39.946 25656-25677/com.example.hasee.rxjavademo1 D/233: emit 1
    08-07 09:30:39.947 25656-25678/com.example.hasee.rxjavademo1 D/233: emit A
    onNext1A
    08-07 09:30:40.947 25656-25677/com.example.hasee.rxjavademo1 D/233: emit 2
    08-07 09:30:40.947 25656-25678/com.example.hasee.rxjavademo1 D/233: emit B
    08-07 09:30:40.947 25656-25677/com.example.hasee.rxjavademo1 D/233: onNext2B
    08-07 09:30:41.948 25656-25678/com.example.hasee.rxjavademo1 D/233: emit C
    08-07 09:30:41.948 25656-25677/com.example.hasee.rxjavademo1 D/233: emit 3
    08-07 09:30:41.949 25656-25678/com.example.hasee.rxjavademo1 D/233: onNext3C
    08-07 09:30:42.949 25656-25677/com.example.hasee.rxjavademo1 D/233: emit 4
    08-07 09:30:42.949 25656-25678/com.example.hasee.rxjavademo1 D/233: emit complete2
    08-07 09:30:42.950 25656-25678/com.example.hasee.rxjavademo1 D/233: oncomplete
    //为啥和上面讲的不一样啊,,不是说长水管就不发了啊
    街道shu记:是不同线程吗?
    长安小狼:我的调试结果跟你的一样,感觉zip接收的事件数量跟上游中发送事件最少水管是有关的,且运行线程也与此有关。
    木溪bo:同问
  • tanzhihao1qaz:这就是我的结果,我想zip应该是更新了什么,大家可以看到当短的水管调用了onComplete之后,观察者的onComplete也收到了,之后观察者的onNext再没出现过,不过长水管还是照样发射,所以我觉得最新的zip,是根据哪根水管先发射onComplete,就到此为止,但不影响长水管继续发射数据,只不过观察者不会再收到罢了
    08-05 14:56:35.901 4455-4475/com.cat.rxjavademo E/copycat: 长水管 - 1 RxCachedThreadScheduler-1
    08-05 14:56:35.903 4455-4476/com.cat.rxjavademo E/copycat: 短水管 - A RxCachedThreadScheduler-2
    onNext - 结合:1A
    08-05 14:56:36.903 4455-4475/com.cat.rxjavademo E/copycat: 长水管 - 2 RxCachedThreadScheduler-1
    08-05 14:56:36.903 4455-4476/com.cat.rxjavademo E/copycat: 短水管 - B RxCachedThreadScheduler-2
    onNext - 结合:2B
    08-05 14:56:37.904 4455-4475/com.cat.rxjavademo E/copycat: 长水管 - 3 RxCachedThreadScheduler-1
    08-05 14:56:37.904 4455-4476/com.cat.rxjavademo E/copycat: 短水管 - onComplete RxCachedThreadScheduler-2
    08-05 14:56:37.905 4455-4476/com.cat.rxjavademo E/copycat: onComplete
    08-05 14:56:38.905 4455-4475/com.cat.rxjavademo E/copycat: 长水管 - 4 RxCachedThreadScheduler-1
    08-05 14:56:39.906 4455-4475/com.cat.rxjavademo E/copycat: 长水管 - onComplete RxCachedThreadScheduler-1
    木溪bo:同层主,测试和楼主不同,zip使用后上游继续发射,而下游没有接收多余的
  • 2d82fc631ed5:请教一下,这个如果失败点击重试怎么让它只调用失败的一个而成功的接口就不再调用呢
    骑小猪看流星:我的理解是 这个是合并的操作 ,理论上后台返回的数据判断长度是否大于0如果不是 可以将这个订阅取消 但是合并会失败 貌似有一个操作符 是可以重试的
  • 82b2842d65f9:我照着作者的操作了一遍,结果跟作者的不一样啊:1.长水管多余部分仍然发送了2.使用sleep在运行到长水管onComplete()报错了 java.lang.InterruptedException:joy:
    昵称的简书:一样,也是这种情况。sleep报错可以尝试用 SystemClock.sleep(1000);代替原来的Thread.sleep(1000);应该就不会报错了,反正我的没有报错了
  • b294308ef55d:“在这个例子里我们第二根水管只发送了三个事件然后就发送了Complete, 这个时候尽管第一根水管还有事件4 和事件Complete 没有发送, 但是它们发不发送还有什么意义呢? 所以本着节约是美德的思想, 就干脆打断它的狗腿, 不让它发了.”
    请问这里的打断狗腿的机制是什么?第二个水管发了Complete后怎么会影响到第一根水管事件的发送呢?楼主之前的文章里曾经提到过即使上游发送了Complete事件后,上游应该还是可以继续发送事件的,只是下游不再接受事件而已。与此处的说法有点矛盾,还请楼主进一步解释一下。
    70574761680c:我测试结果是这样的:1、两个水管都不调onComplete,那么,这两个水管都会把事件发送完 ;2、假如有任何一方调用了onComplete,如A水管调了OnComplete,那么B水管在A调OnComplete之前会一直发送事件。很好理解,就是一方完成了事件发送,那么另一方就没必要再发送事件了
    tanzhihao1qaz:@吕氏春秋i 你那这种例子虽然好理解,但是不适合,层主问的是:为什么上游不会继续发送,和up之前说的有矛盾。另外我也敲过代码,发现长水管剩下的全发送了,和up的情况不一样,如果套用之前up说的“上游会继续发送,但是下游不会收到”,这样的话就好解释为什么长的水管还是会把剩下的发了。
    吕氏春秋i:这还解释啥啊... 打个比方: A媒婆有4个靓仔 B媒婆有3个靓女。假如说男1,2,3号看中了女1,2,3号。这门亲事就这么定了,你觉得B媒婆还有必要领着男4号来想亲么??:cold_sweat:
  • Android苏大强:感谢楼主分享 真的是很详细很详细 目前看到了4章 加油:+1:
  • 奥利奥龙卷风:楼主,这里有点问题哦。这里的第一个水管发送4的后面如果有延时是会报错的。去掉延时之后,上游的4还是会发送的,然后走complete1,再走complete2,再就是onComplete了。这是我运行的情况。
    长安小狼:确实是的,水管1中的事件4还是会发送的,这里不能加sleep
  • 58250567b6d4:我看一章就赞一次,博主好厉害
  • CaoMeng:楼主,讲的真好,希望后期把MVP加上。。。
  • 杨晓是大V:楼主,这篇文章的代码运行多次得到的运行结果都是不一样的,很少会出现像上面git图那么规律交接的
    c5e780c6d6ed:异步线程,执行顺序先后,没有办法控制吧
  • 361579c6b5ba:Q:那么问题来了,现已知支管1直径10cm,支管2直径8cm,总管直径10cm。当支管1流速10L/s,支管2流速8L/s的情况下,求总管的流速是多少?
  • 361579c6b5ba:在ZIP加工过程中,圆形的com丢了。。。
  • Mike张小多:我用Java代码写测试,在同步执行zip的时候和你的结果一样,异步执行的时候只有第一件事有合并剩下的就没有执行了 结果如下, 请问大神这是为什么呢?
    RxJavaText onSubscribe
    RxJavaText emit 1
    RxJavaText emit A
    RxJavaText onNext :1A
    Mike张小多:@Season_zlc 以上描述的情况是不是和Java虚拟机有关系?默认只有一个线程吗?不能调用Thread.sleep(1000);如果调用就会导致无法收到事件?
    Mike张小多:补充描述一下:将代码中Thread.sleep(1000);这一句都删除,在Java测试代码中异步的也正常运行了,但是在每发送一件事后加上Thread.sleep(1000);就只能接收第一个事件,这是为啥啊?
  • 杨晓是大V:最屌的RxJava教程,没有之一
  • 4321177a4375:zip操作符好强大~
  • leilifengxingmw:虽然看到这么好的文章,我还是坚持住了,手没抖,哈哈。
  • 8a0d666f3901:一根水管 Observab1 发送 1,2,3,4,complete,另外一根水管 Observab2 发送 "a","b","c","complete" ,如果都延迟 1s 执行的话,那么在 Observab2 发送完 complete 的时候,下游就会回调 onComplete 方法了,那么这时 Observab1 再去发送 4 ,会抛异常的,所以大佬,那个代码是不是应该把 延迟代码个去掉???
  • 7858f2a4c8f9:我按照你写的这种方法。为什么打印的结果数据不对!!!
    12-22 16:08:06.836 8384-8384/com.example.my_rxjava E/LogUtilsTAG: 接收源发送事件:_____onSubscribe
    12-22 16:08:06.846 8384-8406/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____1
    12-22 16:08:06.846 8384-8407/com.example.my_rxjava E/LogUtilsTAG: 接收源发送事件:_____onNext_1A
    12-22 16:08:06.846 8384-8407/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____A
    12-22 16:08:07.846 8384-8406/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____2
    12-22 16:08:07.846 8384-8407/com.example.my_rxjava E/LogUtilsTAG: 接收源发送事件:_____onNext_2B
    12-22 16:08:07.846 8384-8407/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____B
    12-22 16:08:08.846 8384-8406/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____3
    12-22 16:08:08.846 8384-8407/com.example.my_rxjava E/LogUtilsTAG: 接收源发送事件:_____onNext_3C
    12-22 16:08:08.846 8384-8407/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____C
    12-22 16:08:09.846 8384-8406/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____4
    12-22 16:08:09.846 8384-8407/com.example.my_rxjava E/LogUtilsTAG: 接收源发送事件:_____onComplete
    12-22 16:08:09.846 8384-8407/com.example.my_rxjava D/LogUtilsTAG: 发射源发送事件:_____onComplete_2

    发送源发送事件4 还是打印了,
  • 8bc7aeec5ec8:用zip 为什么Observable1 和 Observable2 发送事件数量不一样的情况下 会报 java.lang.InterruptedException???
  • 74e077cb030e:发现了一个大问题。第二个水管还没有发射数据,下游的水管就已经得到了组合就的数据‘
    a发射源发射了A
    a发射源发射了B
    a发射源发射了C
    11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: a发射源发射了onComplete
    第1次组合数据
    第1次接收数据A1
    b发射源发射了1
    第2次组合数据
    第2次接收数据B2
    b发射源发射了2
    b发射源发射了onComplete
  • 74e077cb030e:发现了一个大问题。第二个水管还没有发射数据,下游的水管就已经得到了组合就的数据‘

    ’11-30 18:05:59.953 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: a发射源发射了A
    11-30 18:05:59.953 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: a发射源发射了B
    11-30 18:05:59.953 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: a发射源发射了C
    11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: a发射源发射了onComplete
    11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: 第1次组合数据
    11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: 第1次接收数据A1
    11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: b发射源发射了1
    11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: 第2次组合数据
    11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: 第2次接收数据B2
    11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: b发射源发射了2
    11-30 18:05:59.954 11158-11158/map.zym.com.rxandroiddemo I/ZipActivity: b发射源发射了onComplete
  • 82fb67508fa0:3个或四个请求怎么用zip 还是用别的?
  • fa16e00eeb9c:哇擦, 这个有点叼啊,有些接口做不到同时返回,需要分多个接口请求数据,这个zip功能可以合并,6666
    fa16e00eeb9c:RX果然是处理线程牛逼的好东西啊
  • GitHub_itgoyo:博主,Rxjava系列的文章分析的很好,想问问可以转载么,会保留原出处,毕竟这么好的教材,这么好的配图,也就只有你写的出来。
    Season_zlc:@itgoyo 可以转载
  • GitHub_itgoyo:再问老子打死你!!有脾气的博主,关注了!
  • x耶律:请问我按照你的代码(加sleep1000的写法),打印结果是:
    rxjava2demo I/MainActivity: emit 1
    rxjava2demo D/MainActivity: emit A
    rxjava2demo D/MainActivity: onNext:----1A
    rxjava2demo I/MainActivity: emit 2
    rxjava2demo D/MainActivity: emit B
    rxjava2demo D/MainActivity: onNext:----2B
    rxjava2demo I/MainActivity: emit 3
    rxjava2demo D/MainActivity: emit C
    rxjava2demo D/MainActivity: onNext:----3C
    rxjava2demo I/MainActivity: emit 4
    rxjava2demo D/MainActivity: emit complete2
    rxjava2demo D/MainActivity: onComplete
    这里是会打印emit 4的,跟文中:
    zip发送的事件数量跟上游中发送事件最少的那一根水管的事件数量是有关的, 在这个例子里我们第二根水管只发送了三个事件然后就发送了Complete, 这个时候尽管第一根水管还有事件4 和事件Complete 没有发送, 但是它们发不发送还有什么意义呢? -------感觉有点不符,请问可以解答一下吗
  • ayvytr:这个zip应该变了,最新的rxJava,用zip,不加线程切换,两个会一起执行
    越努力越幸运阳:试了一下2.1.8的版本还是需要添加.subscribeOn(Schedulers.io())才可以,不过不需要Thread.sleep(1000); 了
  • bc1ff59c5b30:为什么会发送, 刚才不是已经说了是!在!同!一!个!线!程!里!吗!!!!再问老子打死你!
  • 叨码:其实 很多操作符 结合应用场景 才好理解。赞一个👍
  • liangqibin:有个问题,如果第一个发射源第四位发射onerror,第二个发射源的第四位发射onComplete,最终会执行那个哪个方法?
    liangqibin:首先onComplete和onError是唯一且互斥的,它们在发射源1和2的位置都是第四位,只能执行一个
    1)相同线程的情况下:按发射源在代码中的顺序执行
    下游收到发射源1发送onComplete的话,会接收发射源2的前3个事件,第4个onError会忽略,最后会触发onComplete
    下游收到发射源1发送onError的话,会立即触发下游的onError事件,同时会忽略发射源2发送的所有事件
    2)不同线程的话其实和相同线程的现象基本一致
    上游先发送onComplete或者onError,下游就触发哪个,不同的是下游触发了onComplete或者onError之后,没有发送的上游事件都不会再发送了
  • 汐_Ushio_64ff:如果两个请求一个是同步请求,另一个是异步请求(返回结果再回调里处理),请问能用zip操作符解决吗?目前没有好的解决方案。
    liangqibin:你这种情况和作者文章中最后一个例子是一样的
    作者他是把两个发射源都放在io线程里面(.subscribeOn(Schedulers.io())),你这种情况是一个在主线程,一个在子线程
    你可以把你同步请求的observable的.subscribeOn(Schedulers.io())直接去掉或者改为.subscribeOn(AndroidSchedulers.mainThread())就行了
  • ImTudou:这是我看过最简单易懂的Rxjava2 教程了
  • zhang_pan:楼主你好,为什么我的成功后并不会调用observe的onComplete方法,所以还会调用emitter的第四条
  • e64f2bee9601:您好,事件流转换的过程中, 如果有onComplete或onError的情况下,应该是4正方形吧,而不是com正方形。求解。
  • developerYk:楼主,我按照你的方式加上线程切换试了下 ZIP 操作符 程序崩溃了 ; FATAL EXCEPTION: RxCachedThreadScheduler-1
    Process: com.beijing.RxJavaTest, PID: 13487
    java.lang.InterruptedException
    at java.lang.Thread.sleep(Native Method)
    at java.lang.Thread.sleep(Thread.java:379)
    at java.lang.Thread.sleep(Thread.java:321)
    at com.beijing.RxJavaTest.rxjava_test.Rxjava_1$7.subscribe(Rxjava_1.java:157)
    PeanutZYH:我的也出现这个报错了,请问是什么原因呢 ?
  • 圈圈猫:最后一个问题怎么感觉有点不对啊,同步是没错,但是如果两个数据的数量不一致咋整啊???
  • Android小人:拷贝你的程序运行了下 emitter.onNext(4);后面的这行Thread.sleep(1000);会报错,这是为什么呢?为啥你那没有报错呢?
    李小神不偷懒:@Android小人 我今天把延迟一去掉,跟在同一个线程中的状况没有什么不同。
    Android小人:@Android小人 不对呀! 可能细心点的朋友又看出端倪了, 第一根水管明明发送了四个数据+一个Complete, 之前明明还有的, 为啥到这里没了呢?这段话有问题吧,你的那段程序和之前的程序并没有什么变化也并没有把请求一的com去掉啊?
    Android小人:而且如果不加sleep的话即使加上io线程他也好像是1先完成的,我感觉这个不管谁先完成到无所谓,只要最后结合不出问题就好
  • 无言_b573:你好 我做了个测试 就像你文章里面说的 我写的狗腿并没有被打断。。 调用了onComplete之后还是在继续发送事件 难道是版本不一致源码改了??
  • f13ab180e7fd:为什么我把两个都指定在io线程的话,执行到 Log.d(TAG, "emit 4"); emitter.onNext(4);Thread.sleep(1000);睡眠的时候会挂了????
  • CCY0122:妈的 越看越爽
  • smart蒋蒋:最强RXJAVA解析。
  • bd180dab6513:很少有人的像你这样能写高质量的博客
  • 安卓猿:楼主写的太好了,循序渐进
  • 我一定会学会:写的真好,结尾还说在Android里面有什么用,有时候知道那个方法。但是不知道那个方法有什么用。
  • doc__wei:其实每次看到逐个发射的两个Observable去做zip,我都害怕,因为有一个完成了,之后后面就接收不到数据了,但是,我后来想想了,在实际的场景中,正如你最后的例子里面用zip,其实没问题,observable1和observable2他们都只是做一次性发射所有的数据,并且没有手动调用完成方法,所以合并不会存在错误,也不会出现漏掉数据的情况
  • bingo_ff85:楼主,那个高清gif图是怎么录得?
    bingo_ff85:@Season_zlc 感谢~
    顶级工程师闯天涯:看了文章我就知道有人会问这个问题...
    Season_zlc:@bingo_ff85 GifCam
  • 南山下北海北:想和楼主做男男朋友:smirk:
    c84a6998a6c6:你们真是够了,就服你们,我们早就领证了
    205e883a5fab:@JerryIreya 别抢 楼主是我的
    瓶子里的王国:三个人一起可以不:flushed:
  • _Alan_:写的很容易懂,谢谢作者
  • Chen_ba7f:"本着节约是美德的思想, 就干脆打断它的狗腿, 不让它发了". 单身狗看到这,不自觉的缩了下腿
  • bingoCode:即使添加 了io线程,不添加延时1s 还是先发送完,然后才zip的
  • ruidge:我用两个线程不sleep基本没发现规律啊.感觉还是乱续发的.
    Observable.zip(observable2, observable1, new BiFunction<String, Integer, String>()
    结果:
    D/rxjava: onSubscribe
    D/rxjava: emit A RxCachedThreadScheduler-1
    D/rxjava: emit 1 RxCachedThreadScheduler-2
    D/rxjava: emit B RxCachedThreadScheduler-1
    D/rxjava: onNext: A+1
    D/rxjava: emit C RxCachedThreadScheduler-1
    D/rxjava: emit 2 RxCachedThreadScheduler-2
    D/rxjava: emit complete2
    D/rxjava: onNext: B+2
    D/rxjava: emit 3 RxCachedThreadScheduler-2
    D/rxjava: onNext: C+3
    D/rxjava: onComplete
    D/rxjava: emit 4 RxCachedThreadScheduler-2
    D/rxjava: emit complete1
    neo1949:我的不加sleep也是这种情况:
    08-09 16:26:19.430 26773-26854/ivan.rich.guardian D/TestActivity: [subscribe] observable1 emit 0
    08-09 16:26:19.430 26773-26855/ivan.rich.guardian D/TestActivity: [subscribe] observable2 emit a
    08-09 16:26:19.430 26773-26854/ivan.rich.guardian D/TestActivity: [subscribe] observable1 emit 1
    08-09 16:26:19.430 26773-26854/ivan.rich.guardian D/TestActivity: [subscribe] observable1 emit 2
    08-09 16:26:19.430 26773-26854/ivan.rich.guardian D/TestActivity: [subscribe] observable1 emit 3
    08-09 16:26:19.430 26773-26855/ivan.rich.guardian D/TestActivity: [accept] 0----a
    08-09 16:26:19.430 26773-26854/ivan.rich.guardian D/TestActivity: [subscribe] observable1 onComplete
    08-09 16:26:19.430 26773-26855/ivan.rich.guardian D/TestActivity: [subscribe] observable2 emit b
    08-09 16:26:19.430 26773-26855/ivan.rich.guardian D/TestActivity: [accept] 1----b
    08-09 16:26:19.430 26773-26855/ivan.rich.guardian D/TestActivity: [subscribe] observable2 emit c
    08-09 16:26:19.430 26773-26855/ivan.rich.guardian D/TestActivity: [accept] 2----c
    08-09 16:26:19.430 26773-26855/ivan.rich.guardian D/TestActivity: [subscribe] observable2 onComplete
  • 5a0c727074e1:很6的分析。
  • 0445981d6022:可以的,6666老司机。
  • egan_ysk:很赞,帮助很大,一直没有很好的系列来学习RxJava;非常感谢;
  • 1cf651476639:写的很是清楚啊,辛苦楼主了
  • 橙一升:简单通俗易懂~看过这么多关于 Rxjava 的文章,就您的最容易理解了:heart:
  • 危大强:666666
  • 说码解字:Observable<UserBaseInfoResponse> observable1 =
    api.getUserBaseInfo(new UserBaseInfoRequest()).subscribeOn(Schedulers.io());
    这里在使用Retrofit发起网络请求的时候规定被观察者发送数据流所在的线程是不是多余的?Retrofit已经定义了,而且这里使用subscribeOn(Schedulers.io())不会生效。
  • 流星留步:真是一个吊炸天的功能呀,以前遇到这种情况,都是加一个全局变量的flag,回调的时候改一下值,另一个回调的时候,再来判断一下,想想自己真是个愚民,全票出我好了:joy: :joy:
  • DreamArea:楼主,我尝试着把上面例子中的observable1 和observable2 调换个位置(不在同一个线程),会报错的InterruptedException,zip这个操作符使用要求蛮高啊。。
    xwp:看了zip,想到以前不同请求需要同步完成才展示数据时候,用的是cuntdown.那种可把握性高,这种也有他的优势,就是发送器发送数据流,响应速度会快,但网络请求实际情况不会这样。
  • andpy:本来很懒的,不过看你逗逼的感觉了,来评论了.:smile:
  • double_so3:写的真好,支持
  • e93efda72e89:为楼主注册的简书,一个月前就想学rxjava了,就是看了几篇入门都看不下去。楼主这篇入门写的比较通熟易懂,感谢分享。
  • forward_99:楼主,你好,有些地方不是很明白,请教一下。我在zip中的第三个例子,第一种情况:如果我不切换observable1线程,直接像zip中的第一、第二个例子那样,而且observable2还是保留切换到Schedulers.io()线程执行,我这边多次打印,结果都是observable1发送完事件后,observable2才发送事件;第二种情况:我切换observable1在线程AndroidSchedulers.mainThread()中执行,observable2仍然在线程Schedulers.io()执行,打印出来的结果跟observable1和observable2均在线程Schedulers.io()执行情况一致;第三种情况:我切换observable1在线程AndroidSchedulers.mainThread()中执行,observable2不切换线程,直接执行,那么打印出来的结果却是observable2发送完事件后observable1才会发送事件;第四种情况:我切换observable1在线程Schedulers.io()中执行,observable2不切换线程直接执行,那么打印出来的结果跟observable1和observable2均在线程Schedulers.io()执行情况一致。上面四种情况不是很了解,麻烦有时间可以抽空讲解下,谢谢!
    xwp:两次Schedulers.io()分别是2个线程,打印线程名字看看。
  • 薰衣草的国度:写的很不错!:+1:
  • 39e72fb518b9:Retrofit @GET请求 怎么能在方法体内用@body呢?每次都是崩溃得。@body是用在Post请求吧?
    catface:你换成POST就行了
  • 李是猴子搬来的救兵:如果不加上Sleep的话,结果还是一根水管先emit完
    zhang_pan:加sleep是为了模拟耗时操作,你可以将时间调很小,如果不加耗时,按照代码的顺序,1执行完了,2才执行
    31624f75fba8:并且另一个水管剩余的一条和complete也会发送
  • 苏村的南哥:我也出现了 Thread.sleep(1000); 会报错。。。。 纠结中〜〜:sob:
    张金富呵呵:用SystemClock.sleep(1000)
  • aee30df11e01:这个zip很有用
  • 0丸子0:"在这个例子里我们第二根水管只发送了三个事件然后就发送了Complete, 这个时候尽管第一根水管还有事件4 和事件Complete 没有发送, 但是它们发不发送还有什么意义呢? 所以本着节约是美德的思想, 就干脆打断它的狗腿, 不让它发了."
    =》针对这句不太理解,是你主动打断了?为什么我的demo还是会发送。按我的理解,complete之后只是接收方不会去接收,发送方继续发送才对吧。请楼主指导。:disappointed_relieved:
  • a33855e6741f:我运行的结果是这样的, 并且抛出了异常.
    /? D/DemoRx: onSubscribe
    /? D/DemoRx: emit 1
    /? D/DemoRx: emit A
    /? D/DemoRx: onNext: 1A
    /com.example.sphinx.rxjavate D/DemoRx: emit 2
    /com.example.sphinx.rxjavate D/DemoRx: emit B
    /com.example.sphinx.rxjavate D/DemoRx: onNext: 2B
    /com.example.sphinx.rxjavate D/DemoRx: emit 3
    /com.example.sphinx.rxjavate D/DemoRx: emit C
    /com.example.sphinx.rxjavate D/DemoRx: onNext: 3C
    /com.example.sphinx.rxjavate D/DemoRx: emit 4
    /com.example.sphinx.rxjavate D/DemoRx: emit complete2
    /com.example.sphinx.rxjavate D/DemoRx: onComplete


    异常出现在observable1中第四个Thread.sleep(1000);InterruptedException
    慕名66:@JesseOne 使用SystemClock.sleep()就可以啊
    23a4ae7b3afd:表示遇到了同样的疑问,希望楼主出手解决一下哇:smile:
  • 贾亦真亦贾:这个作者不出书真是可惜了 受教了 你是我见过的最好的Rxjava2方面的文章
    Season_zlc: @贾亦真亦贾 过奖了😄
  • 浮華杳然成歌:至于前面的例子为什么会发送, 刚才不是已经说了是!在!同!一!个!线!程!里!吗!!!!再问老子打死你! 被这句笑死了。。。。哈哈啊哈
  • wuhtt:写的6
  • eoeoops:看得很舒服:+1:
  • adonis_lsh:还有一个疑问就是最后例子中写的new UserInfo(baseInfo, extraInfo);这个信息合并不是还是靠的UserInfo的构造方法,用不用zip又有什么关系呢?
    3dea4b14f5d2:这个意思就是,同时请求两个接口,然后都请求成功后获取到数据后直接返回一个对象,就可以操作这个对象了,如果平时使用的话,肯定是有先后的,先获取一个接口的数据,再请求另一个,成功后再设置,相当于分了两步。
    swallowsonny:baseInfo与extraInfo同时成功了才合并吧
  • adonis_lsh:图形中按照逻辑应该是4Com为什么单有一个Com?郁闷中
    23a4ae7b3afd:两个上游运行在不同的线程中,哪个先执行是不确定的。至于你说的问题这个应该是看先有的com还是先有的4,应该是遇到com接下里的就不再发了。
    ZYRzyr:com的意思应该是调了onComplete方法,所以只是一个com
  • 西园无忌:看了4期,作为一只铁公鸡得我基本每期都小小打赏。文章看的很舒服,谢谢作者Season_zlc热心分享!
    Season_zlc:哈哈,谢谢,我都能吃好几个茶叶蛋了:smile:
  • d8184ca3c970:多谢楼主的文章,求个建个群以后多多指教.
    南山下北海北:后宫群吗?:flushed:
  • 77356c477023:发现一个问题,当短的数据流结束后,长的数据流中未发送的数据后的Thread.sleep(1000);有概率报错(我也不知都怎么回事,有时候报错,有时候不报,我就直接删了) :scream:
    Season_zlc:@节操打狗有去无回 对,这是rxjava2一个坑,rxjava1就没有这个问题
    77356c477023:@Season_zlc 我天,我找了好久的抓异常的方法,出现这个报错是不是长数据流所在的线程正在结束但还没有结束,而在sleep的过程中线程被结束了所以报错?
    Season_zlc:@节操打狗有去无回 加上 static {RxJavaPlugins.setErrorHandler(new Consumer() {
    @Override
    public void accept(Throwable throwable) throws Exception {
    if (throwable instanceof InterruptedIOException) {
    Log.d(TAG, "Io interrupted");
    }
    }
    });
    }
  • f1fd3abad113:写的很好,手抖打赏了,嘻嘻。希望能推出更多的教程。
    Season_zlc: @LxChange 好的,谢谢😄
    Season_zlc:@LxChange 哈哈,谢谢
  • 你值得拥有更好的12138:感谢楼主,感觉和1.0差不多
    Season_zlc:@10000程序猿 整体差别不大
  • 6ee72519b51d:请求一 请求二 分别在两个线程
    如果请求一很快就请求完了 请求二隔了一段时间才请求好 这时是不是最终zip的时机是取决于 请求二完成的时机?
    假如请求二 请求失败了是不是也算整个zip操作失败?
    如果我不想整个操作失败 我就去用flatmap来替代zip 来实现 两个请求揉合成一个请求的 这种需求?
    6ee72519b51d:@Season_zlc zip 如果第一个失败了 那最后的结果只有第一个请求的数据 ? 还是整体 zip操作都算失败? 我觉得会是第一个
    Season_zlc: @_Vv 第三个问题,用flatmap,两个请求是有顺序的,第一个失败了,整个就失败了,第一个成功,第二个失败是可以处理的。
    Season_zlc: @_Vv 前面两个问题的答案是Yes.
  • 5ee7a756a4d7:下周了
    Season_zlc: @专宠玲 哈哈,哪有那么快写完!

本文标题:给初学者的RxJava2.0教程(四)

本文链接:https://www.haomeiwen.com/subject/srzemttx.html