今天详细分解一下RxSwift的核心逻辑
序列的创建
let ob = Observable<Any>.create { (obserber) -> Disposable in
obserber.onNext(“Hello RxSwift")
obserber.onCompleted()
obserber.onError(NSError.init(domain: "coocieeror", code: 10087, userInfo: nil))
return Disposables.create()
}
从Create.swift类文件进入。在Create.swift类中,其实就是返回了一个匿名序列AnonymousObservable
对象。
继续进入
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let _subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
上面就是AnonymousObservable类的定义,继承了Producer,保存了闭包的代码块。
订阅序列
订阅者AnonymousObserver的创建。后边对event作为一个闭包接收并转发相应的事件:onNext、onError、onCompleted、onDisposed,后续发送响应时会直接传入该event事件。
创建AnonymousObserver
对象,在其初始化AnonymousObserver时创建并保存了一个_eventHandler:self._eventHandler = eventHandler
,作为所有onNext
、onError
、onCompleted
的回调,用于在第3部分发送响应时将onNext
、onError
、onComplete
回调至ViewController.swift中的sucribe
块。
只要有event出现便进入switch判断,判断是.next
、.error
、.completed
等,并执行对应的闭包方法onNext、onError、onCompleted。但具体是如何传递数据到ViewController.swift的.subscribe闭包中的呢?我们继续分析。
在最后的return中调用create
,执行self.asObservable().subscribe(observer)
。将创建的observer
传递下去。
subscribe( )
:父类Producer.swift中定义了subscribe
函数
override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
发送响应
onNext
命令会调用起Create.swift中AnonymousObservableSink
的on
函数:
func on(_ event: Event<E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self._isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self._isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
执行self.fowardOn(event)
。forwardOn函数定义于AnonymousObservableSink的父类Sink
中。
执行Sink中on函数:
final func forwardOn(_ event: Event<O.E>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
if isFlagSet(self._disposed, 1) {
return
}
self._observer.on(event)
}
回调至subscribe
的 AnonymousObserver中的event闭包块执行响应的条件.onNext
。
let observer = AnonymousObserver<E> { event in
#if DEBUG
synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { synchronizationTracker.unregister() }
#endif
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
if let onError = onError {
onError(error)
}
else {
Hooks.defaultErrorHandler(callStack, error)
}
disposable.dispose()
case .completed:
onCompleted?()
disposable.dispose()
}
}
RxSwift实在太强大,下面上一张完整的思维导图帮助大家理解:
RxSwift核心逻辑.png
网友评论