RxSwift的核心流程从源码分析原理,分为三步
1.创建序列 -- create
2.订阅序列 -- subscribe
3.发送信号 -- onXX
//1.创建序列
let ob = Observable<Any>.create { (observer) -> Disposable in
//3.发送信号
observer.onNext("发送信号")
observer.onError(NSError.init(domain: "EpisodeError", code: 10086, userInfo: nil))
return Disposables.create()
}
//2.订阅信号
let _ = ob.subscribe(onNext: { (text) in
print("订阅到:\(text)")
}, onError: { (error) in
print("error:\(error)")
}, onCompleted: {
print("完成")
}) {
print("销毁")
}
那么问题来了,我们发送的信号,是如何被订阅到的呢?
从代码分析我们可以看出
1.我们发送信号的时候创建了闭包1
2.我们订阅的时候,会subscribe
闭包2--onXX
3.闭包2的onXX
会回掉闭包1的onXX
,将闭包1的值传过来
底层又是如何实现这三步曲的呢,那么我们从源码一一剖析
1.创建序列 -- create
继承关系为:
AnonymousObservable
-> Producer
-> Observable
-> ObservableType
-> ObservableConvertibleType
注:
ObservableType
中定义了subscribe
方法,供接受协议的Observable
及其子类重载
-create
方法的时候创建了一个内部对象 AnonymousObservable
-AnonymousObservable
保存了外界的subscribeHandler
闭包
2.订阅序列 -- subscribe
订阅序列subscribe
的源码在ObservableType
的扩展中
extension ObservableType {
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
...
let observer = AnonymousObserver<Element> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
-Element
是 Swift
的关联类型,这里的这个Element
就是我们的序列类型
-创建了一个 AnonymousObserver
(匿名内部观察者) ,它这里的初始化是闭包参数,保存了外界的 onNext
, onError
, onCompleted
, onDisposed
的处理回调闭包的调用
这里面还有一个很重要的起到调度作用的sink
3.发送信号 -- onXX
发送信号的源码存在ObserverType
扩展中
可以看出
onXX
分别调用了self.on
方法,而这个self.on
方法又是ObserverType
协议的协议方法总结:RxSwift的结构
1:序列 - 编码统一 ,随时随地享用
2:通过函数式思想吧一些列的需求操作下沉(把开发者不关心的东西封装) - 优化代码,节省逻辑
3:RxSwift
最典型的特色就是解决Swift
这门静态语言的响应能力,利用随时间维度序列变化为轴线,用户订阅关心能随轴线一直保活,达到订阅一次,响应一直持续~
网友评论