RxSwift
大家都已经很熟悉了,才会想了解核心逻辑的,基础的介绍就不在此赘述了,使用起来也非常方便,得益于RxSwift
设计者设计的精简API,使用的步骤如下:
- 1.创建序列
- 2.订阅序列
- 3.发送信号
我们对RxSwift
的使用已经非常熟练,对RxSwift
的执行流程也有一定的了解,但可能对于代码层面的实现逻辑不是那么清晰,因为这需要去啃枯燥的源码才能获得,出于种种原因,可能并没有去啃,这里把我的心得分享一下。
最基础的使用代码
最开始学习RxSwift
的时候,我们都运行过如下一段代码:
// 1.创建Int序列
_ = Observable<Int>.create({ (observer) -> Disposable in
// 3.发送信号
observer.onNext(666)
return Disposables.create()
}).subscribe(onNext: { (num) in // 2.订阅1创建的序列
print("订阅到的数字:\(num)")
})
// 打印结果:订阅到的数字:666
我们都知道,控制台肯定会打印订阅到的数字:666
,但问题也随之而来,为什么在创建序列的闭包中发送的信号会在订阅的闭包中接收到,于是就该进入源码了。
源码分析
创建序列部分
Observable
—— 可观察序列,这个我们都很熟悉了。
首先看下Observable
的定义:
public class Observable<Element> : RxSwift.ObservableType {
/// Type of elements in sequence.
public typealias E = Element
public func subscribe<O>(_ observer: O) -> RxSwift.Disposable where Element == O.E, O : RxSwift.ObserverType
public func asObservable() -> RxSwift.Observable<RxSwift.Observable<Element>.E>
}
看Observable
的定义,我们看到了熟悉的两个方法:
1.
subscribe
,订阅方法【但这个订阅并非我们平常用的订阅,这个订阅是Observable
内部使用的,看完后面就明白了】
2.asObservable
,转为可观察序列的方法
再看往上溯源,其实这两个方法是来自于实现的协议,并非类本身的能力。
订阅是ObservableType
的能力;
public protocol ObservableType : ObservableConvertibleType {
func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}
转为可观察序列是ObservableConvertibleType
的能力;
public protocol ObservableConvertibleType {
associatedtype E
func asObservable() -> Observable<E>
}
可,并没有看见创建序列的方法,Observable
是怎么创建出来的呢?
既然本类里面没有,实现的协议中也没有,那就只可能是通过协议扩展增加的方法,于是我们检索extension ObservableType
,发现了很多的扩展,暂时先忽略,把创建的扩展拎出来。
extension ObservableType {
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> RxSwift.Disposable) -> RxSwift.Observable<Self.E>
}
当找到方法的定义之后,问题又来了,这只是定义,实现在哪里呢,jump
根本不管用,于是继续检索,然后找到了具体的实现,原来是提供了默认实现的,在Create.swift
类中,有如下实现:
extension ObservableType {
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
}
看这个创建方法的默认实现,我们get
了如下两个点:
- 通过
crate
方法返回了一个AnonymousObservable
-
AnonymousObservable
保存了发送信号的闭包
我们先来看看AnonymousObservable
是个啥,顾名思义,这肯定也是一个可观察序列,Anonymous
是匿名的意思,那也就是说这是个匿名可观察序列,不暴露给外部使用的,在内部流转使用的。
我们看一下AnonymousObservable
定义:
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
看到这,我们get
到了,创建可观察序列的发送信号的闭包是保存在了AnonymousObservable
中。
到这,关于创建可观察序列的流程,就结束了。
但关于可观察序列的源码解读还没有结束,我们这里先看一下AnonymousObservable
的继承链和协议实现链,后面看订阅的流转会清晰的多。
![](https://img.haomeiwen.com/i3373160/03562c756df4fb45.png)
AnonymousObservable
类继承自Producer
类,Producer
继承自Observable
类,Observable
类实现了ObservableType
协议,ObservableType
协议继承了ObservableConvertibleType
协议。
ObservableType
协议的扩展对继承自ObservableConvertibleType
协议的asObservable()
方法有默认实现。
extension ObservableType {
/// Default implementation of converting `ObservableType` to `Observable`.
public func asObservable() -> Observable<E> {
// temporary workaround
//return Observable.create(subscribe: self.subscribe)
return Observable.create { o in
return self.subscribe(o)
}
}
}
Observable
类实现了协议链中的两个方法,但subscribe
方法是不允许直接直接使用的,交给子类Producer
重写:
public class Observable<Element> : ObservableType {
// 省略。。。
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
// 直接使用抛出异常
rxAbstractMethod()
}
public func asObservable() -> Observable<E> {
return self
}
// 省略。。。
}
Producer
除了重写subscribe
方法,再定义了一个run
方法给子类AnonymousObservable
重写,run
方法也是不允许直接使用的:
class Producer<Element> : Observable<Element> {
// 重写父类的 subscribe
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
// 定义给子类使用的方法,直接使用抛异常
func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
rxAbstractMethod()
}
}
AnonymousObservable
中,除了定义了保存创建闭包的构造函数,就是重写父类的run
方法:
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
然后,就该到订阅的部分了,从订阅中找寻创建序列的发送信号的闭包是怎么流转的,是怎样才能订阅到发送的信号的。
订阅序列部分
我们通过示例代码中的subscribe
跳转过去看到的是:
extension ObservableType {
// 省略...
public func subscribe(onNext: ((Self.E) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> RxSwift.Disposable
}
跟创建Observable
的create
方法一样的套路,对协议进行扩展,然后提供默认实现:
extension ObservableType {
// 省略...
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
我们看这个方法的实现,get
到如下几个点:
- 创建了
AnonymousObserver
实例,看着很眼熟,跟AnonymousObservable
类似,创建了一个内部观察者,并传递了一个事件处理闭包,后面就能知道,信号发送就是通过这个闭包来回调的;
- 创建了
- 此处的
self
其实就是AnonymousObservable
;
- 此处的
-
self.asObservable()
是基类协议ObservableConvertibleType
的能力,而ObservableType
是继承自ObservableConvertibleType
的,上文中有说到该方法的默认实现,这个方法就是把ObservableType
转成Observable
——可观察序列;
-
-
self.asObservable().subscribe(observer)
,也是调用AnonymousObservable
中的subscribe
方法,如果记得上文中AnonymousObservable
的继承和协议链的话(不记得话,回上文),就很清晰,其实这里调用的是Produce
中的subscribe
方法
-
那我们就到Produce
中的subscribe
方法来看具体操作了什么:
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// 忽略...
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
在else
分支中,调用了self.run
,Producer
只是定义,真正的实现是在AnonymousObservable
中的重写:
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
从run
方法的实现中,我们可以get
到如下几个点:
- 创建了一个
AnonymousObservableSink
类型的sink
管道,构造函数传入的参数是AnonymousObserver
的实例observe
- 创建了一个
-
sink
管道调用了自身的run
方法,把self
(也就是AnonymousObservable
)当做参数传入
-
然后我们看一下AnonymousObservableSink
类中run
方法的实现:
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
这里是调用了AnonymousObservable
中的_subscribeHandler
闭包,这也就解释了为什么我们在订阅了序列之后会执行发送信号的闭包。
同时new
了一个新类AnyObserver
,也就是说将AnonymousObservableSink
实例当做参数构造了一个AnyObserver
实例,先来看下AnyObserver
的定义:
public struct AnyObserver<Element> : ObserverType {
public typealias E = Element
public typealias EventHandler = (Event<Element>) -> Void
private let observer: EventHandler
public init(eventHandler: @escaping EventHandler) {
self.observer = eventHandler
}
public init<O : ObserverType>(_ observer: O) where O.E == Element {
self.observer = observer.on
}
public func on(_ event: Event<Element>) {
return self.observer(event)
}
public func asObserver() -> AnyObserver<E> {
return self
}
}
注意看第二个构造函数,接收的是ObserverType
类型的参数,将observer.on
保存到自己的属性observer: EventHandler
,也就是说保存到observer
属性的不是AnonymousObservableSink
实例,而是AnonymousObservableSink
实例中的on
方法。
看到这,其实也就明了了一件事,observer.onNext(666)
,其实就是AnyObserver.onNext(666)
。
上面我们贴出了AnyObserver
的类定义,里面是没有onNext
方法的,那自然想到去找父类、协议或协议的扩展,然后看ObserverType
协议的定义和扩展:
public protocol ObserverType {
associatedtype E
func on(_ event: Event<E>)
}
extension ObserverType {
public func onNext(_ element: E) {
self.on(.next(element))
}
public func onCompleted() {
self.on(.completed)
}
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
看着这些方法,是不是有种特别的熟悉的感觉,onNext
、onCompleted
和onError
,我们在订阅时常用的方法,看方法的实现,也就意味着,observer.onNext(666)
就变成了AnyObserver.on(.next(666))
,然后我们回到AnyObserver
中的on
方法:
public func on(_ event: Event<Element>) {
return self.observer(event)
}
上文中说过,AnyObserver
中self.observer
其实就是AnonymousObservableSink
实例中的on
方法,那observer.onNext(666)
就变成了AnonymousObservableSink.on(.next(666))
。
于是,于是,我们又回到了AnonymousObservableSink
中,到这,不得不感慨RxSwift
作者神来一笔的设计,大神就是大神。。。
然后来看一眼on
方法的实现:
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
内部调用了forwardOn
方法,AnonymousObservableSink
本类是没有这个方法的,于是我们找父类或协议,在父类Sink
中找到了该方法:
final func forwardOn(_ event: Event<O.E>) {
self._observer.on(event)
}
self._observer
是什么,上文中是不是说过创建AnonymousObservableSink
时传入的参数就是我们订阅时创建的AnonymousObserver
实例,所以observer.onNext(666)
就变成了AnonymousObserver.on(.next(666))
,但AnonymousObserver
中是没有on
方法的,但我们在父类ObserverBase
中找到了方法的实现:
func on(_ event: Event<E>) {
switch event {
case .next:
if load(self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
最终执行self.onCore(event)
,其实还是回到了AnonymousObserver
中:
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
而self._eventHandler
又是什么呢,是构造函数传入的闭包,于是就回到了订阅时的创建AnonymousObserver
时的闭包了:
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
当self._eventHandler
调用时,就会回调到上述闭包中,然后判断不同的事件类型,通过我们在订阅时创建的闭包根据不同的闭包回调出去,onNext
、onError
或onCompleted
,这也就解释了我们在订阅后能接收到回调的原因。
总结
整个核心逻辑,看起来很复杂、很混乱,总结下来,如下几点:
- 1.创建序列时,创建一个闭包
A
,同时创建了AnonymousObservable
实例,将闭包A
存在实例中 - 2.订阅序列时,创建一个闭包
B
,同时创建了AnonymousObserve
实例,将闭包B
存在实例中 - 3.创建一个
AnonymousObservableSink
实例,AnonymousObserve
实例为AnonymousObservableSink
构造函数的参数,AnonymousObserve
实例存储在AnonymousObservableSink
的父类Sink
的_observer
属性中 - 4.调用
AnonymousObservableSink
实例的run
方法,执行闭包A
,同时创建一个AnyObserver
实例 - 5.
AnyObserver
实例保存了Sink
中的on
方法 - 6.闭包
A
中发送信号,其实调用的就是存在AnonymousObservableSink
中的AnonymousObserve
实例父类ObserverBase
中的on
方法,最后又回到AnonymousObserve
实例的onCore
方法 - 7.
onCore
方法的执行,实际上就是在执行闭包B
,而闭包B
执行就会根据事件类型回调我们的订阅闭包
好了,整个核心逻辑就是这样了,再次膜拜大神的设计~
网友评论