- 首先创建原始被观察者及观察者接口
//被观察者
interface ObservableSource<T> {
//订阅
fun subscribe(observer: Observer<T>)
}
//观察者
interface Observer<T> {
//订阅
fun onSubscribe()
//事件发送
fun onNext(t: T)
//错误
fun onError(t:Throwable)
//事件完成
fun onComplete()
}
- 创建抽象的被观察者
abstract class Observable<T> : ObservableSource<T> {
override fun subscribe(observer: Observer<T>) {
subscribeActual(observer);
}
//抽象订阅方法,这里会传入观察者的对象,交给数据发送者
protected abstract fun subscribeActual(observer: Observer<T>)
}
- 创建数据发送者接口
interface Emitter<T> {
fun onNext(t: T)
fun onError(t:Throwable)
fun onComplete()
}
- 创建数据发送者与被观察者建立联系的接口
interface ObservableOnSubscribe<T> {
//被订阅者在此实现方法里发送数据
fun subscribe(emitter: Emitter<T>)
}
- 创建被观察者实现类及数据发送者实现类
//这里构造方法传入了数据发送者与被观察者建立联系具体实现
class ObservableCreate<T>(var observableOnSubscribe: ObservableOnSubscribe<T>) : Observable<T>() {
//被观察者订阅观察者时,具体实现数据发送
override fun subscribeActual(observer: Observer<T>) {
val emitterCreate = EmitterCreate(observer)
observableOnSubscribe.subscribe(emitterCreate)
//触发了onSubscribe
observer.onSubscribe()
}
//构造方法传入了观察者
class EmitterCreate<T>(private val observer: Observer<T>) : Emitter<T> {
override fun onNext(t: T) {
//观察者接收到被观察者通过Emitter发送的数据
observer.onNext(t)
}
override fun onError(t: Throwable) {
observer.onError(t)
}
override fun onComplete() {
observer.onComplete()
}
}
}
这样我们就完整创建出了:观察者、被观察者、数据发送者、数据发送者与被观察者联系接口。
我们在Observable里实现create(source: ObservableOnSubscribe<T>)创建一个ObservableCreate实例,并传入一个ObservableOnSubscribe用作数据发送。
companion object {
fun <T> create(source: ObservableOnSubscribe<T>): Observable<T> {
return ObservableCreate(source)
}
}
Observable.create(object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: Emitter<String>) {
emitter.onNext("HELLO")
}
}).subscribe(object : Observer<String> {
override fun onSubscribe() {
TODO("Not yet implemented")
}
override fun onNext(t: String) {
TODO("Not yet implemented")
}
override fun onError(t: Throwable) {
TODO("Not yet implemented")
}
override fun onComplete() {
TODO("Not yet implemented")
}
})
以上我们通过观察者模式就简易实现了,观察者及被观察者数据发送的建立联系。
后续将继续通过装饰者模式实现map操作符及线程切换等操作,进一步理解Rxjava。
网友评论