美文网首页
实现简单的 RxKotlin (下)

实现简单的 RxKotlin (下)

作者: lguipeng | 来源:发表于2018-09-07 16:04 被阅读50次

其他的一些操作符...

doOnNext ( doOnError doOnCompleted)

//Observable.kt
fun doOnNext(action: (t: T) -> Unit): Observable<T> {
    return create(OnSubscribeDoOnNext(this, action))
}

//OnSubscribeDoOnNext.kt
class OnSubscribeDoOnNext<T>(private var source: Observable<T>, private var action: (T) -> Unit) : Observable.OnSubscribe<T> {

    override fun call(subscriber: Subscriber<T>) {
        source.subscribe(object : Subscriber<T>(){
            override fun onCompleted() {
                subscriber.onCompleted()
            }

            override fun onError(t: Throwable) {
                subscriber.onError(t)
            }

            override fun onNext(t: T) {
                action(t)
                subscriber.onNext(t)
            }
        })
    }
}

compose

fun <R> compose(transformer: (t : Observable<T>) -> Observable<R>): Observable<R> {
    return transformer(this)
}

from(array: Array<T>)

//Observable.kt
companion object {

    fun <T> from(array: Array<T>): Observable<T> {
        return create(OnSubscribeFromArray(array))
    }

}

//OnSubscribeFromArray
class OnSubscribeFromArray<T>(private var array : Array<T>) : Observable.OnSubscribe<T> {

    override fun call(subscriber: Subscriber<T>) {
        try {
            for (t in array) {
                if (!subscriber.isUnsubscribed())
                    subscriber.onNext(t)
            }
        }catch (e: Exception) {
            subscriber.onError(e)
        }
        subscriber.onCompleted()
    }

}

merge

//Observable.kt
companion object {

    fun <T> merge(source : Observable<Observable<T>>): Observable<T> {
        return source.lift(OperatorMerge())
    }

    fun <T> merge(array: Array<Observable<T>>): Observable<T> {
        return merge(from(array))
    }
}

//OperatorMerge.kt
class OperatorMerge<T> : Observable.Operator<T, Observable<T>> {

    override fun call(subscriber: Subscriber<T>): Subscriber<Observable<T>> {
        return MergeSubscriber(subscriber)
    }

    private class MergeSubscriber<T>(private var actual : Subscriber<T>) : Subscriber<Observable<T>>() {

        override fun onCompleted() {
        }

        override fun onError(t: Throwable) {
        }

        override fun onNext(t: Observable<T>) {
            t.subscribe(actual)
        }

    }
}

flatMap

fun <R> flatMap(func: (t: T) -> Observable<R>): Observable<R>{
    return merge(map(func))
}

相关文章

网友评论

      本文标题:实现简单的 RxKotlin (下)

      本文链接:https://www.haomeiwen.com/subject/nlfnwftx.html