其他的一些操作符...
doOnNext (doOnError, doOnCompleted)
//Observable.kt
/**
 * Builds a new [Observable] backed by an [OnSubscribeDoOnNext] that wraps this
 * observable together with [action].
 *
 * @param action callback handed to [OnSubscribeDoOnNext], which invokes it for each item.
 * @return the observable produced by `create`.
 */
fun doOnNext(action: (t: T) -> Unit): Observable<T> =
    create(OnSubscribeDoOnNext(this, action))
//OnSubscribeDoOnNext.kt
/**
 * [Observable.OnSubscribe] that runs a side-effect callback on every item of
 * [source] before passing the item on to the downstream subscriber.
 *
 * Completion and error signals are relayed downstream unchanged.
 *
 * @param source the upstream observable to subscribe to.
 * @param action callback invoked with each item before it is forwarded.
 */
class OnSubscribeDoOnNext<T>(
    private val source: Observable<T>,
    private val action: (T) -> Unit
) : Observable.OnSubscribe<T> {

    override fun call(subscriber: Subscriber<T>) {
        // Intermediate subscriber sitting between the source and the downstream subscriber.
        val relay = object : Subscriber<T>() {
            override fun onNext(t: T) {
                // Side effect first, then hand the same item downstream.
                action(t)
                subscriber.onNext(t)
            }

            override fun onError(t: Throwable) {
                subscriber.onError(t)
            }

            override fun onCompleted() {
                subscriber.onCompleted()
            }
        }
        source.subscribe(relay)
    }
}
compose
/**
 * Applies [transformer] to this observable and returns whatever it produces.
 *
 * @param transformer function that receives this observable and returns an `Observable<R>`.
 * @return the result of invoking [transformer] with this observable.
 */
fun <R> compose(transformer: (t: Observable<T>) -> Observable<R>): Observable<R> =
    transformer.invoke(this)
from(array: Array<T>)
//Observable.kt
companion object {
    /**
     * Creates an [Observable] backed by an [OnSubscribeFromArray] over [array].
     *
     * @param array the items handed to [OnSubscribeFromArray].
     * @return the observable produced by `create`.
     */
    fun <T> from(array: Array<T>): Observable<T> {
        val onSubscribe = OnSubscribeFromArray(array)
        return create(onSubscribe)
    }
}
//OnSubscribeFromArray.kt
/**
 * [Observable.OnSubscribe] that emits every element of [array] in order and
 * then signals completion.
 *
 * Exactly one terminal signal is delivered: if emitting throws an [Exception],
 * the subscriber receives `onError` and `onCompleted` is NOT called afterwards.
 * Once the subscriber reports itself unsubscribed, emission stops immediately
 * and no terminal signal is sent.
 *
 * @param array the items to emit.
 */
class OnSubscribeFromArray<T>(private val array: Array<T>) : Observable.OnSubscribe<T> {

    override fun call(subscriber: Subscriber<T>) {
        try {
            for (item in array) {
                // Stop as soon as the subscriber is gone instead of spinning through the rest.
                if (subscriber.isUnsubscribed()) return
                subscriber.onNext(item)
            }
        } catch (e: Exception) {
            subscriber.onError(e)
            // onError is terminal: do not fall through to onCompleted.
            return
        }
        if (!subscriber.isUnsubscribed()) {
            subscriber.onCompleted()
        }
    }
}
merge
//Observable.kt
companion object {
    /**
     * Lifts [source] with an [OperatorMerge], turning an observable of
     * observables into a single `Observable<T>`.
     *
     * @param source observable whose items are themselves observables.
     * @return the result of `source.lift(OperatorMerge())`.
     */
    fun <T> merge(source: Observable<Observable<T>>): Observable<T> =
        source.lift(OperatorMerge())

    /**
     * Array overload: wraps [array] with `from` and delegates to the
     * observable-of-observables overload of `merge`.
     *
     * @param array the observables to merge.
     */
    fun <T> merge(array: Array<Observable<T>>): Observable<T> {
        val sources = from(array)
        return merge(sources)
    }
}
//OperatorMerge.kt
/**
 * Operator that flattens an `Observable<Observable<T>>` into a single stream
 * by subscribing to every inner observable and relaying its items downstream.
 *
 * Terminal signals:
 * - `onCompleted` is sent downstream once, after the outer source AND every
 *   inner observable have completed.
 * - The first `onError` from the outer source or any inner observable is
 *   forwarded downstream; later items and terminal signals are dropped.
 */
class OperatorMerge<T> : Observable.Operator<T, Observable<T>> {

    override fun call(subscriber: Subscriber<T>): Subscriber<Observable<T>> {
        return MergeSubscriber(subscriber)
    }

    private class MergeSubscriber<T>(private val actual: Subscriber<T>) : Subscriber<Observable<T>>() {

        // Number of sources still running; starts at 1 to account for the outer source.
        private val active = java.util.concurrent.atomic.AtomicInteger(1)

        // Flips to true when a terminal signal has been delivered downstream.
        private val terminated = java.util.concurrent.atomic.AtomicBoolean(false)

        override fun onCompleted() {
            sourceCompleted()
        }

        override fun onError(t: Throwable) {
            fail(t)
        }

        override fun onNext(t: Observable<T>) {
            // Register the inner source before subscribing so a synchronous
            // completion cannot drive the counter to zero prematurely.
            active.incrementAndGet()
            t.subscribe(innerSubscriber())
        }

        /** Creates the subscriber used for one inner observable. */
        private fun innerSubscriber(): Subscriber<T> {
            return object : Subscriber<T>() {
                override fun onCompleted() {
                    sourceCompleted()
                }

                override fun onError(t: Throwable) {
                    fail(t)
                }

                override fun onNext(t: T) {
                    if (!terminated.get()) {
                        actual.onNext(t)
                    }
                }
            }
        }

        /** Marks one source (outer or inner) as done; completes downstream when none remain. */
        private fun sourceCompleted() {
            if (active.decrementAndGet() == 0 && terminated.compareAndSet(false, true)) {
                actual.onCompleted()
            }
        }

        /** Delivers the first error downstream and suppresses everything after it. */
        private fun fail(error: Throwable) {
            if (terminated.compareAndSet(false, true)) {
                actual.onError(error)
            }
        }
    }
}
flatMap
/**
 * Maps each item to an observable via [func] and merges the resulting
 * observables into one.
 *
 * @param func function turning each item into an `Observable<R>`.
 * @return the result of `merge(map(func))`.
 */
fun <R> flatMap(func: (t: T) -> Observable<R>): Observable<R> =
    merge(map(func))
网友评论