using
创建一个资源用于发射,当取消订阅时也释放资源。
public static <T, D> Observable<T> using(Callable<? extends D> resourceSupplier, Function<? super D, ? extends ObservableSource<? extends T>> sourceSupplier, Consumer<? super D> disposer, boolean eager)
第一个参数就是提供发射的资源,最后一个参数 eager,默认为 true。
// Handle to the subscription; assigned in onSubscribe so onNext can cancel it.
lateinit var disposable: Disposable
// First argument: supplies the resource (here a list of integers).
Observable.using({ listOf(1, 2, 3, 4, 5, 6, 7, 8) }, { list ->
    // Second argument: builds the Observable that emits from the resource.
    Observable.create(ObservableOnSubscribe<Int> { emitter ->
        for (i in list) {
            emitter.onNext(i)
            if (i == 7)
                emitter.onComplete()
        }
    })
}, { Log.e("RX", "dispose $it,在这里释放资源") }) // Third argument: releases the resource supplied by the first.
    .subscribe(object : Observer<Int> {
        override fun onComplete() { Log.e("RX", "onComplete") }
        override fun onSubscribe(d: Disposable) { disposable = d }
        override fun onNext(t: Int) {
            Log.e("RX", "onNext $t")
            // disposable is a non-null lateinit, so it is called directly rather than through `?.`.
            if (t == 5) disposable.dispose()
        }
        override fun onError(e: Throwable) { Log.e("RX", "onError") }
    })
日志:
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
dispose [1, 2, 3, 4, 5, 6, 7, 8],在这里释放资源
上例中,Observer 在收到 5 这个整数时,就 dispose(),最后进了 using 第三个参数设置的 disposer 释放资源。
去掉调用 dispose(),这样发到 7 的时候,发射 onComplete,此时日志是
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onNext 6
onNext 7
dispose [1, 2, 3, 4, 5, 6, 7, 8],在这里释放资源
onComplete
给 using 方法加最后一个参数 false,日志如下,相比为 true 的就是等 onComplete 之后才去释放资源。
onNext 1
onNext 2
onNext 3
onNext 4
onNext 5
onNext 6
onNext 7
onComplete
dispose [1, 2, 3, 4, 5, 6, 7, 8],在这里释放资源
delay
public final Observable<T> delay(long delay, TimeUnit unit)
public final Observable<T> delay(long delay, TimeUnit unit, boolean delayError)
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler)
public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler, boolean delayError)
public final <U> Observable<T> delay(final Function<? super T, ? extends ObservableSource<U>> itemDelay)
// 先调用 delaySubscription,然后调用上面的 delay 方法
public final <U, V> Observable<T> delay(ObservableSource<U> subscriptionDelay,
Function<? super T, ? extends ObservableSource<V>> itemDelay) {
return delaySubscription(subscriptionDelay).delay(itemDelay);
}
会延迟 onNext 和 onComplete,默认不会延迟 onError(只有 delayError 参数为 true 时,onError 才会同样被延迟)。就是整个发射事件被延迟了。
// Emits 1 and 2, signals an error, then attempts one more onNext after the error.
val failingSource = Observable.create(ObservableOnSubscribe<Int> { emitter ->
    emitter.onNext(1)
    emitter.onNext(2)
    emitter.onError(Throwable())
    emitter.onNext(5)
})
// Logs each value and the error; subscription and completion are ignored.
val loggingObserver = object : Observer<Int> {
    override fun onSubscribe(d: Disposable) {}
    override fun onNext(t: Int) { Log.e("RX", "$t") }
    override fun onError(e: Throwable) { Log.e("RX", "error") }
    override fun onComplete() {}
}
// Third argument (delayError) is true here.
failingSource
    .delay(5, TimeUnit.SECONDS, true)
    .subscribe(loggingObserver)
会先收到 1,2 然后收到 error,如果 delay 的第三个参数为 false,直接收到 error。
// Applies a five-second delay to the source 1, 2, 3 and logs every value received.
val delayedSource = Observable.just(1, 2, 3).delay(5, TimeUnit.SECONDS)
delayedSource.subscribe { value -> Log.e("RX", "$value") }
Function 方法返回的 Observable,onNext 或 onComplete 后外面的 Observable 才发射。
Log.e("RX", "start")
// Item-delay overload: the lambda sleeps three seconds, then returns the Observable
// that controls when the source item is delivered.
Observable.just(10)
    .delay { _ ->
        Thread.sleep(3000)
        Observable.just("a")
    }
    .subscribe { Log.e("RX", "收到") }
delaySubscription
public final <U> Observable<T> delaySubscription(ObservableSource<U> other)
// 先用 timer 封装一下,调用上面的方法
public final Observable<T> delaySubscription(long delay, TimeUnit unit, Scheduler scheduler) {
return delaySubscription(timer(delay, unit, scheduler));
}
// 调用上面三个参数的方法
public final Observable<T> delaySubscription(long delay, TimeUnit unit)
delay 是发射延迟,delaySubscription 发射正常,是观察者订阅延迟。
// Applies a five-second subscription delay to the source 1, 2, 3 and logs every value received.
val lateSubscription = Observable.just(1, 2, 3).delaySubscription(5, TimeUnit.SECONDS)
lateSubscription.subscribe { value -> Log.e("RX", "$value") }
第一个重载方法(参数为 ObservableSource)的含义和 delay 的类似:等参数 other 这个 Observable 发射了数据或结束后,才开始订阅源 Observable。
materialize/dematerialize
materialize 将原来的 onNext,onError,onComplete 全都变成一个通知 Notification(RX 框架里的类),然后都通过 onNext 发射出去。
// Every signal of the source arrives as a Notification through onNext.
val materializeObservable = Observable.just(1, 2, 3).materialize()
materializeObservable.subscribe { notification ->
    Log.e("RX", "value=${notification.value},complete=${notification.isOnComplete}")
}
日志:
value=1,complete=false
value=2,complete=false
value=3,complete=false
value=null,complete=true
dematerialize 是 materialize 的逆过程,将 onNext 发射的 Notification 变成原来的样子。
// Logs the values and the completion restored from the Notifications.
val restoredObserver = object : Observer<Int> {
    override fun onSubscribe(d: Disposable) {}
    override fun onNext(t: Int) { Log.e("RX", "onNext=$t") }
    override fun onError(e: Throwable) {}
    override fun onComplete() { Log.e("RX", "onComplete") }
}
// dematerialize turns the Notification stream back into the original signals.
materializeObservable.dematerialize<Int>().subscribe(restoredObserver)
onNext=1
onNext=2
onNext=3
onComplete
doXXX
包括:
- doOnSubscribe
- doOnNext
- doAfterNext
- doOnTerminate
- doAfterTerminate
- doOnError
- doOnComplete
- doOnDispose
- doFinally
- doOnLifecycle
- doOnEach
lateinit var disposable: Disposable
// Bundles every doXXX hook into one reusable transformer.
val transformer = ObservableTransformer<Int, Int> { upstream ->
    upstream
        .doOnNext { value -> Log.e("RX", "doOnNext $value") } // runs when the source calls onNext
        .doAfterNext { value -> Log.e("RX", "doAfterNext $value") } // runs after the Observer has received onNext
        .doOnComplete { Log.e("RX", "doOnComplete") } // runs when the source calls onComplete
        .doOnError { Log.e("RX", "doOnError") } // runs when the source calls onError
        .doOnTerminate { Log.e("RX", "doOnTerminate") } // runs when the source calls onComplete or onError
        .doAfterTerminate { Log.e("RX", "doAfterTerminate") } // runs after the Observer has received onComplete or onError
        .doOnDispose { Log.e("RX", "doOnDispose") } // runs when the Observer calls dispose()
        .doFinally { Log.e("RX", "doFinally") } // runs after onComplete or onError, or when the Observer calls dispose()
        .doOnEach { notification ->
            // Every signal from the source is delivered here as a Notification.
            val str = when {
                notification.isOnComplete -> "onComplete"
                notification.isOnError -> "onError"
                else -> "${notification.value}"
            }
            Log.e("RX", "doOnEach Notification $str")
        }
        .doOnEach(object : Observer<Int> { // every signal is also forwarded to this Observer
            override fun onSubscribe(d: Disposable) { Log.e("RX", "doOnEach Observer onSubscribe") }
            override fun onNext(t: Int) { Log.e("RX", "doOnEach Observer onNext $t") }
            override fun onError(e: Throwable) { Log.e("RX", "doOnEach Observer onError") }
            override fun onComplete() { Log.e("RX", "doOnEach Observer onComplete") }
        })
        .doOnLifecycle(
            Consumer<Disposable> { Log.e("RX", "doOnLifecycle onSubscribe") }, // runs once the Observable is subscribed
            Action { Log.e("RX", "doOnLifecycle onDispose") } // runs once the subscription is disposed
        )
        .doOnSubscribe { Log.e("RX", "doOnSubscribe") } // runs once the Observable is subscribed
}
// Source that ends with onComplete; logs before and after every emission.
val observable1 = Observable.create(ObservableOnSubscribe<Int> { emitter ->
    Log.e("RX", "before emit 1")
    emitter.onNext(1)

    Log.e("RX", "after emit 1, before emit 2")
    emitter.onNext(2)

    Log.e("RX", "after emit 2, before emit complete")
    emitter.onComplete()

    Log.e("RX", "after emit complete")
})
// Source that ends with onError; logs before and after every emission.
val observable2 = Observable.create(ObservableOnSubscribe<Int> { emitter ->
    Log.e("RX", "before emit 1")
    emitter.onNext(1)

    Log.e("RX", "after emit 1, before emit 2")
    emitter.onNext(2)

    Log.e("RX", "after emit 2, before emit error")
    emitter.onError(Throwable())

    Log.e("RX", "after emit error")
})
// Plain Observer: logs every callback and never disposes.
val observer1 = object : Observer<Int> {
    override fun onSubscribe(d: Disposable) {
        Log.e("RX", "Observer onSubscribe")
    }

    override fun onNext(t: Int) {
        Log.e("RX", "Observer onNext $t")
    }

    override fun onError(e: Throwable) {
        Log.e("RX", "Observer onError")
    }

    override fun onComplete() {
        Log.e("RX", "Observer onComplete")
    }
}
// Observer that stores its Disposable on subscription and disposes it inside onError.
val observer2 = object : Observer<Int> {
    override fun onSubscribe(d: Disposable) {
        disposable = d
        Log.e("RX", "Observer onSubscribe")
    }

    override fun onNext(t: Int) {
        Log.e("RX", "Observer onNext $t")
    }

    override fun onError(e: Throwable) {
        // Dispose first, then log.
        disposable.dispose()
        Log.e("RX", "Observer onError and dispose")
    }

    override fun onComplete() {
        Log.e("RX", "Observer onComplete")
    }
}
// Completing source, routed through the doXXX transformer, observed by the plain Observer.
observable1.compose(transformer).subscribe(observer1)
日志
05-16 11:29:42.568 18180-18180/pot.ner347.androiddemo E/RX: doOnLifecycle onSubscribe
doOnSubscribe
Observer onSubscribe
before emit 1
05-16 11:29:42.569 18180-18180/pot.ner347.androiddemo E/RX: doOnNext 1
05-16 11:29:42.570 18180-18180/pot.ner347.androiddemo E/RX: doOnEach Notification 1
doOnEach Observer onNext 1
Observer onNext 1
doAfterNext 1
after emit 1, before emit 2
doOnNext 2
doOnEach Notification 2
doOnEach Observer onNext 2
Observer onNext 2
05-16 11:29:42.571 18180-18180/pot.ner347.androiddemo E/RX: doAfterNext 2
after emit 2, before emit complete
doOnComplete
doOnTerminate
doOnEach Notification onComplete
doOnEach Observer onComplete
Observer onComplete
doFinally
doAfterTerminate
after emit complete
// Erroring source, routed through the doXXX transformer, observed by the plain Observer.
observable2.compose(transformer).subscribe(observer1)
日志
05-16 11:32:23.905 19052-19052/pot.ner347.androiddemo E/RX: doOnLifecycle onSubscribe
doOnSubscribe
Observer onSubscribe
before emit 1
doOnNext 1
05-16 11:32:23.907 19052-19052/pot.ner347.androiddemo E/RX: doOnEach Notification 1
doOnEach Observer onNext 1
Observer onNext 1
doAfterNext 1
after emit 1, before emit 2
doOnNext 2
doOnEach Notification 2
doOnEach Observer onNext 2
Observer onNext 2
doAfterNext 2
after emit 2, before emit error
doOnError
05-16 11:32:23.908 19052-19052/pot.ner347.androiddemo E/RX: doOnTerminate
doOnEach Notification onError
doOnEach Observer onError
Observer onError
doFinally
doAfterTerminate
after emit error
// Erroring source, routed through the doXXX transformer, observed by the Observer that disposes in onError.
observable2.compose(transformer).subscribe(observer2)
日志
05-16 11:42:35.507 20216-20216/pot.ner347.androiddemo E/RX: doOnLifecycle onSubscribe
doOnSubscribe
05-16 11:42:35.508 20216-20216/pot.ner347.androiddemo E/RX: Observer onSubscribe
before emit 1
doOnNext 1
05-16 11:42:35.509 20216-20216/pot.ner347.androiddemo E/RX: doOnEach Notification 1
doOnEach Observer onNext 1
Observer onNext 1
doAfterNext 1
after emit 1, before emit 2
doOnNext 2
doOnEach Notification 2
05-16 11:42:35.509 20216-20216/pot.ner347.androiddemo E/RX: doOnEach Observer onNext 2
Observer onNext 2
doAfterNext 2
after emit 2, before emit error
doOnError
doOnTerminate
05-16 11:42:35.510 20216-20216/pot.ner347.androiddemo E/RX: doOnEach Notification onError
doOnEach Observer onError
doOnLifecycle onDispose
doOnDispose
doFinally
Observer onError and dispose
doAfterTerminate
after emit error
onTerminateDetach
在执行 dispose() 解除订阅时,将内部对外部观察者的引用 actual 置为 null,看网上文章主要用于防止内存泄漏问题,因为 RxJava 使用中用了许多匿名内部类。比如这篇文章:一张图搞定-RxJava2的线程切换原理和内存泄露问题
serialize
Observable 可以异步调用观察者的方法,可能是从不同的线程调用。这可能会让 Observable 行为不正确,它可能会在某一个 onNext 调用之前尝试调用 onComplete 或 onError 方法,或者从两个不同的线程同时调用 onNext 方法。
使用 serialize 可以纠正 Observable 的行为,保证它的行为是正确的且是同步的。
subscribe/subscribeWith
订阅,主要是有几个重载方法。
// 用的是一些默认的实现
public final Disposable subscribe() {
return subscribe(Functions.emptyConsumer(), Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
public final Disposable subscribe(Consumer<? super T> onNext)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe)
public final void subscribe(Observer<? super T> observer)
// 订阅后返回这个观察者对象
public final <E extends Observer<? super T>> E subscribeWith(E observer) {
subscribe(observer);
return observer;
}
timeInterval
public final Observable<Timed<T>> timeInterval() {
return timeInterval(TimeUnit.MILLISECONDS, Schedulers.computation());
}
public final Observable<Timed<T>> timeInterval(Scheduler scheduler) {
return timeInterval(TimeUnit.MILLISECONDS, scheduler);
}
public final Observable<Timed<T>> timeInterval(TimeUnit unit) {
return timeInterval(unit, Schedulers.computation());
}
public final Observable<Timed<T>> timeInterval(TimeUnit unit, Scheduler scheduler)
拦截源 Observable 发射的数据项,算出两个连续发射数据之间的时间间隔,将这个间隔和原始数据封装成 Timed 发射出来。
新的 Observable 发射的第一项数据,其时间间隔是从 Observer 订阅源 Observable 到源 Observable 发射第一项数据之间的时间长度。源 Observable 发射最后一项数据到发射 onComplete 之间的时间间隔不会发射。
// Emits 1..4 with sleeps of 100, 200 and 150 ms between items, then completes after another 250 ms.
Observable.create(ObservableOnSubscribe<Int> { emitter ->
    emitter.onNext(1)
    Thread.sleep(100)
    emitter.onNext(2)
    Thread.sleep(200)
    emitter.onNext(3)
    Thread.sleep(150)
    emitter.onNext(4)
    Thread.sleep(250)
    emitter.onComplete()
}).timeInterval().subscribe(object : Observer<Timed<Int>> {
    override fun onComplete() { Log.e("RX", "onComplete") }
    override fun onSubscribe(d: Disposable) {}
    override fun onNext(t: Timed<Int>) {
        // Logs the interval carried by the Timed wrapper, then the original value.
        // The stray `t.time()` call whose result was discarded has been removed.
        Log.e("RX", "onNext ${t.time()},${t.value()}")
    }
    override fun onError(e: Throwable) {}
})
日志
onNext 0,1
onNext 101,2
onNext 200,3
onNext 151,4
onComplete
timestamp
timeInterval 是将时间间隔和源数据封装,而 timestamp 是将发射时的时间戳和源数据封装。
// Emits 1..4 with sleeps of 100, 200 and 150 ms between items, then completes after another 250 ms.
Observable.create(ObservableOnSubscribe<Int> { emitter ->
    emitter.onNext(1)
    Thread.sleep(100)
    emitter.onNext(2)
    Thread.sleep(200)
    emitter.onNext(3)
    Thread.sleep(150)
    emitter.onNext(4)
    Thread.sleep(250)
    emitter.onComplete()
}).timestamp().subscribe(object : Observer<Timed<Int>> {
    override fun onComplete() { Log.e("RX", "onComplete") }
    override fun onSubscribe(d: Disposable) {}
    override fun onNext(t: Timed<Int>) {
        // Logs the timestamp carried by the Timed wrapper, then the original value.
        // The stray `t.time()` call whose result was discarded has been removed.
        Log.e("RX", "onNext ${t.time()},${t.value()}")
    }
    override fun onError(e: Throwable) {}
})
日志
onNext 1526714465003,1
onNext 1526714465104,2
onNext 1526714465305,3
onNext 1526714465457,4
onComplete
timeout
// Function 里返回的 Observable 结束之前,源 Observable 还没发射数据的话就超时
public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator)
public final <V> Observable<T> timeout(Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator, ObservableSource<? extends T> other)
// 超时进 onError
public final Observable<T> timeout(long timeout, TimeUnit timeUnit)
// 超时使用备用的 Observable 发射
// 如果没超时,只发射源 Observable 的数据,备用的 Observable 不会被订阅
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, ObservableSource<? extends T> other)
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler, ObservableSource<? extends T> other)
public final Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler)
public final <U, V> Observable<T> timeout(ObservableSource<U> firstTimeoutIndicator, Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator)
public final <U, V> Observable<T> timeout(ObservableSource<U> firstTimeoutIndicator, Function<? super T, ? extends ObservableSource<V>> itemTimeoutIndicator, ObservableSource<? extends T> other)
// Fallback source for the timeout overload that takes `other`.
val other = Observable.just(20L, 30L)
// Source: an interval with a 10 ms initial delay and 10 ms period, limited to three items.
val ob = Observable.interval(10, 10, TimeUnit.MILLISECONDS).take(3)
// Shared Observer that logs values, completion and errors.
val observer = object : Observer<Long> {
    override fun onSubscribe(d: Disposable) {}
    override fun onNext(t: Long) { Log.e("RX", "onNext $t") }
    override fun onError(e: Throwable) { Log.e("RX", "onError") }
    override fun onComplete() { Log.e("RX", "onComplete") }
}

// Times out: the 5 ms limit is shorter than the 10 ms interval.
// Expected log:
//   onError
ob.timeout(5, TimeUnit.MILLISECONDS).subscribe(observer)

// Times out, and the fallback source takes over.
// Expected log:
//   onNext 20
//   onNext 30
//   onComplete
ob.timeout(5, TimeUnit.MILLISECONDS, other).subscribe(observer)

// The first item (0) gets through. The per-item indicator returned by the lambda
// then fires after 2 ms, before the source emits again, so the stream times out.
// Expected log:
//   onNext 0
//   onError
ob.timeout { Observable.timer(2, TimeUnit.MILLISECONDS) }.subscribe(observer)

// The extra first argument puts a timeout on the first item as well.
// Expected log:
//   onError
val firstItemLimit = Observable.timer(5, TimeUnit.MILLISECONDS)
val perItemLimit = Function<Long, ObservableSource<Long>> { Observable.timer(5, TimeUnit.MILLISECONDS) }
ob.timeout(firstItemLimit, perItemLimit).subscribe(observer)
网友评论