RxSwift核心概念就是观察者(observer)订阅一个可观察者序列(Observable),观察者对数据序列做出响应。
比如张三在统计店里的营业情况,每来一位客户就记录下客户点了什么菜,评价怎么样,这个时候一位一位的客户就是序列,张三对客户的反馈进行记录就是对序列做出了响应。
可观察者序列
前面提到可观察者序列,这个序列分为三种情况(也就是Event事件):
- next - 序列产生一个新的元素
- error - 创建序列时产生了错误,导致序列结束
- completed - 序列的所有元素已经成功产生了,整个序列完成
下面来看看一个简单的序列是怎么产生的?
let ob = Observable<Any>.create { (obserber) -> Disposable in
obserber.onNext("执行onNext")
obserber.onCompleted()
return Disposables.create()
}
let _ = ob.subscribe(onNext: { (text) in
print("订阅:\(text)")
}, onError: { (error) in
print("error:\(error)")
}, onCompleted: {
print("完成了")
}) {
print("销毁了")
}
从上面的代码中可看出create
创建了一个可观察者序列,怎么证明呢?点进去查看create
方法源码
extension ObservableType {
/**
Creates an observable sequence from a specified subscribe method implementation.
- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.Element>) -> Disposable) -> RxSwift.Observable<Self.Element>
}
从源码的注释中我们可以看出这个方法返回的是一个带有subscribe
方法指定实现的可观察者序列,再从注释中可看出真正创建序列的是在一个Create.swift
文件中,如下图我们找到这个文件进入源码。
extension ObservableType {
// MARK: create
/**
Creates an observable sequence from a specified subscribe method implementation.
- seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
- parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
- returns: The observable sequence with the specified implementation for the `subscribe` method.
*/
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
return AnonymousObservable(subscribe)
}
}
从源码中可看出创建序列是由一个AnonymousObservable
内部类创建的
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
再跟进去可看到AnonymousObservable
继承自Producer
,在初始化的时候还用_subscribeHandler
保存了一个事件闭包。接下来回到序列的监听
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
#if DEBUG
let synchronizationTracker = SynchronizationTracker()
#endif
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<Element> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
乍一看代码好长啊,前面的代码Disposable
这些我们先不用管,直接看创建了一个observer
对象,从上面我们得知了AnonymousObserver
保存了外面的事件闭包,看到这里发现怎么没有调用subscribe
呢?不着急继续往下看
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
在返回前调用了subscribe
,咦这个asObservable()
又是什么鬼?
public func asObservable() -> Observable<Element> {
return self
}
在Observable.swift
文件中的源码找到了原来就是可观察者序列本身,它走的是AnonymousObservable
父类Producer
的subscribe
实现
class Producer<Element> : Observable<Element> {
override init() {
super.init()
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
rxAbstractMethod()
}
}
在这个方法中调用了AnonymousObservable
的run
方法
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
接着在这个方法里面又调用了下面这个方法,在这个方法里面_subscribeHandler
就是执行了最初的create
函数的闭包
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
在AnonymousObservable
的run
方法中AnonymousObservableSink
初始化时把observer
又赋值给了AnyObserver
的属性observer
,这里的observer
就是之前的EventHandler
,这就是为什么obserber.onNext("执行onNext")
会触发到AnonymousObservableSink.on
事件
func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
final class AnonymousObserver<Element>: ObserverBase<Element> {
typealias EventHandler = (Event<Element>) -> Void
private let _eventHandler : EventHandler
init(_ eventHandler: @escaping EventHandler) {
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
self._eventHandler = eventHandler
}
override func onCore(_ event: Event<Element>) {
return self._eventHandler(event)
}
#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()
}
#endif
}
最终执行EventHandler
是由AnonymousObserver
的onCore
方法。
网友评论