在上一篇文章中,主要描述了RxSwift的核心逻辑,也就是一个序列从创建到订阅然后从发送消息到接收消息的整个流程是怎样串联起来的。还不太理解的同学可以移步到上一篇文章了解一下。
这篇文章主要来分析一下RxSwift的几个核心类和协议的实现和设计。
Observable类解析
Observable
是可观察序列,是所有可观察序列的基类,我们不会直接使用Observable
这个类,一般都是使用子类。Observable
也可以理解成抽象类,实际上不是抽象类,因为可观察序列最重要的一个订阅序列的方法subscribe
必须在其子类中重写。
我们先来看看Observable
的源码:
public class Observable<Element> : ObservableType {
/// Type of elements in sequence.
public typealias E = Element
init() {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
rxAbstractMethod()
}
public func asObservable() -> Observable<E> {
return self
}
deinit {
#if TRACE_RESOURCES
_ = Resources.decrementTotal()
#endif
}
internal func composeMap<R>(_ transform: @escaping (Element) throws -> R) -> Observable<R> {
return _map(source: self, transform: transform)
}
}
-
Observable
实现了一个协议ObservableType
,而且ObservableType
协议继承自ObservableConvertibleType
协议,所以在Observable
中实现了两个协议方法:subscribe
和asObservable
。 -
subscribe
方法没有具体实现的逻辑,需要子类去实现。 -
asObservable
方法返回的是self,看似用处不大,其实不是这样的。asObservable
是非常有用的,如果一类是Observable
的子类,我们可以直接返回self,如果不是Observable
的子类,我们可以通过重写这个协议方法来返回一个Observable
对象,这样保证了协议的一致性。在使用的时候我们可以直接写类似self.asObservable().subscribe(observer)
这样的代码,有利于保持代码的简洁性,是良好的封装性的体现。所以我觉得这个设计非常的好,在我们日常开发中也可以借鉴。 -
_ = Resources.incrementTotal()
和_ = Resources.decrementTotal()
这两行代码其实是RxSwift内部实现的一个引用计数。这部分内容我会在后面的文章中再详解。 -
composeMap<R>
优化map的一个函数,不太理解用处。 -
Observable
子类非常多,这里不一一去看,主要区别在于对subscribe
方法的实现不一样。
AnonymousObservableSink类解析
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// state
private let _isStopped = AtomicInt(0)
#if DEBUG
fileprivate let _synchronizationTracker = SynchronizationTracker()
#endif
override init(observer: O, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
-
AnonymousObservableSink
是Sink
的子类,AnonymousObservableSink
本身遵守ObseverType
协议,与此同时实现了run
方法,虽然没有实现subscribe
方法,但是已经足够了,这样AnonymousObservableSink
从某种程度来说也是Observable
。 -
AnonymousObservableSink
是Observer和Observable的衔接的桥梁,也可以理解成管道。它存储了_observer
和销毁者_cancel
。通过sink就可以完成从Observable到Obsever的转变。 - 在
run
方法中的这行代码parent._subscribeHandler(AnyObserver(self))
,其中parent是一个AnonymousObservable
对象。_subscribeHandler
这个block调用,代码会执行到创建序列时的block。然后会调用发送信号的代码obserber.onNext("发送信号")
,然后代码会经过几个中间步骤会来到AnonymousObservableSink
类的on
方法。
有问题或者建议和意见,欢迎大家评论或者私信。
喜欢的朋友可以点下关注和喜欢,后续会持续更新文章。
网友评论