上一篇我们实现了,观察订阅及发布功能。本篇实现操作符Map。
- 创建一个用于数据转换接口
interface Function<T, U> {
//外部实现数据的转换逻辑
fun apply(t: T): U
}
- 基于装饰者模式,我们抽象出观察者及被观察者装饰者接口
//注意这里持有了一个ObservableSource被观察者对象
abstract class AbstractObservableWithUpStream<T, U>(var observableSource: ObservableSource<T>) :
Observable<U>() {
}
//同样持有了一个Observer观察者
open class BasicFuseabObserver<T, U>(var oberver: Observer<U>) : Observer<T> {
override fun onNext(t: T) {}
override fun onSubscribe() {}
override fun onComplete() {}
override fun onError(t: Throwable) {}
}
- 实现装饰者类,传入了被装饰者对象
//被观察者
class ObservableMap<T, U>(observableSource: ObservableSource<T>, var mapper: Function<T, U>) :
AbstractObservableWithUpStream<T, U>(observableSource) {
override fun subscribeActual(observer: Observer<U>) {
//调用持有的被观察者的subscribe,同时创建了装饰后的观察者对象
observableSource.subscribe(MapObserver(observer, mapper))
}
//观察者
class MapObserver<T, U>(observer: Observer<U>, var mapper: Function<T, U>) :
BasicFuseabObserver<T, U>(observer) {
override fun onNext(t: T) {
//执行装饰逻辑及原被装饰者逻辑
val apply = mapper.apply(t)
oberver.onNext(apply)
}
}
}
- 下面我们在Observable里实现map操作符
fun <U> map(mapper: Function<T, U>): Observable<U> {
//注意这里传入的this对象,就是上游的Observable对象
// 传给下游,利用装饰者模式,mapper进行数据转换
return ObservableMap(this, mapper)
}
Observable.create(object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: Emitter<String>) {
emitter.onNext("HELLO")
}
}).map(object : Function<String, Int> {
override fun apply(t: String): Int {
return 1000
}
}).subscribe(object : Observer<Int> {
override fun onSubscribe() {
TODO("Not yet implemented")
}
override fun onNext(t: Int) {
TODO("Not yet implemented")
}
override fun onError(t: Throwable) {
TODO("Not yet implemented")
}
override fun onComplete() {
TODO("Not yet implemented")
}
})
这样我们就实现了数据的转换功能。同时我们可以发现,如果要实现新的操作符,只需要创建对应的观察者和被观察者的对应装饰类即可。
网友评论