美文网首页
实现简单的 RxKotlin (下)

实现简单的 RxKotlin (下)

作者: lguipeng | 来源:发表于2018-09-07 16:04 被阅读50次

    其他的一些操作符...

    doOnNext(doOnError、doOnCompleted 同理)

    //Observable.kt
    /**
     * Builds an [Observable] backed by [OnSubscribeDoOnNext], which runs [action]
     * on each item before handing that item to the downstream subscriber.
     *
     * @param action callback invoked with every emitted item.
     * @return the observable produced by `create` for the wrapping `OnSubscribe`.
     */
    fun doOnNext(action: (t: T) -> Unit): Observable<T> {
        val onSubscribe = OnSubscribeDoOnNext(this, action)
        return create(onSubscribe)
    }
    
    //OnSubscribeDoOnNext.kt
    /**
     * [Observable.OnSubscribe] that subscribes to [source] and relays its events to the
     * downstream subscriber, calling [action] with each item just before it is forwarded.
     *
     * Completion and error events are passed through unchanged.
     */
    class OnSubscribeDoOnNext<T>(
        private val source: Observable<T>,
        private val action: (T) -> Unit
    ) : Observable.OnSubscribe<T> {

        override fun call(subscriber: Subscriber<T>) {
            // Relay sitting between the source and the downstream subscriber.
            val relay = object : Subscriber<T>() {
                override fun onNext(t: T) {
                    // Side effect first, then hand the same item downstream.
                    action(t)
                    subscriber.onNext(t)
                }

                override fun onError(t: Throwable) {
                    subscriber.onError(t)
                }

                override fun onCompleted() {
                    subscriber.onCompleted()
                }
            }
            source.subscribe(relay)
        }
    }
    

    compose

    /**
     * Applies [transformer] to this observable and returns whatever it produces.
     *
     * @param transformer function that receives this observable and returns a new one.
     * @return the observable returned by [transformer].
     */
    fun <R> compose(transformer: (t : Observable<T>) -> Observable<R>): Observable<R> =
        transformer(this)
    

    from(array: Array<T>)

    //Observable.kt
    companion object {
    
        /**
         * Creates an [Observable] backed by [OnSubscribeFromArray] for the given [array].
         *
         * @param array the elements handed to [OnSubscribeFromArray].
         * @return the observable produced by `create`.
         */
        fun <T> from(array: Array<T>): Observable<T> =
            create(OnSubscribeFromArray(array))
    
    }
    
    //OnSubscribeFromArray.kt
    /**
     * [Observable.OnSubscribe] that pushes every element of [array], in order, to the subscriber.
     *
     * Emission stops as soon as the subscriber reports that it is unsubscribed, in which
     * case no further event is delivered. Otherwise at most one terminal event is sent:
     * `onError` if an exception escapes the emission loop, or `onCompleted` once all
     * elements have been emitted.
     */
    class OnSubscribeFromArray<T>(private val array: Array<T>) : Observable.OnSubscribe<T> {

        override fun call(subscriber: Subscriber<T>) {
            try {
                for (t in array) {
                    // An unsubscribed subscriber should receive nothing more, so stop
                    // iterating instead of skipping the remaining elements one by one.
                    if (subscriber.isUnsubscribed()) return
                    subscriber.onNext(t)
                }
            } catch (e: Exception) {
                subscriber.onError(e)
                // onError is terminal: do not fall through to onCompleted.
                return
            }
            subscriber.onCompleted()
        }

    }
    

    merge

    //Observable.kt
    companion object {
    
        /**
         * Flattens an observable of observables by lifting [source] through [OperatorMerge].
         *
         * @param source observable whose items are themselves observables of `T`.
         * @return the observable returned by `lift`.
         */
        fun <T> merge(source : Observable<Observable<T>>): Observable<T> =
            source.lift(OperatorMerge<T>())
    
        /**
         * Merges the observables in [array]: the array is first turned into an
         * observable of observables via `from`, then passed to the other `merge` overload.
         *
         * @param array the observables to merge.
         * @return the merged observable.
         */
        fun <T> merge(array: Array<Observable<T>>): Observable<T> {
            val sources: Observable<Observable<T>> = from(array)
            return merge(sources)
        }
    }
    
    //OperatorMerge.kt
    /**
     * Operator that turns a subscriber of `T` into a subscriber of `Observable<T>`:
     * every inner observable received from upstream is subscribed with the downstream
     * subscriber, so its items flow straight through.
     */
    class OperatorMerge<T> : Observable.Operator<T, Observable<T>> {

        override fun call(subscriber: Subscriber<T>): Subscriber<Observable<T>> {
            return MergeSubscriber(subscriber)
        }

        /**
         * Subscriber for the outer stream of observables.
         *
         * @param actual the downstream subscriber that receives the merged items.
         */
        private class MergeSubscriber<T>(private val actual : Subscriber<T>) : Subscriber<Observable<T>>() {

            override fun onCompleted() {
                // Intentionally empty: completion of the outer stream is not forwarded.
                // NOTE(review): inner observables are subscribed with `actual` directly,
                // so each inner's own terminal event reaches downstream as-is. A complete
                // merge would wrap the inner subscribers and complete downstream only
                // after the outer stream and every inner stream have finished.
            }

            override fun onError(t: Throwable) {
                // Propagate failures of the outer stream instead of silently dropping them.
                actual.onError(t)
            }

            override fun onNext(t: Observable<T>) {
                t.subscribe(actual)
            }

        }
    }
    

    flatMap

    /**
     * Maps every item to an observable with [func] and merges the resulting observables.
     *
     * @param func function turning an item into an observable of `R`.
     * @return the observable produced by `merge` over the mapped observables.
     */
    fun <R> flatMap(func: (t: T) -> Observable<R>): Observable<R> {
        val nested = map(func)
        return merge(nested)
    }
    

    相关文章

      网友评论

          本文标题:实现简单的 RxKotlin (下)

          本文链接:https://www.haomeiwen.com/subject/nlfnwftx.html