RxJava Utility Operators

Author: 三流之路 | Published: 2018-06-09 22:11

    ReactiveX article series index


    using

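    Creates a resource for the Observable to emit from, and releases that resource when the sequence terminates or the subscription is disposed.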

    public static <T, D> Observable<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier, Consumer<? super D> disposer, boolean eager)
    

    The first argument supplies the resource to emit from. The last argument, eager, defaults to true when it is omitted.

    lateinit var disposable: Disposable
    // The first argument supplies the resource, here a list
    Observable.using({ listOf(1,2,3,4,5,6,7,8) }, { list ->
        Observable.create(ObservableOnSubscribe<Int> {
            for (i in list) {
                it.onNext(i)
                if (i == 7)
                    it.onComplete()
            }
        }) // The second argument builds the Observable that emits from the resource
    }, { Log.e("RX", "dispose $it, release the resource here") }) // The third argument releases the resource supplied by the first
            .subscribe(object: Observer<Int> {
                override fun onComplete() { Log.e("RX", "onComplete") }

                override fun onSubscribe(d: Disposable) { disposable = d }

                override fun onNext(t: Int) {
                    Log.e("RX", "onNext $t")
                    // disposable is a non-null lateinit, so no safe call is needed
                    if (t == 5) disposable.dispose()
                }

                override fun onError(e: Throwable) { Log.e("RX", "onError") }
            })
    

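    Log:

    onNext 1
    onNext 2
    onNext 3
    onNext 4
    onNext 5
    dispose [1, 2, 3, 4, 5, 6, 7, 8], release the resource here


    In the example above, the Observer calls dispose() as soon as it receives 5, which runs the disposer passed as the third argument of using and releases the resource.

    If the dispose() call is removed, the source emits onComplete after 7, and the log becomes: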

    onNext 1
    onNext 2
    onNext 3
    onNext 4
    onNext 5
    onNext 6
    onNext 7
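    dispose [1, 2, 3, 4, 5, 6, 7, 8], release the resource here
    onComplete


    Passing false as the last argument of using gives the log below. Compared with true, the resource is released only after onComplete has been delivered.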

    onNext 1
    onNext 2
    onNext 3
    onNext 4
    onNext 5
    onNext 6
    onNext 7
    onComplete
    dispose [1, 2, 3, 4, 5, 6, 7, 8], release the resource here
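
The ordering difference between eager = true and eager = false can also be checked off-device by recording events into a list instead of logging. This is a minimal sketch that assumes RxJava 2.x (the io.reactivex package); usingOrder is a hypothetical helper name, not part of the library.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.Consumer
import io.reactivex.functions.Function
import java.util.concurrent.Callable

// Hypothetical helper: returns the order of emissions and the resource release.
fun usingOrder(eager: Boolean): List<String> {
    val events = mutableListOf<String>()
    Observable.using(
        Callable { listOf(1, 2, 3) },                                          // resource supplier
        Function<List<Int>, Observable<Int>> { Observable.fromIterable(it) },  // source built from the resource
        Consumer<List<Int>> { events.add("release") },                         // disposer
        eager
    ).subscribe(
        { events.add("onNext $it") },
        { events.add("onError") },
        { events.add("onComplete") }
    )
    return events
}
```

With usingOrder(true) the "release" entry should appear before "onComplete", and with usingOrder(false) after it, mirroring the two logs above.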
    

    delay

    public final Observable<T> delay(long delay, TimeUnit unit)
    public final Observable<T> delay(long delay, TimeUnit unit, boolean delayError)
    public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler)
    public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler, boolean delayError)
    
    
    public final <U> Observable<T> delay(final Function<? super T, ? extends ObservableSource<U>> itemDelay)
    // Calls delaySubscription first, then the delay overload above
    public final <U, V> Observable<T> delay(ObservableSource<U> subscriptionDelay,
           Function<? super T, ? extends ObservableSource<V>> itemDelay) {
       return delaySubscription(subscriptionDelay).delay(itemDelay);
    }
    

    By default delay postpones onNext and onComplete but not onError. In effect the whole emission sequence is shifted later in time.

    Observable.create(ObservableOnSubscribe<Int> {
       it.onNext(1)
       it.onNext(2)
       it.onError(Throwable())
       it.onNext(5)
    })
    .delay(5, TimeUnit.SECONDS, true)
    .subscribe (object: Observer<Int> {
       override fun onComplete() {
       }
    
       override fun onSubscribe(d: Disposable) {
       }
    
       override fun onNext(t: Int) { Log.e("RX", "$t")
       }
    
       override fun onError(e: Throwable) { Log.e("RX", "error")
       }
    
    })
    

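    The Observer receives 1 and 2 first and then the error. If the third argument of delay (delayError) is false, the Observer receives the error right away and the pending 1 and 2 are dropped.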
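
To see the effect of delayError without waiting five real seconds, the delay can be driven by a TestScheduler, which runs on a virtual clock. This is a sketch under the assumption of RxJava 2.x; delayedEvents is a hypothetical helper name.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: collects what a subscriber sees for a given delayError value.
fun delayedEvents(delayError: Boolean): List<String> {
    val scheduler = TestScheduler() // virtual clock: nothing runs until time is advanced
    val events = mutableListOf<String>()
    Observable.create<Int> { emitter ->
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onError(RuntimeException("boom"))
    }
        .delay(5, TimeUnit.SECONDS, scheduler, delayError)
        .subscribe({ events.add("onNext $it") }, { events.add("onError") })
    scheduler.advanceTimeBy(5, TimeUnit.SECONDS) // move the virtual clock forward by 5 s
    return events
}
```

delayedEvents(true) should yield both items followed by the error, while delayedEvents(false) should yield only the error.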

    Observable.just(1, 2, 3)
        .delay(5, TimeUnit.SECONDS)
        .subscribe { Log.e("RX", "$it") }
    

    With the Function overload, each item of the outer Observable is emitted only after the Observable returned by the Function for that item calls onNext or onComplete.

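    Log.e("RX", "start")
    Observable.just(10).delay {
        Thread.sleep(3000) // blocks for 3 s while building the per-item delay Observable
        Observable.just("a") // emits immediately, which releases the delayed item
    }.subscribe( { Log.e("RX", "received") } )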
    

    delaySubscription

    public final <U> Observable<T> delaySubscription(ObservableSource<U> other)
    // Wraps the delay in a timer and calls the overload above
    public final Observable<T> delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
        return delaySubscription(timer(delay, unit, scheduler));
    }
    // Calls the three-argument overload above
    public final Observable<T> delaySubscription(long delay, TimeUnit unit)
    

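    delay postpones the emissions, whereas delaySubscription leaves the emissions untouched and postpones the moment at which the source is subscribed to.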

    Observable.just(1, 2, 3)
           .delaySubscription(5, TimeUnit.SECONDS)
           .subscribe {
               Log.e("RX", "$it")
           }
    

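    The first overload works much like the corresponding delay overload, except that it takes an ObservableSource rather than a Function: the subscription to the source starts only after that other Observable emits an item or completes.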
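
The difference between the two operators shows up in when the source is subscribed to. The sketch below records that moment with doOnSubscribe on a TestScheduler's virtual clock. It assumes RxJava 2.x, and trace is a hypothetical helper name.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: records when the source is subscribed to and when items arrive.
fun trace(useDelaySubscription: Boolean): List<String> {
    val scheduler = TestScheduler()
    val events = mutableListOf<String>()
    val source = Observable.just(1, 2)
        .doOnSubscribe { events.add("source subscribed at ${scheduler.now(TimeUnit.SECONDS)}s") }
    val delayed = if (useDelaySubscription)
        source.delaySubscription(5, TimeUnit.SECONDS, scheduler)
    else
        source.delay(5, TimeUnit.SECONDS, scheduler)
    delayed.subscribe { events.add("onNext $it at ${scheduler.now(TimeUnit.SECONDS)}s") }
    scheduler.advanceTimeBy(5, TimeUnit.SECONDS)
    return events
}
```

In both cases the items arrive at the 5 s mark. Only with delaySubscription is the source itself subscribed to at 5 s instead of at 0 s.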

    materialize/dematerialize

    materialize turns each original onNext, onError and onComplete event into a Notification (a class in the RxJava library) and emits all of them through onNext.

    val materializeObservable = Observable.just(1,2,3)
                        .materialize()
    
    materializeObservable.subscribe( {
        Log.e("RX", "value=${it.value},complete=${it.isOnComplete}")
    })
    

    Log:

    value=1,complete=false
    value=2,complete=false
    value=3,complete=false
    value=null,complete=true
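
The example above only covers a source that completes normally. As a sketch of how an error is materialized, the helper below maps each Notification to a string. It assumes RxJava 2.x, and describeEvents is a hypothetical name.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: renders every Notification produced by materialize() as text.
fun describeEvents(source: Observable<Int>): List<String> =
    source.materialize()
        .map { n ->
            when {
                n.isOnNext -> "next(${n.value})"
                n.isOnError -> "error(${n.error?.message})"
                else -> "complete"
            }
        }
        .toList()       // materialize() always completes normally, so this is safe
        .blockingGet()
```

For a failing source such as Observable.just(1).concatWith(Observable.error<Int>(RuntimeException("boom"))), the error arrives as an ordinary item and the materialized sequence then completes.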
    

    dematerialize reverses materialize: it turns the Notification objects emitted through onNext back into the original events.

    materializeObservable.dematerialize<Int>()
    .subscribe(object : Observer<Int> {
       override fun onComplete() { Log.e("RX", "onComplete")}
       override fun onSubscribe(d: Disposable) {}
       override fun onNext(t: Int) { Log.e("RX", "onNext=$t") }
       override fun onError(e: Throwable) {}
    })
    
    onNext=1
    onNext=2
    onNext=3
    onComplete
    

    doXXX

    These include:

    • doOnSubscribe
    • doOnNext
    • doAfterNext
    • doOnTerminate
    • doAfterTerminate
    • doOnError
    • doOnComplete
    • doOnDispose
    • doFinally
    • doOnLifecycle
    • doOnEach

    lateinit var disposable: Disposable
    
    // Bundles the doXXX calls into one transformer
    val transformer = ObservableTransformer<Int, Int> {
        it.doOnNext { Log.e("RX", "doOnNext $it") } // Runs when the Observable calls onNext, before the Observer receives the item
                .doAfterNext { Log.e("RX", "doAfterNext $it") } // Runs after the Observer has received onNext
                .doOnComplete { Log.e("RX", "doOnComplete") } // Runs when the Observable calls onComplete
                .doOnError { Log.e("RX", "doOnError") } // Runs when the Observable calls onError
                .doOnTerminate { Log.e("RX", "doOnTerminate") } // Runs when the Observable calls onComplete or onError
                .doAfterTerminate { Log.e("RX", "doAfterTerminate") } // Runs after the Observer has received onComplete or onError
                .doOnDispose { Log.e("RX", "doOnDispose") } // Runs when the Observer calls dispose()
                .doFinally { Log.e("RX", "doFinally") } // Runs after onComplete or onError, or when the Observer calls dispose()
                .doOnEach {
                    // Every event from the Observable arrives here as a Notification
                    val str = if (it.isOnComplete) "onComplete" else (if (it.isOnError) "onError" else "${it.value}")
                    Log.e("RX", "doOnEach Notification $str")
                }
                .doOnEach(object : Observer<Int> { // Every event is also forwarded to this Observer
                    override fun onComplete() { Log.e("RX", "doOnEach Observer onComplete") }
                    override fun onSubscribe(d: Disposable) { Log.e("RX", "doOnEach Observer onSubscribe") }
                    override fun onNext(t: Int) { Log.e("RX", "doOnEach Observer onNext $t") }
                    override fun onError(e: Throwable) { Log.e("RX", "doOnEach Observer onError") }
                })
                .doOnLifecycle(Consumer<Disposable> { Log.e("RX", "doOnLifecycle onSubscribe") }, // Runs when the Observable is subscribed to
                        Action { Log.e("RX", "doOnLifecycle onDispose") }) // Runs when the subscription is disposed
                .doOnSubscribe { Log.e("RX", "doOnSubscribe") } // Runs when the Observable is subscribed to
    }
    
    // Ends with onComplete
    val observable1 = Observable.create(ObservableOnSubscribe<Int> {
        Log.e("RX", "before emit 1")
        it.onNext(1)
        Log.e("RX", "after emit 1, before emit 2")
        it.onNext(2)
        Log.e("RX", "after emit 2, before emit complete")
        it.onComplete()
        Log.e("RX", "after emit complete")
    })
    
    // Ends with onError
    val observable2 = Observable.create(ObservableOnSubscribe<Int> {
        Log.e("RX", "before emit 1")
        it.onNext(1)
        Log.e("RX", "after emit 1, before emit 2")
        it.onNext(2)
        Log.e("RX", "after emit 2, before emit error")
        it.onError(Throwable())
        Log.e("RX", "after emit error")
    })
    
    val observer1 = object : Observer<Int> { // A plain Observer
        override fun onComplete() { Log.e("RX", "Observer onComplete") }
        override fun onSubscribe(d: Disposable) { Log.e("RX", "Observer onSubscribe") }
        override fun onNext(t: Int) { Log.e("RX", "Observer onNext $t") }
        override fun onError(e: Throwable) { Log.e("RX", "Observer onError") }
    }
    
    val observer2 = object : Observer<Int> { // An Observer that disposes its subscription in onError
        override fun onComplete() { Log.e("RX", "Observer onComplete") }
        override fun onSubscribe(d: Disposable) {
            disposable = d
            Log.e("RX", "Observer onSubscribe")
        }
        override fun onNext(t: Int) { Log.e("RX", "Observer onNext $t") }
        override fun onError(e: Throwable) {
            disposable.dispose()
            Log.e("RX", "Observer onError and dispose")
        }
    }
    

    observable1.compose(transformer).subscribe(observer1)
    

    Log

    05-16 11:29:42.568 18180-18180/pot.ner347.androiddemo E/RX: doOnLifecycle onSubscribe
        doOnSubscribe
        Observer onSubscribe
        before emit 1
    05-16 11:29:42.569 18180-18180/pot.ner347.androiddemo E/RX: doOnNext 1
    05-16 11:29:42.570 18180-18180/pot.ner347.androiddemo E/RX: doOnEach Notification 1
        doOnEach Observer onNext 1
        Observer onNext 1
        doAfterNext 1
        after emit 1, before emit 2
        doOnNext 2
        doOnEach Notification 2
        doOnEach Observer onNext 2
        Observer onNext 2
    05-16 11:29:42.571 18180-18180/pot.ner347.androiddemo E/RX: doAfterNext 2
        after emit 2, before emit complete
        doOnComplete
        doOnTerminate
        doOnEach Notification onComplete
        doOnEach Observer onComplete
        Observer onComplete
        doFinally
        doAfterTerminate
        after emit complete
    

    observable2.compose(transformer).subscribe(observer1)
    

    Log

    05-16 11:32:23.905 19052-19052/pot.ner347.androiddemo E/RX: doOnLifecycle onSubscribe
        doOnSubscribe
        Observer onSubscribe
        before emit 1
        doOnNext 1
    05-16 11:32:23.907 19052-19052/pot.ner347.androiddemo E/RX: doOnEach Notification 1
        doOnEach Observer onNext 1
        Observer onNext 1
        doAfterNext 1
        after emit 1, before emit 2
        doOnNext 2
        doOnEach Notification 2
        doOnEach Observer onNext 2
        Observer onNext 2
        doAfterNext 2
        after emit 2, before emit error
        doOnError
    05-16 11:32:23.908 19052-19052/pot.ner347.androiddemo E/RX: doOnTerminate
        doOnEach Notification onError
        doOnEach Observer onError
        Observer onError
        doFinally
        doAfterTerminate
        after emit error
    

    observable2.compose(transformer).subscribe(observer2)
    

    Log

    05-16 11:42:35.507 20216-20216/pot.ner347.androiddemo E/RX: doOnLifecycle onSubscribe
        doOnSubscribe
    05-16 11:42:35.508 20216-20216/pot.ner347.androiddemo E/RX: Observer onSubscribe
        before emit 1
        doOnNext 1
    05-16 11:42:35.509 20216-20216/pot.ner347.androiddemo E/RX: doOnEach Notification 1
        doOnEach Observer onNext 1
        Observer onNext 1
        doAfterNext 1
        after emit 1, before emit 2
        doOnNext 2
        doOnEach Notification 2
    05-16 11:42:35.509 20216-20216/pot.ner347.androiddemo E/RX: doOnEach Observer onNext 2
        Observer onNext 2
        doAfterNext 2
        after emit 2, before emit error
        doOnError
        doOnTerminate
    05-16 11:42:35.510 20216-20216/pot.ner347.androiddemo E/RX: doOnEach Notification onError
        doOnEach Observer onError
        doOnLifecycle onDispose
        doOnDispose
        doFinally
        Observer onError and dispose
        doAfterTerminate
        after emit error
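
The logs above suggest that doOnDispose only fires on an explicit dispose(), while doFinally fires on either kind of ending. The sketch below isolates that behaviour with a PublishSubject. It assumes RxJava 2.x, and endingTrace is a hypothetical helper name.

```kotlin
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: ends the stream either by completing it or by disposing the subscriber.
fun endingTrace(complete: Boolean): List<String> {
    val events = mutableListOf<String>()
    val subject = PublishSubject.create<Int>()
    val disposable = subject
        .doOnDispose { events.add("doOnDispose") }
        .doFinally { events.add("doFinally") }
        .subscribe { events.add("onNext $it") }
    subject.onNext(1)
    if (complete) subject.onComplete() else disposable.dispose()
    subject.onNext(2) // ignored in both cases: the stream has already ended
    return events
}
```

endingTrace(true) should contain doFinally but not doOnDispose, whereas endingTrace(false) should contain doOnDispose followed by doFinally.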
    

    onTerminateDetach

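    When dispose() is called or the sequence terminates, onTerminateDetach sets the operator's internal reference to the downstream Observer (the actual field) to null. According to articles online, its main use is to prevent memory leaks, since RxJava code tends to use many anonymous inner classes. One such article is: 一张图搞定-RxJava2的线程切换原理和内存泄露问题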

    serialize

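    An Observable may invoke its observer's methods asynchronously, possibly from different threads. That can make the Observable misbehave: it might try to call onComplete or onError before one of its onNext calls, or call onNext from two different threads at the same time.

    serialize corrects this by forcing the Observable to make serialized, well-behaved calls to its observer.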
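
As a rough illustration, the sketch below builds a deliberately misbehaving source with Observable.unsafeCreate, where four threads call onNext concurrently, and counts how many onNext calls overlap downstream of serialize(). It assumes RxJava 2.x on a Java 8+ JVM; serializedStats is a hypothetical helper name.

```kotlin
import io.reactivex.Observable
import io.reactivex.disposables.Disposables
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns (max overlapping onNext calls, number of items received).
fun serializedStats(): Pair<Int, Int> {
    val inFlight = AtomicInteger()
    val maxInFlight = AtomicInteger()
    val received = AtomicInteger()

    // Misbehaving source: four threads call onNext at the same time.
    val source = Observable.unsafeCreate<Int> { observer ->
        observer.onSubscribe(Disposables.empty())
        val threads = List(4) { id -> Thread { repeat(500) { observer.onNext(id) } } }
        threads.forEach { it.start() }
        threads.forEach { it.join() }
        observer.onComplete()
    }

    source.serialize().subscribe {
        val now = inFlight.incrementAndGet()                       // entering onNext
        maxInFlight.accumulateAndGet(now) { a, b -> maxOf(a, b) }  // track the peak overlap
        received.incrementAndGet()
        inFlight.decrementAndGet()                                 // leaving onNext
    }
    return Pair(maxInFlight.get(), received.get())
}
```

With serialize() in place, the peak overlap should stay at 1 while all 2000 items are still delivered. Removing serialize() makes overlapping calls possible, though a given run may not exhibit them.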

    subscribe/subscribeWith

    These methods subscribe to the Observable. The main point of interest is the set of overloads.

    // Uses default implementations for every callback
    public final Disposable subscribe() {
        return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }
    public final Disposable subscribe(Consumer<? super T> onNext)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                Action onComplete)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                Action onComplete, Consumer<? super Disposable> onSubscribe)
    public final void subscribe(Observer<? super T> observer)
    
    // Subscribes, then returns the observer that was passed in
    public final <E extends Observer<? super T>> E subscribeWith(E observer) {
        subscribe(observer);
        return observer;
    }
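
Because subscribeWith returns the observer it was given, it pairs naturally with an observer that is also a Disposable. A minimal sketch, assuming RxJava 2.x and its DisposableObserver and CompositeDisposable classes; collectWithSubscribeWith is a hypothetical helper name.

```kotlin
import io.reactivex.Observable
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.observers.DisposableObserver

// Hypothetical helper: subscribes with a DisposableObserver and stores it in the given bag.
fun collectWithSubscribeWith(bag: CompositeDisposable): List<String> {
    val events = mutableListOf<String>()
    val observer = Observable.just(1, 2).subscribeWith(object : DisposableObserver<Int>() {
        override fun onNext(t: Int) { events.add("onNext $t") }
        override fun onError(e: Throwable) { events.add("onError") }
        override fun onComplete() { events.add("onComplete") }
    })
    bag.add(observer) // the returned observer is itself a Disposable
    return events
}
```

With subscribe(Observer), which returns void, the same code would need a separate variable for the observer before it could be added to the CompositeDisposable.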
    

    timeInterval

    public final Observable<Timed<T>> timeInterval() {
        return timeInterval(TimeUnit.MILLISECONDS, Schedulers.computation());
    }
    public final Observable<Timed<T>> timeInterval(Scheduler scheduler) {
        return timeInterval(TimeUnit.MILLISECONDS, scheduler);
    }
    public final Observable<Timed<T>> timeInterval(TimeUnit unit) {
        return timeInterval(unit, Schedulers.computation());
    }
    
    public final Observable<Timed<T>> timeInterval(TimeUnit unit, Scheduler scheduler)
    

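    Intercepts the items emitted by the source Observable, measures the time between each pair of consecutive emissions, and emits that interval together with the original item wrapped in a Timed.

    The first item emitted by the new Observable carries the time between the Observer subscribing to the source Observable and the source emitting its first item. The interval between the last item and onComplete is not emitted.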

    Observable.create(ObservableOnSubscribe<Int> {
        it.onNext(1)
        Thread.sleep(100)
        it.onNext(2)
        Thread.sleep(200)
        it.onNext(3)
        Thread.sleep(150)
        it.onNext(4)
        Thread.sleep(250)
        it.onComplete()
    }).timeInterval().subscribe(object: Observer<Timed<Int>> {
        override fun onComplete() { Log.e("RX", "onComplete") }
    
        override fun onSubscribe(d: Disposable) {}
    
        override fun onNext(t: Timed<Int>) {
            Log.e("RX", "onNext ${t.time()},${t.value()}")
        }
    
        override fun onError(e: Throwable) {}
    })
    

    Log

    onNext 0,1
    onNext 101,2
    onNext 200,3
    onNext 151,4
    onComplete
    

    timestamp

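    timeInterval wraps each item together with the time elapsed since the previous emission, whereas timestamp wraps each item together with the timestamp at which it was emitted.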

    Observable.create(ObservableOnSubscribe<Int> {
        it.onNext(1)
        Thread.sleep(100)
        it.onNext(2)
        Thread.sleep(200)
        it.onNext(3)
        Thread.sleep(150)
        it.onNext(4)
        Thread.sleep(250)
        it.onComplete()
    }).timestamp().subscribe(object: Observer<Timed<Int>> {
        override fun onComplete() { Log.e("RX", "onComplete") }
    
        override fun onSubscribe(d: Disposable) {}
    
        override fun onNext(t: Timed<Int>) {
            Log.e("RX", "onNext ${t.time()},${t.value()}")
        }
    
        override fun onError(e: Throwable) {}
    
    })
    

    Log

    onNext 1526714465003,1
    onNext 1526714465104,2
    onNext 1526714465305,3
    onNext 1526714465457,4
    onComplete
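
Both operators accept a Scheduler as their time source, so the contrast can be reproduced with exact numbers on a TestScheduler rather than with Thread.sleep. This is a sketch assuming RxJava 2.x; timedValues is a hypothetical helper name.

```kotlin
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

// Hypothetical helper: returns (intervals from timeInterval, timestamps from timestamp) in ms.
fun timedValues(): Pair<List<Long>, List<Long>> {
    val scheduler = TestScheduler() // virtual clock starting at 0
    val subject = PublishSubject.create<Int>()
    val gaps = mutableListOf<Long>()
    val stamps = mutableListOf<Long>()
    subject.timeInterval(TimeUnit.MILLISECONDS, scheduler).subscribe { gaps.add(it.time()) }
    subject.timestamp(TimeUnit.MILLISECONDS, scheduler).subscribe { stamps.add(it.time()) }

    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS)
    subject.onNext(1) // emitted at 100 ms
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS)
    subject.onNext(2) // emitted at 300 ms
    return Pair(gaps, stamps)
}
```

The intervals come out as 100 and 200, while the timestamps are the absolute virtual times 100 and 300.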
    

    timeout

    // Times out if the source has not emitted its next item before the Observable returned by the Function emits an item or completes
    public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator)
    public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator, ObservableSource<? extends T> other)
    
    // On timeout, onError is called
    public final Observable<T> timeout(long timeout, TimeUnit timeUnit) 
    // On timeout, switches to the fallback Observable
    // If no timeout occurs, only the source emits and the fallback is never subscribed to
    public final Observable<T> timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other) 
    public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, ObservableSource<? extends T> other) 
    public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler)
    
    public final <U, V> Observable<T> timeout(ObservableSource<U> firstTimeoutIndicator, Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator)
    
    public final <U, V> Observable<T> timeout(ObservableSource<U> firstTimeoutIndicator, Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator, ObservableSource<? extends T> other)
    
    val ob = Observable.interval(10, 10, TimeUnit.MILLISECONDS).take(3)
    val observer = object : Observer<Long> {
        override fun onComplete() { Log.e("RX", "onComplete") }
    
        override fun onSubscribe(d: Disposable) {
        }
    
        override fun onNext(t: Long) {Log.e("RX", "onNext $t") }
    
        override fun onError(e: Throwable) {Log.e("RX", "onError") }
    }
    val other = Observable.just(20L,30L)
    
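    // Times out before the first item, which is due at 10 ms
    // onError
    ob.timeout(5, TimeUnit.MILLISECONDS).subscribe(observer)

    // Times out, so the fallback takes over
    // onNext 20
    // onNext 30
    // onComplete
    ob.timeout(5, TimeUnit.MILLISECONDS, other).subscribe(observer)

    // This overload puts no timeout on the first item, so the source emits 0. The Observable returned by the Function then fires after 2 ms, before the source emits its next item, so the sequence times out
    // onNext 0
    // onError
    ob.timeout({Observable.timer(2, TimeUnit.MILLISECONDS)}).subscribe(observer)

    // Applies a timeout to the first item as well
    // onError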
    ob.timeout(Observable.timer(5, TimeUnit.MILLISECONDS),
                        Function<Long, ObservableSource<Long>>{Observable.timer(5, TimeUnit.MILLISECONDS)})
                        .subscribe(observer)
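
The fallback overload can be exercised deterministically on a TestScheduler, covering both the case where the timeout fires and the case where it does not. This is a sketch assuming RxJava 2.x; timeoutWithFallback and its firstItemAtMs parameter are hypothetical names.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: the source emits a single 0 at firstItemAtMs, the timeout is 5 ms.
fun timeoutWithFallback(firstItemAtMs: Long): List<String> {
    val scheduler = TestScheduler()
    val events = mutableListOf<String>()
    Observable.timer(firstItemAtMs, TimeUnit.MILLISECONDS, scheduler)
        .timeout(5, TimeUnit.MILLISECONDS, scheduler, Observable.just(20L, 30L))
        .subscribe(
            { events.add("onNext $it") },
            { events.add("onError") },
            { events.add("onComplete") }
        )
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS) // run everything on the virtual clock
    return events
}
```

When the first item is due at 10 ms the fallback values 20 and 30 are emitted instead. When it is due at 2 ms the source's own item arrives and the fallback is never used.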
    

          Article link: https://www.haomeiwen.com/subject/zcuxeftx.html