前言
RxSwift 由于在日常工作中会经常使用,所以下面进行核心源码分析与探究,学习优秀开源框架之路,先进行序列的源码分析。
RxSwift 核心流程
- 创建序列
- 订阅序列
- 发送信号
第一步 创建序列
let ob = Observable<String>.create { (observer) -> Disposable in
return Disposables.create()
}
第二步 订阅信号
let _ = ob.subscribe(onNext: { (text) in
print("订阅信息: \(text)")
}, onError: { (error) in
print("error: \(error)")
}, onCompleted: {
print("订阅结束")
}) {
print("已销毁")
}
第三步 发送信号
let ob = Observable<String>.create { (observer) -> Disposable in
// 第三步:发送信号
obserber.onNext("你好明天")
return Disposables.create()
}
代码合并:
//1、创建序列
let obs = Observable<Any>.create { (observer) -> Disposable in
//3、发送信号
observer.onNext("我是一条消息")
return Disposables.create()
}
//2、订阅序列
obs.subscribe(onNext: { (val) in
//4、序列监听
print("onNext:\(val)")
}).disposed(by: disposeBag)//5、打包待销毁
从代码合并角度来看:
- 通过Observable的create创建序列,在create闭包内调用onNext方法实现信号发送
- 调用subscribe方法订阅序列,并实现subscribe的参数闭包onNext,在闭包内监听信号
- 最后通过disposed对序列打包等待销毁
大致流程如下图所示:

RxSwift 核心序列代码分析
1 创建序列
1.使用Observable的create方法创建可观察序列
public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
return AnonymousObservable(subscribe)
}
可观察序列 的创建是利用协议拓展功能的create方法实现的,里面创建了一个 AnonymousObservable(匿名可观察序列)
//匿名序列
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
//保存外部订阅的代码块
self._subscribeHandler = subscribeHandler
}
override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
- AnonymousObservable继承了 Producer 具有非常重要的方法 subscribe
- 可以看到AnonymousObservable这里一共两个方法:init方法是用来保存这个subscribe闭包,run方法执行闭包操作
继承关系如下图所示:

2 订阅序列
使用ObservableType的subscribe订阅信号
public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
//此处省略若干行代码
let observer = AnonymousObserver<E> { event in
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
该方法是对ObservableType的拓展。在方法内部已经出现对观察者的定义,AnonymousObserver类型的闭包observer。
-
observer内部调用的外部(应用层)实现的闭包,由此看出所有信号是由此发出,event是observer的参数,不难看出,observer闭包也是在其他地方调用,传入带有信号值的event参数
-
observer被当做参数传入到subscribe中,而observer的调用必然是在subscribe中实现的
self.asObservable().subscribe(observer)
-
self.asObservable()该方法返回本身,保证协议的一致性,方法如下:
public class Observable<Element> : ObservableType { // 省去代码若干 public func asObservable() -> Observable<E> { return self } }
-
继续断点执行找到subscribe方法,正是上面所提到的Producer中的方法,方法如下:
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element { // 省去代码若干 return CurrentThreadScheduler.instance.schedule(()) { _ in let disposer = SinkDisposer() let sinkAndSubscription = self.run(observer, cancel: disposer) disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription) return disposer } }
-
observer观察者被传入到run中,上面说到该观察者一定会被调用,继续深入
let sinkAndSubscription = self.run(observer, cancel: disposer)
-
发现self.run的调用,调用的是AnonymousObservable中的run方法,代码如下:
final private class AnonymousObservable<Element>: Producer<Element> { typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable let _subscribeHandler: SubscribeHandler init(_ subscribeHandler: @escaping SubscribeHandler) { self._subscribeHandler = subscribeHandler } override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element { let sink = AnonymousObservableSink(observer: observer, cancel: cancel) let subscription = sink.run(self) return (sink: sink, subscription: subscription) } }
-
此处就是创建序列时的AnonymousObservable类。在run方法类创建了sink对象,在初始化时传入了我们上面所说的观察者,记住sink保存了观察者observer闭包,并且调用了sink.run(self)方法,传入的是创建时产生的可观察序列observable闭包对象,深入run:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType { typealias E = O.E typealias Parent = AnonymousObservable<E> // 省去代码若干 // 此处向父类Sink初始化了observer对象 override init(observer: O, cancel: Cancelable) { super.init(observer: observer, cancel: cancel) } func run(_ parent: Parent) -> Disposable { return parent._subscribeHandler(AnyObserver(self)) } }
-
此处parent由let subscription = sink.run(self)传入,self即为创建序列create方法返回的observable对象,而_subscribeHandler是创建序列所保存的闭包,此时我们的闭包就被调用了,被调用闭包如下:
let obs = Observable<Any>.create { (observer) -> Disposable in //3、发送消息 observer.onNext("我是一条消息") return Disposables.create() }
3 发送信号
在信号发送闭包中通常调用一下三种方法,用来发送信号。如下:
- observer.onNext("我是一条消息") 信号发送
- observer.onCompleted() 序列完成,完成后序列将被释放
- observer.onError(error) 序列出错中断,序列不可继续使用,被释放
以上三个方法为ObserverType的拓展方法
extension ObserverType {
/// Convenience method equivalent to `on(.next(element: E))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: E) {
self.on(.next(element))
}
/// Convenience method equivalent to `on(.completed)`
public func onCompleted() {
self.on(.completed)
}
/// Convenience method equivalent to `on(.error(Swift.Error))`
/// - parameter error: Swift.Error to send to observer(s)
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
-
E表示了一个泛型信号量,可表示任意类型的信号
-
.next(element)是一个带泛型参数的枚举,管理了三种类型事件的消息传递。如下:
public enum Event<Element> { /// Next element is produced. case next(Element) /// Sequence terminated with an error. case error(Swift.Error) /// Sequence completed successfully. case completed }
on这是AnonymousObservableSink中的方法,代码如下:
final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
typealias E = O.E
typealias Parent = AnonymousObservable<E>
// 代码省略若干行
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(&self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(&self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
}
内部根据Event枚举不同成员变量做不同的信号发送,信号发送调用了forwardOn方法。方法实现如下:
class Sink<O : ObserverType> : Disposable {
init(observer: O, cancel: Cancelable) {
self._observer = observer
self._cancel = cancel
}
final func forwardOn(_ event: Event<O.E>) {
if isFlagSet(&self._disposed, 1) {
return
}
self._observer.on(event)
}
}
_observer即是订阅中在内部产生的AnonymousObserver对象,而该对象调用了on方法并传递了信号。on方法所在位置如下:
AnonymousObserver -> ObserverBase -> on()
class ObserverBase<ElementType> : Disposable, ObserverType {
typealias E = ElementType
func on(_ event: Event<E>) {
switch event {
case .next:
if load(&self._isStopped) == 0 {
self.onCore(event)
}
case .error, .completed:
if fetchOr(&self._isStopped, 1) == 0 {
self.onCore(event)
}
}
}
}
在方法内部又掉用了self.onCore(event),此时该方法在AnonymousObserver中实现,代码如下:
final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
typealias Element = ElementType
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
}
RxSwift 序列总结

网友评论