美文网首页
RxJava2.0----事件流操作符Observable Ut

RxJava2.0----事件流操作符Observable Ut

作者: Calllanna | 来源:发表于2017-11-19 16:21 被阅读300次

    6.事件流操作符Observable Utility Operators

    A toolbox of useful Operators for working with Observables
    ● Delay
    ● Do
    ● Materialize/Dematerialize
    ● Serialize
    ● TimeInterval
    ● Timeout
    ● Timestamp
    ● Using
    ● To
    ● Retry
    ● cache
    ● cast
    ● compese

    ● Delay
    将一个事件流里的数据源全部都延时发送。

    ● Do
    在观察者订阅前,接收数据前后,完成接收前后,事件流过程中发生错误后,事件流结束前后等回调被观察者通知
    doAfterTerminate doOnComplete doOnDispose doOnEach doOnError doOnLifecycle doOnNext doOnSubscribe doOnTerminate onTerminateDetach

      private Observer observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                print("onSubscribe ");
            }
    
            @Override
            public void onNext(Integer integer) {
                print("onNext "+integer);
            }
    
            @Override
            public void onError(Throwable e) {
                print("onError "+e.getMessage());
            }
    
            @Override
            public void onComplete() {
                print("onComplete " );
            }
        };
        private Observable getObservable(final boolean isError){
            return  Observable.just(1,2,3,4,5)
                    .doOnSubscribe(new Consumer<Disposable>() {
                        @Override
                        public void accept(Disposable disposable) throws Exception {
                            print("doOnSubscribe ");
                        }
                    })
                    .doOnEach(new Consumer<Notification<Integer>>() {
                        @Override
                        public void accept(Notification<Integer> integerNotification) throws Exception {
                            print("doOnEach :"+integerNotification);
                        }
                    }).doAfterNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            print("doAfterNext : "+integer  );
                            if(isError && integer == 3){
                                throw new Exception("There is a Error!!");
                            }
                        }
                    }).doAfterTerminate(new Action() {
                        @Override
                        public void run() throws Exception {
                            print("doAfterTerminate : "  );
                        }
                    }).doOnComplete(new Action() {
                        @Override
                        public void run() throws Exception {
                            print("doOnComplete : "  );
                        }
                    }).doFinally(new Action() {
                        @Override
                        public void run() throws Exception {
                            print("doFinally : "  );
                        }
                    }).doOnDispose(new Action() {
                        @Override
                        public void run() throws Exception {
                            print("doOnDispose : "  );
                        }
                    }).doOnTerminate(new Action() {
                        @Override
                        public void run() throws Exception {
                            print("doOnTerminate : "  );
                        }
                    }).onTerminateDetach()
                    .doOnError(new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            print("doOnError : "  );
                        }
                    }).doOnLifecycle(new Consumer<Disposable>() {
                        @Override
                        public void accept(Disposable disposable) throws Exception {
                            print("doOnLifecycle : accept"  );
                        }
                    }, new Action() {
                        @Override
                        public void run() throws Exception {
                            print("doOnLifecycle Action : "  );
                        }
                    });
        }
        private void doAct1() {
            //需要引入RxJava 1.0
            //-------------buffer operator------
            tx_console.setText("Do");
            getObservable(false).subscribe(observer);
    Log.d(" ", "  =================");
            getObservable(true).subscribe(observer);
    }
    输出结果:
      doOnSubscribe
              doOnLifecycle : accept
              onSubscribe
              doOnEach :OnNextNotification[1]
              onNext 1
              doAfterNext : 1
              doOnEach :OnNextNotification[2]
              onNext 2
              doAfterNext : 2
              doOnEach :OnNextNotification[3]
              onNext 3
              doAfterNext : 3
              doOnEach :OnNextNotification[4]
              onNext 4
              doAfterNext : 4
              doOnEach :OnNextNotification[5]
              onNext 5
              doAfterNext : 5
              doOnEach :OnCompleteNotification
              doOnComplete :
              doOnTerminate :
              onComplete
              doFinally :
              doAfterTerminate :
    ================================
    
              doOnSubscribe
              doOnLifecycle : accept
              onSubscribe
              doOnEach :OnNextNotification[1]
              onNext 1
              doAfterNext : 1
              doOnEach :OnNextNotification[2]
              onNext 2
              doAfterNext : 2
              doOnEach :OnNextNotification[3]
              onNext 3
              doAfterNext : 3
              doOnTerminate :
              doOnError :
              onError There is a Error!!
              doFinally :
              doAfterTerminate :
    

    ● Materialize/Dematerialize
    Materialize返回一个被观察者对象,该对象发射源数据的所有数据,以及通知,每一项item通过一个标记类Notification封装源数据以及通知。Dematerialize 则和materialize功能相反。

    
    
     Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    e.onNext("aaaa");
                    e.onNext("bbbb");
                    e.onNext("cccc");
                    e.onComplete();
                }
            }).materialize()
                    .map(new Function<Notification<String>, Notification<String>>() {
                @Override
                public Notification<String> apply(Notification<String> stringNotification) throws Exception {
                    print("materialize:"+stringNotification +"--->getValue:"+stringNotification.getValue()
                            +"--->isOnComplete:"+stringNotification.isOnComplete()
                            +"--->isOnError:"+stringNotification.isOnError() );
                    return stringNotification;
                }
            }).dematerialize().subscribe(new Consumer<Object>() {
                @Override
                public void accept(Object o) throws Exception {
                    print("dematerialize:"+o.toString());//
                }
            });
    输出结果
             materialize:OnNextNotification[aaaa]--->getValue:aaaa--->isOnComplete:false--->isOnError:false
               materialize:OnNextNotification[bbbb]--->getValue:bbbb--->isOnComplete:false--->isOnError:false
               materialize:OnNextNotification[cccc]--->getValue:cccc--->isOnComplete:false--->isOnError:false
               materialize:OnCompleteNotification--->getValue:null--->isOnComplete:true--->isOnError:false
               dematerialize:aaaa
               dematerialize:bbbb
               dematerialize:cccc
    

    ● Serialize
    当ObservalbeSource数据源是从不同线程回调观察者(发射数据),那么极有可能出现其中一个线程调用观察者的onComplete()或则onError()发生在另一个线程调用onNext()之前,或则两个线程同时第调用观察者的onNext(),而Serialize 操作是给观察者的回调添加同步锁synchronized,来确保Observalbe对其观察者进行序列化的调用.

    ● TimeInterval
    返回上级数据源每个数据从接收到发送的时间间隔的Observable。

    ● Timeout
    当一个事件流中每一个数据在一定时间内没有发射出去,则抛出超时异常

    ● Timestamp
    返回每个数据源发射的时候的时间戳的Observable。

     Observable.intervalRange(0,10,0,500,TimeUnit.MILLISECONDS)
                    .timeInterval().subscribe(new Consumer<Timed<Long>>() {
                @Override
                public void accept(Timed<Long> longTimed) throws Exception {
                    print("timeInterval---Timed--->"+longTimed.time());//0
                }
            });
            Observable.intervalRange(0,10,0,5500,TimeUnit.MILLISECONDS)
                    .timeout(5000,TimeUnit.MILLISECONDS )
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            print("timeout---->"+aLong);//0
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            print("timeout----Throwable>"+throwable.getMessage());
                        }
                    });
            Observable.intervalRange(0,10,0,5500,TimeUnit.MILLISECONDS)
                    .timestamp()
                    .subscribe(new Consumer<Timed<Long>>() {
                        @Override
                        public void accept(Timed<Long> longTimed) throws Exception {
                            print("timestamp---Timed--->"+longTimed.time());//1510388694052
                        }
                    });
             11-11 16:24:54.034 : timeInterval---Timed--->0
             11-11 16:24:54.044   timeout---->0
             11-11 16:24:54.044   timestamp---Timed--->1510388694052
             11-11 16:24:54.544   timeInterval---Timed--->500
             11-11 16:24:55.034   timeInterval---Timed--->500
             11-11 16:24:55.544   timeInterval---Timed--->500
             11-11 16:24:56.034   timeInterval---Timed--->500
             11-11 16:24:56.544   timeInterval---Timed--->500
             11-11 16:24:57.034   timeInterval---Timed--->500
             11-11 16:24:57.544   timeInterval---Timed--->500
             11-11 16:24:58.034   timeInterval---Timed--->500
             11-11 16:24:58.534   timeInterval---Timed--->500
             11-11 16:24:59.044   timeout----Throwable>null
             11-11 16:24:59.544   timestamp---Timed--->1510388699553
             11-11 16:25:05.044   timestamp---Timed--->1510388705053
             11-11 16:25:10.544   timestamp---Timed--->1510388710553
             11-11 16:25:16.044   timestamp---Timed--->1510388716053
             11-11 16:25:21.544   timestamp---Timed--->1510388721553
             11-11 16:25:27.044   timestamp---Timed--->1510388727053
             11-11 16:25:32.544   timestamp---Timed--->1510388732553
             11-11 16:25:38.044   timestamp---Timed--->1510388738053
             11-11 16:25:43.544   timestamp---Timed--->1510388743553
    

    ● Using
    通过对源资源对象的生命周期的控制(对源数据订阅),产生一个对源数据经过处理后的ObservableSource

     Observable.using(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return "hello";//----源数据
                }
            }, new Function<String, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(String s) throws Exception {
                    return Observable.just(s+"----》你好!");//--------目标数据
                }
            }, new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    print("using----->"+s);//hello----收到源数据
                    throw new Exception("源数据-----Error :"+s);
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    print("using Consumer accept----->" + s);//hello----》你好!
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    print("using Consumer throwable----->" + throwable.getMessage());
                }
            });
    输出结果:
             using Consumer accept----->hello----》你好!
             using----->hello
             using Consumer throwable----->源数据-----Error :hello
    

    ● To
    转换操作。
    blockingIterable blockingLatest blockingMostRecent blockingNext sorted to toFuture toList toMap toMultimap toSortedList

    String first = Observable.just("aaaa",2,3).blockingFirst().toString();
            print(""+first);//aaaa
            Iterable<String> stringIterable = Observable.just("1","2","3").blockingIterable();
            Iterator iterator = stringIterable.iterator();
            while (iterator.hasNext()){
                print(""+iterator.next());
            }
            //1,2,3
    
    
            Observable.just("1","2","3").toMap(new Function<String, String>() {
                @Override
                public String apply(String s) throws Exception {
                    return s+"+"+s;
                }
            }).subscribe(new Consumer<Map<String, String>>() {
                @Override
                public void accept(Map<String, String> stringStringMap) throws Exception {
                    print("toMap   "+stringStringMap );//{2+2=2, 3+3=3, 1+1=1}
                }
            });
            Observable.just(5,3,6,3,9,4)
                    .toSortedList().subscribe(new Consumer<List<Integer>>() {
                @Override
                public void accept(List<Integer> integers) throws Exception {
                    print("toSortedList"+integers);//[3, 3, 4, 5, 6, 9]
                }
            });  
    

    ● Retry
    当发生错误的时候,重新发射数据。

     Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    e.onNext("2222");
                    e.onError(new Throwable("Sorry!! an error occured sending the data"));
                }
            }).retry(3)
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            print("retry--->" + s);
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            print("retry--->throwable:" + throwable.getMessage());
                        }
                    });
           输出结果:
             retry--->2222
             retry--->2222
             retry--->2222
             retry--->2222
             retry--->throwable:Sorry!! an error occured sending the data
    

    ● cache
    当第一次订阅时,缓存所有的项目和通知,以使后续订阅者也可以接收到数据

    ObservableEmitter<String> emitter = null;
            Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    emitter = e;
                    emitter.onNext("1-----onNext");
    
                }
            });
            Observable.intervalRange(0, 5, 100, 5, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    emitter.onNext("intervalRange  send " + aLong);
                }
            });
    
            observable.cache().subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String aLong) throws Exception {
                            print("no cache---->" + aLong);
                        }
                    });
            observable.delay(2000,TimeUnit.MILLISECONDS).subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String aLong) throws Exception {
                            print(" cache---->" + aLong);
                        }
                    });
            observable
                    .delay(4000,TimeUnit.MILLISECONDS)
                    .onTerminateDetach()
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String aLong) throws Exception {
                            print("onTerminateDetach cache---->" + aLong);
                        }
                    });
    输出结果:
    /**
     *================================================no  cache ===========
      no cache---->1-----onNext
      cache---->intervalRange  send 0
      cache---->1-----onNext
      cache---->intervalRange  send 1
      cache---->intervalRange  send 2
      onTerminateDetach cache---->1-----onNext
      onTerminateDetach cache---->intervalRange  send 3
      onTerminateDetach cache---->intervalRange  send 4
    
     ===================================================cache===========
     : no cache---->1-----onNext
       cache---->1-----onNext
       onTerminateDetach cache---->1-----onNext
       onTerminateDetach cache---->intervalRange  send 0
       onTerminateDetach cache---->intervalRange  send 1
       onTerminateDetach cache---->intervalRange  send 2
       onTerminateDetach cache---->intervalRange  send 3
       onTerminateDetach cache---->intervalRange  send 4
     */
    

    ● cast
    在将其转换为指定类型后,从源观察源发出每个项目,实际上通过map(Functions.castFunction(clazz))来实现,本质上是一个map操作。

     Observable.just("1", "2", "3") 
                    .cast(Integer.class)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer val) throws Exception {
                            print("cast---->" + val);
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                            print("" + throwable.getMessage());//java.lang.String cannot be cast to java.lang.Integer
                        }
                    });
    

    ● compose
    自定义操作符,参数为ObservableTransformer ,可以继承ObservableTransformer 实现方法apply,来制定自己的运算符。

     Observable.just("1", "2", "3")
                    .compose(schedulersTransformer())
                    .subscribe();
    //自定义线程调度操作符
     public ObservableTransformer schedulersTransformer() {
            return new ObservableTransformer() {
                @Override
                public ObservableSource apply(Observable upstream) {
                    return upstream.subscribeOn(Schedulers.computation())
                            .observeOn(AndroidSchedulers.mainThread());
                }
            };
        }
    

    总结
    终于将这些Rxjava2.0的操作符讲完了,哈哈!!妈妈再也不用担心我不会用RxJava操作符了!!


    噢耶!

    这可能是世上最全操作符详解,虽然每个演示的Demo简单,但是应该可以根据输出结果理解,如果还不太明白,或者有疑问,动手自己敲段代码跑一下。哈哈!小伙伴们,不要忘记点个赞哦!

    本系列文章的demo演示代码下载地址:
    https://github.com/Callanna/RxLoad.git
    找到该项目下的demo的module就可以了哦。
    同时也可以支持一下我正在写的RxLoad这个类库,一个使用Rxjava实现加载图片,加载文件,加载网页的lib。

    相关文章

      网友评论

          本文标题:RxJava2.0----事件流操作符Observable Ut

          本文链接:https://www.haomeiwen.com/subject/lotomxtx.html