美文网首页
RxJava基础六-辅助操作符

RxJava基础六-辅助操作符

作者: 清水杨杨 | 来源:发表于2019-05-02 16:41 被阅读0次

    此篇内容均是来自书籍《RxJava响应式编程》李衍顺 著

    3.6 辅助操作符

    3.6.1 delay

    delay操作符就是让发送数据的时机延后一段时间,这样所有的数据都会一次延后一段时间发送。在RxJava中将其实现为delay和delaySubscription,二者的不同之处在于delay四延时数据发送, 而delaySubscription是延时注册Subscriber。

    /**
         * delay 
         */
        private Observable<Long> createObserver(final int index){
            return Observable.create(new Observable.OnSubscribe<Long>() {
                @Override
                public void call(Subscriber<? super Long> subscriber) {
                    log("subscribe:" + getCurrentTime());
                    for(int i=1; i<=index; i++){
                        subscriber.onNext(getCurrentTime());
                    }
                }
            }).subscribeOn(Schedulers.newThread());
        }
        private Observable<Long> delayObserver() {
            return createObserver(2).delay(2000, TimeUnit.MILLISECONDS, Schedulers.trampoline());
        }
        private Observable<Long> delaySubscriptionObserver() {
            return createObserver(2).delaySubscription(2000, TimeUnit.MILLISECONDS, Schedulers.trampoline());
        }
        private void delayTest(){
            log("start subscribe: "+ getCurrentTime());
            delayObserver().subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log("delay:" + (getCurrentTime()-aLong));
                }
            });
            log("start subscribe: "+ getCurrentTime());
            delaySubscriptionObserver().subscribe(new Action1<Long>() {
                @Override
                public void call(Long aLong) {
                    log("delaySubscription:" + (getCurrentTime()+aLong));
                }
            });
        }
    订阅后的输出结果,使用delay操作符后 源Observable收到订阅请求后立刻将数据发送了出来,但是delay操作符会将数据截流下来,在2秒后才再发送出去;而delaySubscription会将订阅请求截留下来,在两秒后才订阅到源Observable上,然后源Observable立刻将数据发送了出去。
    结果如下
    start subscribe: 1556695248
    subscribe:1556695248
    start subscribe: 1556695248
    delay:2
    subscribe:1556695250
    delaySubscription:3113390500
    delaySubscription:3113390500
    

    3.6.2 do

    do操作符就是给Observable的生命周期的各个阶段加上一系列的回调监听,当Observable执行到这个阶段的时候,这些回调就会被触发。do操作符在RxJava中有一下多个实现:
    -doOnEach:Observable每发送一个数据的时候就会触发这个回调,无论Observable调用的是onNext,onError还是onCompleted
    -doOnNext:只有Observable调用onNext发送数据的时候才会被触发
    -doOnSubscribe和doOnUnsubscribe:会在Subscriber进行订阅和反订阅的时候触发回调。当一个Observable通过OnError或者OnCompleted结束的时候,会反订阅所有的Subscriber。
    -doOnError:会在Observable通过onError分发错误事件的时候触发回调, 并将Throwable对象作为参数传进回调函数里。
    -doOnComplete:会在Observable通过onCompleted发送结束事件的时候触发回调。
    -doOnTerminate:会在Observable结束前触发回调,无论是正常结束还是异常结束。
    -finallyDo:会在Observable结束后触发回调,无论是正常结束还是异常结束。

    /**
         * do操作符
         */
        private Observable<Integer> createObserver(){
            return Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    for(int i=1;i<=5;i++){
                        if(i<=3){
                            subscriber.onNext(i);
                        } else {
                            subscriber.onError(new Throwable("num > 3"));
                        }
                    }
                }
            });
        }
        private Observable<Integer> doOnEachObserver(){
            return Observable.just(1,2,3)
                    .doOnEach(new Action1<Notification<? super Integer>>() {
                        @Override
                        public void call(Notification<? super Integer> notification) {
                            log("doOnEach send " + notification.getValue() + " type:" + notification.getKind());
                        }
                    }).doOnNext(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            log("doOnNext send " + integer);
                        }
                    }).doOnSubscribe(new Action0() {
                        @Override
                        public void call() {
                            log("on Subscribe");
                        }
                    }).doOnUnsubscribe(new Action0() {
                        @Override
                        public void call() {
                            log("on unSubscribe");
                        }
                    }).doOnCompleted(new Action0() {
                        @Override
                        public void call() {
                            log("onCompleted");
                        }
                    });
    
        }
        private Observable<Integer> doOnErrorObserver(){
            return createObserver()
                    .doOnEach(new Action1<Notification<? super Integer>>() {
                        @Override
                        public void call(Notification<? super Integer> notification) {
                            log("doOnErrorEach send " + notification.getValue() + " type:" + notification.getKind());
                        }
                    }).doOnError(new Action1<Throwable>() {
                        @Override
                        public void call(Throwable throwable) {
                            log("onError: " + throwable.getMessage());
                        }
                    }).doOnTerminate(new Action0() {
                        @Override
                        public void call() {
                            log("OnTerminate");
                        }
                    }).doAfterTerminate(new Action0() {
                        @Override
                        public void call() {
                            log("doAfterTerminate");
                        }
                    });
        }
        private void doTest(){
            doOnEachObserver().subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    log("do: " + integer);
                }
            });
            doOnErrorObserver().subscribe(new Subscriber<Integer>() {
                @Override
                public void onCompleted() {
                    log("onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    log("subscriber onError: " + e.getMessage());
                }
    
                @Override
                public void onNext(Integer integer) {
                    log("subscriber onNext: " + integer);
                }
            });
        }
    结果
    on Subscribe
    doOnEach send 1 type:OnNext
    doOnNext send 1
    do: 1
    doOnEach send 2 type:OnNext
    doOnNext send 2
    do: 2
    doOnEach send 3 type:OnNext
    doOnNext send 3
    do: 3
    doOnEach send null type:OnCompleted
    onCompleted
    on unSubscribe
    doOnErrorEach send 1 type:OnNext
    subscriber onNext: 1
    doOnErrorEach send 2 type:OnNext
    subscriber onNext: 2
    doOnErrorEach send 3 type:OnNext
    subscriber onNext: 3
    doOnErrorEach send null type:OnError
    onError: num > 3
    OnTerminate
    subscriber onError: num > 3
    doAfterTerminate
    

    3.6.3 materialize 和dematerialize

    materialize操作符将OnNext,OnError和OnComplete事件都转化为一个Notification对象并按照原来的顺序发送出来。此外还有dematerialize操作符,它会执行相反的过程,即将这些Notification对象重新转化为对应的OnNext,OnError和OnComplete事件。

     /**
         * materizlize && dematerialize
         */
        private Observable<Notification<Integer>> materizlizeObserver(){
            return Observable.just(1,2,3).materialize();
        }
        private Observable<Integer> deMaterializeObserver(){
            return materizlizeObserver().dematerialize();
        }
        private void materizlize_dematerialize(){
            materizlizeObserver().subscribe(new Action1<Notification<Integer>>() {
                @Override
                public void call(Notification<Integer> integerNotification) {
                    log("materizlize: "+ integerNotification.getValue() + " type" + integerNotification.getKind());
                }
            });
            deMaterializeObserver().subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    log("deMeterizlize: "+ integer);
                }
            });
        }
    输出结果如下。可看到materialize将所有的数据和事件都封装成了Notification对象,我们可以通过getValue和getKind方法分别将封装的值和当前的类型取出来,而dematerialize操作符将封装的数据又还原成了原来的数据,就如同一个普通的Observable一样
    materizlize: 1 typeOnNext
    materizlize: 2 typeOnNext
    materizlize: 3 typeOnNext
    materizlize: null typeOnCompleted
    deMeterizlize: 1
    deMeterizlize: 2
    deMeterizlize: 3
    

    3.6.4 subscribeOn 和 observeOn

    subscribeOn用来指定Observable在哪个线程上运行, 我们可以指定它在IO线程上运行,也可以让其新开一个线程运行,当然也可以在当前线程上运行。一般会指定其在各种后台线程而不是主线程上运行,就如同AsyncTask的DoInBackground一样。

    observeOn用来指定观察者所运行的线程,也就是发送出数据在哪个线程上使用。

    subscribeOn就是在哪个线程上订阅, 订阅了Observable就开始干活了,所以subscribeOn指定的就是干活的线程;然后observeOn就是在哪个线程上观察,观察就看结果(数据,错误和结束事件等), 所以指定的就是结果被使用的线程。

    /**
         * subscribeOn && observeOn
         */
        private Observable<Integer> createObserver1(){
            return Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    log("on subscribe: " + Thread.currentThread().getName());
                    subscriber.onNext(1);
                    subscriber.onCompleted();
                }
            });
        }
        private Observable<Integer> observeOnObserver(){
            return createObserver1()
                    .observeOn(Schedulers.trampoline())
                    .subscribeOn(Schedulers.newThread());
        }
        private Observable<Integer> subscribeOnObserver() {
            return createObserver1()
                    .subscribeOn(Schedulers.computation())
                    .observeOn(Schedulers.immediate());
        }
        private void subscribeOn_observeOn(){
            observeOnObserver().subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    log("ObserveOn: "+ Thread.currentThread().getName());
                }
            });
            subscribeOnObserver().subscribe(new Action1<Integer>() {
                @Override
                public void call(Integer integer) {
                    log("subscribeOn: "+ Thread.currentThread().getName());
                }
            });
        }
    结果:
    on subscribe: RxNewThreadScheduler-1
    ObserveOn: RxNewThreadScheduler-1
    on subscribe: RxComputationScheduler-1
    subscribeOn: RxComputationScheduler-1
    

    3.6.5 timeInterval 和timeStamp

    timeInterval 会拦截源Observable发送出来的数据,将其封装为一个TimeInterval对象,TimeInterval对象内部包含Observable发送的原始数据,以及发送当前数据的发送上一个数据的时间间隔。对于第一个发送的数据,其时间间隔为订阅后到首次发送数据之间的时间间隔。

    timeStamp同样会将每个数据项重新包装成一个TimeStamp对象,TimeStamp对象内包含了原始数据和一个时间戳,这个时间戳标明了每次数据发送的时间。

    
        /**
         * timeInterval && timeStamp
         */
        private void timeInterval_TimeStamp(){
            Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline()).take(3).timeInterval()
                    .subscribe(new Action1<TimeInterval<Long>>() {
                        @Override
                        public void call(TimeInterval<Long> longTimeInterval) {
                            log("timeInterval: "+ longTimeInterval.getValue() + "-" + longTimeInterval.getIntervalInMilliseconds());
                        }
                    });
            Observable.interval(1, TimeUnit.SECONDS, Schedulers.trampoline()).take(3).timestamp()
                    .subscribe(new Action1<Timestamped<Long>>() {
                        @Override
                        public void call(Timestamped<Long> longTimestamped) {
                            log("timeStamp: "+ longTimestamped.getValue() + "-" + longTimestamped.getTimestampMillis());
                        }
                    });
        }
    

    订阅后的输出结果如下, 可以看到timeInterval操作符将数据封装成了timeInterval对象,可以通过getValue和getIntervalInMilliseconds方法获取到数据和间隔的时间,由于误差存在,间隔的时间在1000毫秒左右浮动;而TimeStamp将数据封装成了timeStamped的对象,可以通过getValue和getTimestampMillis方法获取到数据和封装进去的时间戳。

    timeInterval: 0-1011
    timeInterval: 1-998
    timeInterval: 2-998
    timeStamp: 0-1556694232493
    timeStamp: 1-1556694233492
    timeStamp: 2-1556694234493
    

    ***如上, interval这个操作符终于执行成功 ,原因是添加了指定线程Schedulers.trampoline()
    官网原因

    When you use the default scheduler (Schedulers.computation()) the observable emits on another thread. If your program exits just after the subscribe then the observable is not given a chance to run. Put in a long sleep just after the subscribe() call and you will see it working.

    3.6.6 timeout

    timeout操作符给Observable加上超时时间,每发送一个数据后就重置计时器,当超过预定的时间还没有发送下一个数据时,就抛出一个超时的异常。

    RxJava将timeout实现为很多不同功能的操作符。可以在超时的时候发出一个错误事件;也可以在超时的时候使用另一个Observable代替当前的Observable来继续发送数据。

    /**
         * timeout
         */
        private Observable<Integer> createTimeoutObserver(){
            return Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    for(int i=0; i<=3; i++){
                        try{
                            Thread.sleep(i*100);
                        } catch (InterruptedException e){
                            e.printStackTrace();
                        }
                        subscriber.onNext(i);
                    }
                    subscriber.onCompleted();
                }
            });
        }
        private void timeoutTest(){
            createTimeoutObserver().timeout(200, TimeUnit.MILLISECONDS)
                    .subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onCompleted() {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            log(e.toString());
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            log("timeout: "+ integer);
                        }
                    });
    
            createTimeoutObserver().timeout(200, TimeUnit.MILLISECONDS, Observable.just(5,6))
                    .subscribe(new Action1<Integer>() {
                        @Override
                        public void call(Integer integer) {
                            log(integer);
                        }
                    });
        }
    

    订阅结果如下,可看到第一个Observable发送两个数据后出现超时并发送出了错误事件;而第二个Observable在发送两个数据出现超时后,使用了另一个Observable来继续发送数据

    timeout: 0
    timeout: 1
    timeout: 2
    java.util.concurrent.TimeoutException
    0
    1
    2
    5
    6
    

    3.6.7 using

    using操作符可创建一个在Observable生命周期内存活的资源,也可以这样理解:我们创建一个资源并使用它,用一个Observable来限制这个资源的使用时间,当这个Observable终止的时候,这个资源就会被销毁。

    using需要使用3个参数,分别是:
    1.创建这个一次性资源的函数
    2.创建Observable的函数
    3.释放资源的函数

    定义一个Animal类, 在类的构造方法里我们使用interval操作符创建一个Observable对象并将其进行订阅,然后提供一个release方法来反订阅这个Observable。

    private class Animal{
            Subscriber subscriber = new Subscriber() {
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(Object o) {
                    log("animal eat");
                }
            };
            public Animal(){
                log("create animal");
                Observable.interval(1000, TimeUnit.MILLISECONDS).subscribe(subscriber);
            }
            public void release(){
                log("animal released");
                subscriber.unsubscribe();
            }
        }
    

    Animal类创建好了之后,使用using操作符创建一个Animal对象, 并使用timer操作符创建一个5秒控制Observable, 它在5秒后会调用Animal对象的release方法,也就是说这个Animal对象最多只能存在5秒,最后对其进行订阅。

    private Observable<Long> usingObservable(){
            return Observable.using(new Func0<Animal>() {
                @Override
                public Animal call() {
                    return new Animal();
                }
            }, new Func1<Animal, Observable<? extends Long>>() {
                @Override
                public Observable<? extends Long> call(Animal animal) {
                    return Observable.timer(5000, TimeUnit.MILLISECONDS, Schedulers.trampoline());
                }
            }, new Action1<Animal>() {
                @Override
                public void call(Animal animal) {
                    animal.release();
                }
            });
        }
        private void usingTest(){
            Subscriber subscriber = new Subscriber() {
                @Override
                public void onCompleted() {
                    log("onCompleted");
                }
    
                @Override
                public void onError(Throwable e) {
                    log("onError" + e.toString());
                }
    
                @Override
                public void onNext(Object o) {
                    log("onNext: " + o);
                }
            };
            usingObservable().subscribe(subscriber);
        }
    

    结果如下,可以看到我们创建一个Animal对象后,Animal内部会注册一个Observable对象, 并且每秒都会输出一个animal eat , 但是在5秒后会调用Animal的release方法, 从而对Animal内部的Observable对象进行反订阅。

    create animal
    animal eat
    animal eat
    animal eat
    animal eat
    animal eat
    onNext: 0
    onCompleted
    animal released
    

    相关文章

      网友评论

          本文标题:RxJava基础六-辅助操作符

          本文链接:https://www.haomeiwen.com/subject/gkinnqtx.html