今天梳理一下RxSwift核心逻辑
想要理解RxSwift核心逻辑首先要熟悉swift语言的基本用法
RxSwift使用函数式编程思想,一些基本使用我这里就不赘述了,大家可以看一下github上的Demo,仿写一下
我们以下面的代码来展开
//创建序列
let ob = Observable<String>.create { (obserber) -> Disposable in
obserber.onNext("帅的一匹")
obserber.onCompleted()
return Disposables.create()
}
//订阅
let _ = ob.subscribe(onNext: { (text) in
print("订阅到:\(text)")
}, onError: { (error) in
print("error: \(error)")
}, onCompleted: {
print("完成")
}) {
print("销毁")
}
首先进入到create方法里
/**
Creates an observable sequence from a specified subscribe method implementation.
- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> RxSwift.Disposable) -> RxSwift.Observable<Self.E>
继续点进定义,发现点不进去,那我们就看一下注释被。从指定的订阅方法实现中创建一个可观察序列。http://reactivex.io/documentation/operators/create.html这里就是通过一层一层的路由
找到create文件
返回一个AnonymousObservable对象
image.png
逃逸闭包不理解可以翻看我之前的闭包的相关文章
这里AnonymousObservable包存了闭包也就是我们create传入的闭包,先做一个标记
AnonymousObservable 的继承关系是什么样的呢
AnonymousObservable<Element> 继承自 Producer<Element> ,Produce继承自 Observable<Element> , Observable 遵循 ObservableType : ObservableConvertibleType协议
我们从ObservableConvertibleType往下捋
public protocol ObservableConvertibleType {
/// Type of elements in sequence.
associatedtype E
/// Converts `self` to `Observable` sequence.
///
/// - returns: Observable sequence that represents `self`.
func asObservable() -> Observable<E>
}
ObservableConvertibleType 首先有个关联类型E,然后有个函数asObservable声明
ObservableType协议:
public protocol ObservableType : ObservableConvertibleType {
func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}
extension ObservableType {
/// Default implementation of converting `ObservableType` to `Observable`.
public func asObservable() -> Observable<E> {
// temporary workaround
//return Observable.create(subscribe: self.subscribe)
return Observable.create { o in
return self.subscribe(o)
}
}
}
ObservableType声明了一个函数subscribe
asObservable默认实现,这就有点类似于 as 的用法,也就意味着任何遵循这个协议的对象或者结构体都可以转换为Observable<E>
create做的事情也就是保存了闭包,那么我们再看看对可观察序列subscribe时都做了什么:
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
首先创建了一个AnonymousObserver<E>类型的对象observer,这个泛型E也就是示例代码 let ob = Observable<String>.create...
中的 String类型,我们看最后retrun, self.asObservable().subscribe(observer)中做了什么,此时self是AnonymousObservable类型的对象,所以找AnonymousObservable中的subscribe函数,但是里面并没有实现,而是在父类Producer中实现了
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod()
}
}
为什么这么做呢,其实也可以理解,父类这里处于一个OB视角,拥有统一的订阅能力,线程调度能力,销毁能力。具有公平原则。
调用self.run(observer, cancel: disposer)就会来到AnonymousObservable的run函数
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
AnonymousObservableSink是一个管子,看看他的结构(非常重要)
image.png
image.png
我们看下面这张图,这个管理既包含了观察者,又包含了销毁者,同时sink.run将self传了进去,self也就是当前的可观察序列,所以这个管子包含了就将观察者和序列关联了起来
sink : 订阅者 + 销毁者 + 序列 + 调度环境,流通序列与订阅者,处理业务逻辑。
image.png
我们再看sink.run
image.png image.png
可以看到,AnyObserver保存了可观察序列的on函数
image.png
从上面代码可以看出,当实例代码调用onNext的时候就会来到管子里的on函数:
image.png
又会调到sink的forwardOn
image.png
又会调用_observer.on, 前面我们说过 _observer不就是之前传进来的let observer = AnonymousObserver<E>这个东西吗?
image.png
通过模式匹配最终调用到我们传进来的闭包onNext,将value传递给外部
image.png
我们再来看看subscribe(event)的时候又是什么流程呢?流程也差不多,我把调用流程标注出来,你可以按照这个流程调试一下
public func subscribe(_ on: @escaping (Event<E>) -> Void)
-> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
image.png
image.png
image.png
image.png
image.png
所以这一系列流程并不是我们想像中的那么简单,其实如果看的框架比较多了,你自然会发现很多框架的思维都是类似的,所以好的框架是值得我们去好好研究与借鉴的。
网友评论