美文网首页
RxJava2.0 操作符(6)—— Utility 辅助操作符

RxJava2.0 操作符(6)—— Utility 辅助操作符

作者: DoubleThunder | 来源:发表于2017-10-16 23:10 被阅读532次

    这个页面展示的操作符可用于组合多个 Observables。

    Delay — 延时发射 Observable 的结果。
    DelaySubscription — 延时处理订阅请求。
    DoOnEach — 注册一个动作,对 Observable 发射的每个数据项使用。
    DoOnComplete — 注册一个动作,对正常完成的 Observable 使用。
    DoOnError — 注册一个动作,对发生错误的 Observable 使用。
    DoOnTerminate — 注册一个动作,对完成的 Observable 使用,无论是否发生错误。
    DoOnSubscribe — 注册一个动作,在观察者订阅时使用。
    DoOnUnsubscribe — 注册一个动作,在观察者取消订阅时使用。
    Dematerialize — 将上面的结果逆转回一个 Observable
    ObserveOn — 指定观察者观察 Observable 的调度器
    Materialize — 将 Observable 转换成一个通知列表
    Serialize — 强制一个 Observable 连续调用并保证行为正确
    Subscribe — 操作来自 Observable 的发射物和通知。
    SubscribeOn — 指定 Observable 执行任务的调度器。
    TimeInterval — 定期发射数据。
    Timeout - 对原始 Observable 的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知。
    Timestamp — 给 Observable 发射的每个数据项添加一个时间戳。

    6.1 Delay

    延迟一段指定的时间再发射来自 Observable 的请求。

    DelayDelay

    RxJava 的实现是 delay 和 delaySubscription。不同之处在于 Delay 是延时数据的发射,而 DelaySubscription 是延时注册 Subscriber。

    6.1.1 Delay

    delaydelay

    示例代码:

    final long currentTimeMillis = System.currentTimeMillis();
    Observable.range(1, 2).delay(2000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            if (integer == 1) {
                Log.e(TAG, "delay Time :" + (System.currentTimeMillis() - currentTimeMillis) + "");
            }
            Log.e(TAG, "accept:" + integer);
        }
    });
    

    输出结果:

    delay Time :2408
    accept:1
    accept:2
    

    6.1.2 delaySubscription

    delaySubscriptiondelaySubscription

    示例代码:

    final long currentTimeMillis = System.currentTimeMillis();
    Observable.range(1, 2).delaySubscription(2, TimeUnit.SECONDS).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer aLong) throws Exception {
            if (aLong == 1) {
                Log.e(TAG, "delay Time :" + (System.currentTimeMillis() - currentTimeMillis) + "");
            }
            Log.e(TAG, "accept:" + aLong);
        }
    });
    

    输出结果:

    delay Time :2500
    accept:1
    accept:2
    

    6.2 Do

    注册一个动作作为原始 Observable 生命周期事件的一种占位符。

    DoDo

    Do 操作符就是给 Observable 的生命周期的各个阶段加上一系列的回调监听,当 Observable 执行到这个阶段的时候,这些回调就会被触发。
    在 Rxjava2.0 中实现了很多的 do 操作符的变体。

    6.2.1 doAfterNext

    实现方法:doAfterNext(Consumer)
    从上流向下流发射后被调用。

    示例代码:

    public static void demo_doAfterNext(){
        Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        });
        ob1.doAfterNext(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e(TAG,"doAfterNext="+integer);
            }
        }).subscribe(getNormalObserver());
    }
    
    
    public static Disposable mDisposable ;
    //可重复使用
    public static Observer<Integer> getNormalObserver(){
        return new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                mDisposable = d;
            }
    
            @Override
            public void onNext(@NonNull Integer integer) {
                Log.e(TAG,"normal,onNext:"+integer);
            }
    
            @Override
            public void onError(@NonNull Throwable error) {
                Log.e(TAG,"normal,Error: " + error.getMessage());
            }
    
            @Override
            public void onComplete() {
                Log.e(TAG,"normal,onComplete");
            }
        };
    }
    

    输出结果:

    normal,onNext:1
    doAfterNext : 1
    normal,onNext:2
    doAfterNext : 2
    normal,onNext:3
    doAfterNext : 3
    normal,onComplete
    
    

    6.2.2 doAfterTerminate

    doAfterTerminatedoAfterTerminate

    实现方法: doAfterTerminate(Action)

    注册一个 Action,当 Observable 调用 onComplete 或 onError 触发。

    示例代码:

    Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onComplete();
    //                emitter.onError(new Throwable("nothingerro"));
        }
    });
    ob1.doAfterTerminate(new Action() {
        @Override
        public void run() throws Exception {
            Log.e(TAG,"doAfterTerminate run");
        }
    }).subscribe(getNormalObserver());
    

    输出结果:

    normal,onNext:1
    normal,onNext:2
    normal,onComplete
    doAfterTerminate run
    

    6.2.3 doFinally

    实现方法: doFinally(Action onDispose)

    当 Observable 调用 onError 或 onCompleted 之后调用指定的操作,或由下游处理。
    doFinally 优先于 doAfterTerminate 的调用。

    示例代码:

    Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onComplete();
    //      emitter.onError(new Throwable("nothingerro"));
        }
    });
    ob1.doFinally(new Action() {
        @Override
        public void run() throws Exception {
            Log.e(TAG,"doFinally run");
        }
    }).subscribe(getNormalObserver());
    

    输出结果:

    normal,onNext:1
    normal,onNext:2
    normal,onComplete
    doFinally run
    
    

    6.2.4 doOnDispose

    doOnDisposedoOnDispose

    实现方法:doOnDispose(Action onDispose)

    当 Observable 取消订阅时,它就会被调用。

    示例代码:

    Observable<Integer> ob1 =  Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
    
            emitter.onNext(1);
            //mDisposable 参考6.2.1
            if (mDisposable != null) {
                mDisposable.dispose();
            }
            emitter.onNext(2);
            emitter.onComplete();
    //                emitter.onError(new Throwable("nothingerro"));
        }
    });
    ob1.doOnDispose(new Action() {
        @Override
        public void run() throws Exception {
            Log.e(TAG,"doOnDispose run");
        }
    }).subscribe(getNormalObserver());
    

    输出结果:

    normal,onNext:1
    doOnDispose run
    
    

    6.2.5 doOnComplete

    doOnCompletedoOnComplete

    当它产生的 Observable 正常终止调用 onCompleted 时会被调用。
    Javadoc: doOnCompleted(Action)

    示例代码:

    Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
    //                emitter.onError(new Throwable("nothingerror"));
            emitter.onComplete();
    
        }
    });
    ob1.doOnComplete(new Action() {
        @Override
        public void run() throws Exception {
            Log.e(TAG, "doOnComplete run");
        }
    }).subscribe(getNormalObserver());
    

    输出结果:

    normal,onNext:1
    normal,onNext:2
    doOnComplete run
    normal,onComplete
    

    6.2.6 doOnEach

    doOnEachdoOnEach

    doOnEach 操作符让你可以注册一个回调,它产生的 Observable 每发射一项数据就会调用它一次。不仅包括 onNext 还包括 onError 和 onCompleted。

    示例代码:

     Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
    //                emitter.onError(new Throwable("nothingerror"));
            emitter.onComplete();
    
        }
    });
    ob1.doOnEach(new Observer<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
    
        }
    
        @Override
        public void onNext(@NonNull Integer integer) {
            Log.e(TAG, "doOnEach,onNext:" + integer);
        }
    
        @Override
        public void onError(@NonNull Throwable e) {
            Log.e(TAG, "doOnEach,onError:" + e.getMessage());
        }
    
        @Override
        public void onComplete() {
            Log.e(TAG, "doOnEach,onComplete");
        }
    }).subscribe(getNormalObserver());
    

    输出结果:

    doOnEach,onNext:1
    normal,onNext:1
    doOnEach,onNext:2
    normal,onNext:2
    doOnEach,onComplete
    normal,onComplete
    

    6.2.7 doOnError

    doOnError 操作符注册一个动作,当它产生的 Observable 异常终止调用 onError 时会被调用。

    doOnErrordoOnError

    实现方法 doOnError(Consumer<? super Throwable>);

    示例代码:

    Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onError(new Throwable("nothing error"));
            emitter.onComplete();
    
        }
    });
    ob1.doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(@NonNull Throwable throwable) throws Exception {
            Log.e(TAG,"doOnError : "+throwable.getMessage());
        }
    }).subscribe(getNormalObserver());
    

    输出结果:

    normal,onNext:1
    normal,onNext:2
    doOnError : nothing error
    normal,Error: nothing error
    

    6.2.8 doOnLifecycle

    调用相应的 onXXX 方法(在所有 Observer 之间共享),用于序列的生命周期事件(订阅,取消,请求)。


    doOnLifecycledoOnLifecycle

    示例代码:

    Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
    //                emitter.onError(new Throwable("nothing error"));
            if (mDisposable != null) {
                mDisposable.dispose();
            }
            emitter.onComplete();
    
        }
    });
    ob1.doOnLifecycle(new Consumer<Disposable>() {
        @Override
        public void accept(@NonNull Disposable disposable) throws Exception {
            Log.e(TAG, "doOnLifecycle ,disposable:" + disposable);
        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            Log.e(TAG, "doOnLifecycle ,run");
        }
    }).subscribe(getNormalObserver());
    

    输出结果:

    doOnLifecycle ,disposable:null
    normal,onNext:1
    normal,onNext:2
    doOnLifecycle ,run
    

    6.2.9 doOnNext

    doOnNext操作符类似于 doOnEach(Consumer)。

    doOnNextdoOnNext

    示例代码:

    Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
    //      emitter.onError(new Throwable("nothing error"));
            emitter.onComplete();
        }
    });
    ob1.doOnNext(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer integer) throws Exception {
        Log.e(TAG, "doOnNext ,onNext:"+integer);
    }
    }).subscribe(getNormalObserver());
    

    输出结果:

    doOnNext ,onNext:1
    normal,onNext:1
    doOnNext ,onNext:2
    normal,onNext:2
    normal,onComplete
    

    6.2.10 doOnSubscribe

    doOnSubscribe,当观察者订阅它生成的 Observable 它就会被调用。

    doOnSubscribedoOnSubscribe

    实践:在 Observable 发射前做一些初始化操作(比如开始加载数据时显示载入中界面)。

    示例代码:

    Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
    //      emitter.onError(new Throwable("nothing error"));
            emitter.onComplete();
    
        }
    });
    ob1.doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(@NonNull Disposable disposable) throws Exception {
            Log.e(TAG, "doOnSubscribe,disposable:" + disposable);
        }
    }).subscribe(getNormalObserver());
    

    输出结果:

    doOnSubscribe,disposable:null
    normal,onNext:1
    normal,onNext:2
    normal,onComplete
    

    6.2.11 doOnTerminate

    doOnTerminate 操作符注册一个动作,当它产生的 Observable 终止之前会被调用,无论是正常还是异常终止。

    doOnTerminatedoOnTerminate

    实现方法:doOnTerminate(Action)
    实践:不管消息流最终以 onError() / onComplete() 结束,都会被调用(类似 Java 的 finally ),对于某些需要 onError() / onComplete() 后都要执行的操作(如网络加载成功/失败都要隐藏载入中界面),可以放在这里。

    注意:取消订阅时,不会调用 doOnTerminate 方法。
    

    示例代码:

    Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
    //      emitter.onError(new Throwable("nothing error"));
            emitter.onComplete();
    
        }
    });
    ob1.doOnTerminate(new Action() {
        @Override
        public void run() throws Exception {
            Log.e(TAG,"doOnTerminate,run");
        }
    }).subscribe(getNormalObserver());
    

    输出结果:

    normal,onNext:1
    normal,onNext:2
    doOnTerminate,run
    normal,onComplete
    

    6.2.12 onTerminateDetach

    当执行了反注册 unsubscribes 或者发送数据序列中断了,解除上游生产者对下游接受者的引用。
    实践:onTerminateDetach 会使 Observable 调用 UnSubscriber 时,对 Subscriber 的引用会被释放,从而避免造成内存泄漏。

    6.3 Meterialize / Dematerialize

    6.3.1 Materialize

    Materialize 将数据项和事件通知都当做数据项发射,Dematerialize 刚好相反。


    MeterializeMeterialize

    一个合法的有限的 Obversable 将调用它的观察者的 onNext 方法零次或多次,然后调用观察者的 onCompleted 或 onError 仅一次。Materialize 操作符将这一系列调用,包括原来的 onNext 通知和终止通知 onCompleted 或 onError 都转换为一个 Observable 发射的数据序列。
    通俗一点的说法:Meterialize 操作符将 OnNext / OnError / OnComplet e都转化为一个 Notification 对象并按照原来的顺序发射出来。

    示例代码:

    Observable<Integer> ob1 = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
    //      emitter.onError(new Throwable("love world"));
            emitter.onComplete();
        }
    });
    
    ob1.materialize().subscribe(new Consumer<Notification<Integer>>() {
        @Override
        public void accept(@NonNull Notification<Integer> in) throws Exception {
            if (in.isOnNext()) {
                Log.e(TAG, "materialize,onNext: " + in.isOnNext());
                return;
            }
            if (in.isOnError()) {
                Log.e(TAG, "materialize,onError: "+in.getError().getMessage());
                return;
            }
            if (in.isOnComplete()) {
                Log.e(TAG, "materialize,OnComplete");
                return;
            }
        }
    });
    

    输出结果:

    materialize,onNext: true
    materialize,onNext: true
    materialize,OnComplete
    

    6.3.2 Dematerialize

    而 Dematerialize 执行相反的过程。


    DematerializeDematerialize

    示例代码:

    Observable<Notification<Integer>> ob1 = Observable.create(new ObservableOnSubscribe<Notification<Integer>>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Notification<Integer>> e) throws Exception {
            e.onNext(Notification.createOnNext(1));
            e.onNext(Notification.<Integer>createOnError(new Throwable("My error!")));
            e.onNext(Notification.<Integer>createOnComplete());
            
        }
    });
    ob1.dematerialize().subscribe(new Observer<Object>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
    
        }
    
        @Override
        public void onNext(@NonNull Object o) {
            Log.e(TAG, "onNext:" + o.toString());
        }
    
        @Override
        public void onError(@NonNull Throwable e) {
            Log.e(TAG, "onError:" + e.getMessage());
        }
    
        @Override
        public void onComplete() {
            Log.e(TAG, "onComplete");
        }
    });
    

    输出结果:

    onNext:1
    onComplete
    

    6.4 ObserveOn / SubscribeOn

    指定一个观察者在哪个调度器(线程)上观察这个 Observable。


    ObserveOnObserveOn
    SubscribeOnSubscribeOn

    ObserverOn 用来指定观察者所运行的线程,也就是发射出的数据在那个线程上使用。
    在 Android 中,如果经常会遇见这样场景,我们需要从网络中读取数据,之后修改 UI 界面,观察者就必须在主线程上运行,就如同 AsyncTask 的 onPostExecute。

    .subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程  
    .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程  
    

    注意:当遇到一个异常时 ObserveOn 会立即向前传递这个 onError 终止通知,它不会等待慢速消费的 Observable 接受任何之前它已经收到但还没有发射的数据项。这可能意味着 onError 通知会跳到(并吞掉)原始 Observable 发射的数据项前面,正如下图所示的。


    ObserveOnObserveOn

    示例代码:

    /**
     Schedulers.io() 代表 io 操作的线程, 通常用于网络,读写文件等 io 密集型的操作
     Schedulers.computation() 代表 CPU 计算密集型的操作, 例如需要大量计算的操作
     Schedulers.newThread() 代表一个常规的新线程
     */
    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
            Log.e(TAG, "subscribeOn:" + Thread.currentThread().getName());
            emitter.onNext(1);
        }
    }).subscribeOn(Schedulers.io())
            .observeOn(Schedulers.newThread())
            .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(@NonNull Integer integer) throws Exception {
            Log.e(TAG, "observerOn:" + Thread.currentThread().getName());
            Log.e(TAG, "onNext:" +integer);
        }
    });
    

    输出结果:

    subscribeOn:RxCachedThreadScheduler-1
    observerOn:RxNewThreadScheduler-1
    onNext:1
    

    6.4.1 unsubscribeOn

    修改原 Observable,以便订阅者将其配置在指定的调度器(线程)上。

    示例代码:

    //将线程从 computation 换到 io 中
    Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
        
                Log.e(TAG, "subscribeOn:" + Thread.currentThread().getName());
                emitter.onNext(1);
            }
        }).subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.computation())
        .doOnNext(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e(TAG, "doOnNext,observerOn:" + Thread.currentThread().getName());
                Log.e(TAG, "doOnNext,onNext:" + integer);
            }
        })
        .unsubscribeOn(Schedulers.computation())
        .subscribeOn(Schedulers.io())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e(TAG, "observerOn:" + Thread.currentThread().getName());
                Log.e(TAG, "onNext:" + integer);
            }
        });
    

    输出结果:

    subscribeOn:RxNewThreadScheduler-1
    doOnNext,observerOn:RxComputationThreadPool-1
    doOnNext,onNext:1
    observerOn:RxComputationThreadPool-1
    onNext:1
    

    6.5 Serialize

    强制一个 Observable 连续调用并保证行为正确。

    SerializeSerialize

    一个 Observable 可以异步调用它的观察者的方法,可能是从不同的线程调用。这可能会让 Observable 行为不正确,它可能会在某一个 onNext 调用之前尝试调用 onCompleted 或 onError 方法,或者从两个不同的线程同时调用 onNext 方法。使用 serialize 操作符,你可以纠正这个 Observable 的行为,保证它的行为是正确的且是同步的。

    6.6 TimeInterval

    将一个发射数据的 Observable 转换为发射那些数据发射时间间隔的 Observable。

    TimeIntervalTimeInterval

    TimeInterval 操作符拦截原始 Observable 发射的数据项,替换为两个连续发射物之间流逝的时间长度。 也就是说这个使用这个操作符后发射的不再是原始数据,而是原始数据发射的时间间隔。新的 Observable 的第一个发射物表示的是在观察者订阅原始 Observable 到原始 Observable 发射它的第一项数据之间流逝的时间长度。 不存在与原始 Observable 发射最后一项数据和发射 onCompleted 通知之间时长对应的发射物。timeInterval 默认在 immediate 调度器上执行,你可以通过传参数修改。

    示例代码:

    Observable.interval(100, TimeUnit.MILLISECONDS)
        .take(3)
        .timeInterval()
        .subscribe(new Consumer<Timed<Long>>() {
            @Override
            public void accept(@NonNull Timed<Long> t) throws Exception {
                Log.e(TAG, "onNext: " + t.value() + " , time = " + t.time());
            }
        });
    

    输出结果:

    onNext: 0 , time = 104
    onNext: 1 , time = 113
    onNext: 2 , time = 100
    

    6.7 Timeout

    对原始 Observable 的一个镜像,如果过了一个指定的时长仍没有发射数据,它会发一个错误通知。


    TimeoutTimeout

    Timeout 操作符给 Observable 加上超时时间,每发射一个数据后就重置计时器,当超过预定的时间还没有发射下一个数据,就抛出一个超时的异常。
    RxJava2.0 中的实现的 Timeout 操作符有好几个变体:

    • timeout(long,TimeUnit): 第一个变体接受一个时长参数,每当原始 Observable 发射了一项数据,timeout就启动一个计时器,如果计时器超过了指定指定的时长而原始 Observable 没有发射另一项数据,timeout 就抛出 TimeoutException,以一个错误通知终止 Observable。 这个timeout默认在computation调度器上执行,你可以通过参数指定其它的调度器。
    • timeout(long,TimeUnit,Observable): 这个版本的 timeout 在超时时会切换到使用一个你指定的备用的 Observable,而不是发错误通知。它也默认在 computation 调度器上执行。
    • timeout(Function):这个版本的 timeout 使用一个函数针对原始 Observable 的每一项返回一个 Observable,如果当这个 Observable 终止时原始 Observable 还没有发射另一项数据,就会认为是超时了,timeout 就抛出 TimeoutException,以一个错误通知终止 Observable。
    • timeout(Function,Observable): 这个版本的 timeout 同时指定超时时长和备用的 Observable。它默认在immediate调度器上执行

    示例代码1:

    /**
     * 在 150 毫秒间隔内如果没有发射数据。发送一个 TimeoutException 通知终止。
     * */
    Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 5; i++) {
                    Thread.sleep(i * 100);
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        })
        .timeout(150, TimeUnit.MILLISECONDS)
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                
            }
        
            @Override
            public void onNext(@NonNull Integer integer) {
                Log.e(TAG, "onNext:" + integer);
            }
        
            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG, "onError:" + e.getMessage());
            }
        
            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete");
            }
        });
    

    输出结果:

    onNext:0
    onNext:1
    onError:null
    

    示例代码 2:

     /**
     * 只接收 200 毫秒间隔内发送的数据,如果超时则切换到 Observable.just(100, 200)
     * */
    Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 5; i++) {
                    Thread.sleep(i * 100);
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }
        })
        .timeout(200, TimeUnit.MILLISECONDS, Observable.just(100, 200))
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e(TAG, "accept:" + integer);
            }
        });
    

    输出结果:

    accept:0
    accept:1
    accept:100
    accept:200
    

    6.8 Timestamp

    给 Observable 发射的数据项附加一个时间戳。


    TimestampTimestamp

    它将一个发射 T 类型数据的 Observable 转换为一个发射类型为 Timestamped 的数据的 Observable,每一项都包含数据的发射时间。也就是把 Observable 发射的数据重新包装了一下,将数据发射的时间打包一起发射出去,这样观察者不仅能得到数据,还能得到数据的发射时间。 timestamp 默认在 immediate 调度器上执行,但是可以通过参数指定其它的调度器。

    示例代码:

    Observable.range(1, 3)
        .timestamp()
        .subscribe(new Consumer<Timed<Integer>>() {
            @Override
            public void accept(@NonNull Timed<Integer> t) throws Exception {
                Log.e(TAG, "accept ,onNext:" + t.value() + ",time = " + t.time());
            }
        });
    

    输出结果:

    accept ,onNext:1,time = 1494606809418
    accept ,onNext:2,time = 1494606809420
    accept ,onNext:3,time = 1494606809420
    

    6.9 Using

    创建一个只在 Observable 生命周期内存在的一次性资源.


    UsingUsing

    当一个观察者订阅 using 返回的 Observable 时,using 将会使用 Observable 工厂函数创建观察者要观察 Observable,同时使用资源工厂函数创建一个你想要创建的资源。当观察者取消订阅这个 Observable 时,或者当观察者终止时(无论是正常终止还是因错误而终止),using 使用第三个函数释放它创建的资源。

    using 操作符接受三个参数:

    • 一个用户创建一次性资源的工厂函数
    • 一个用于创建 Observable 的工厂函数
    • 一个用于释放资源的函数

    示例代码:

    Observable.using(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                return new Random().nextInt(10);
            }
        }, new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
                return Observable.just("hello+" + integer, "world+" + integer);
            }
        }, new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer integer) throws Exception {
                Log.e(TAG, "using,accept - >" + integer);
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e(TAG, "subscribe,accept -> " + s);
            }
        });
    

    输出结果:

    subscribe,accept -> hello+8
    subscribe,accept -> world+8
    using,accept - >8
    

    6.10 To

    将 Observable 转换为另一个对象或数据结构。


    ToTo

    ReactiveX 的很多语言特定实现都有一种操作符让你可以将 Observable 或者 Observable 发射的数据序列转换为另一个对象或数据结构。它们中的一些会阻塞直到 Observable 终止,然后生成一个等价的对象或数据结构;另一些返回一个发射那个对象或数据结构的 Observable。

    在某些 ReactiveX 实现中,还有一个操作符用于将 Observable 转换成阻塞式的。一个阻塞式的 Ogbservable 在普通的 Observable 的基础上增加了几个方法,用于操作 Observable 发射的数据项。

    RxJava2.x 中实现了多种 To 操作符:

    6.10.1 To

    示例代码:

    
    

    输出结果:

    
    

    6.10.2 toFuture

    返回表示该 Observable 发出的单个值的 Future。
    如果 Observable 发出多个项目,Future 将会收到一个 IllegalArgumentException。 如果 Observable 为空,Future 将收到一个 NoSuchElementException。

    如果 Observable 可能会发出多个项目,请使用Observable.toList() 、toBlocking() 、toFuture()。


    toFuturetoFuture

    相关文章

      网友评论

          本文标题:RxJava2.0 操作符(6)—— Utility 辅助操作符

          本文链接:https://www.haomeiwen.com/subject/dcqruxtx.html