美文网首页
RxSwift 核心逻辑

RxSwift 核心逻辑

作者: 半心_忬 | 来源:发表于2020-03-25 19:05 被阅读0次

RxSwift大家都已经很熟悉了,才会想了解核心逻辑的,基础的介绍就不在此赘述了,使用起来也非常方便,得益于RxSwift设计者设计的精简API,使用的步骤如下:

  • 1.创建序列
  • 2.订阅序列
  • 3.发送信号

我们对RxSwift的使用已经非常熟练,对RxSwift的执行流程也有一定的了解,但可能对于代码层面的实现逻辑不是那么清晰,因为这需要去啃枯燥的源码才能获得,出于种种原因,可能并没有去啃,这里把我的心得分享一下。

最基础的使用代码

最开始学习RxSwift的时候,我们都运行过如下一段代码:

// 1.创建Int序列
_ = Observable<Int>.create({ (observer) -> Disposable in
    // 3.发送信号
    observer.onNext(666)
    
    return Disposables.create()
}).subscribe(onNext: { (num) in // 2.订阅1创建的序列
    print("订阅到的数字:\(num)")
})
    
// 打印结果:订阅到的数字:666

我们都知道,控制台肯定会打印订阅到的数字:666,但问题也随之而来,为什么在创建序列的闭包中发送的信号会在订阅的闭包中接收到,于是就该进入源码了。

源码分析

创建序列部分

Observable —— 可观察序列,这个我们都很熟悉了。

首先看下Observable的定义:

public class Observable<Element> : RxSwift.ObservableType {
    /// Type of elements in sequence.
    public typealias E = Element

    public func subscribe<O>(_ observer: O) -> RxSwift.Disposable where Element == O.E, O : RxSwift.ObserverType
    
    public func asObservable() -> RxSwift.Observable<RxSwift.Observable<Element>.E>
}

Observable的定义,我们看到了熟悉的两个方法:

1.subscribe,订阅方法【但这个订阅并非我们平常用的订阅,这个订阅是Observable内部使用的,看完后面就明白了】
2.asObservable,转为可观察序列的方法

再看往上溯源,其实这两个方法是来自于实现的协议,并非类本身的能力。

订阅是ObservableType的能力;

public protocol ObservableType : ObservableConvertibleType {
    func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
}

转为可观察序列是ObservableConvertibleType的能力;

public protocol ObservableConvertibleType {
    associatedtype E
    
    func asObservable() -> Observable<E>
}

可,并没有看见创建序列的方法,Observable是怎么创建出来的呢?

既然本类里面没有,实现的协议中也没有,那就只可能是通过协议扩展增加的方法,于是我们检索extension ObservableType,发现了很多的扩展,暂时先忽略,把创建的扩展拎出来。

extension ObservableType {
    public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> RxSwift.Disposable) -> RxSwift.Observable<Self.E>
}

当找到方法的定义之后,问题又来了,这只是定义,实现在哪里呢,jump根本不管用,于是继续检索,然后找到了具体的实现,原来是提供了默认实现的,在Create.swift类中,有如下实现:

extension ObservableType {
    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
        return AnonymousObservable(subscribe)
    }
}

看这个创建方法的默认实现,我们get了如下两个点:

  • 通过crate方法返回了一个AnonymousObservable
  • AnonymousObservable保存了发送信号的闭包

我们先来看看AnonymousObservable是个啥,顾名思义,这肯定也是一个可观察序列,Anonymous是匿名的意思,那也就是说这是个匿名可观察序列,不暴露给外部使用的,在内部流转使用的。

我们看一下AnonymousObservable定义:

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

看到这,我们get到了,创建可观察序列的发送信号的闭包是保存在了AnonymousObservable中。

到这,关于创建可观察序列的流程,就结束了。

但关于可观察序列的源码解读还没有结束,我们这里先看一下AnonymousObservable的继承链和协议实现链,后面看订阅的流转会清晰的多。

AnonymousObservable继承链和协议链.png

AnonymousObservable类继承自Producer类,Producer继承自Observable类,Observable类实现了ObservableType协议,ObservableType协议继承了ObservableConvertibleType协议。

ObservableType协议的扩展对继承自ObservableConvertibleType协议的asObservable()方法有默认实现。

extension ObservableType {
    /// Default implementation of converting `ObservableType` to `Observable`.
    public func asObservable() -> Observable<E> {
        // temporary workaround
        //return Observable.create(subscribe: self.subscribe)
        return Observable.create { o in
            return self.subscribe(o)
        }
    }
}

Observable类实现了协议链中的两个方法,但subscribe方法是不允许直接直接使用的,交给子类Producer重写:

public class Observable<Element> : ObservableType {
    // 省略。。。
    
    public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
        // 直接使用抛出异常
        rxAbstractMethod()
    }
    
    public func asObservable() -> Observable<E> {
        return self
    }
    
    // 省略。。。
}

Producer除了重写subscribe方法,再定义了一个run方法给子类AnonymousObservable重写,run方法也是不允许直接使用的:

class Producer<Element> : Observable<Element> {
    // 重写父类的 subscribe
    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        if !CurrentThreadScheduler.isScheduleRequired {
            // The returned disposable needs to release all references once it was disposed.
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
            return disposer
        }
        else {
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                return disposer
            }
        }
    }

    // 定义给子类使用的方法,直接使用抛异常
    func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        rxAbstractMethod()
    }
}

AnonymousObservable中,除了定义了保存创建闭包的构造函数,就是重写父类的run方法:

final private class AnonymousObservable<Element>: Producer<Element> {
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable

    let _subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler) {
        self._subscribeHandler = subscribeHandler
    }

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

然后,就该到订阅的部分了,从订阅中找寻创建序列的发送信号的闭包是怎么流转的,是怎样才能订阅到发送的信号的。

订阅序列部分

我们通过示例代码中的subscribe跳转过去看到的是:

extension ObservableType {
    // 省略...

    public func subscribe(onNext: ((Self.E) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> RxSwift.Disposable
}

跟创建Observablecreate方法一样的套路,对协议进行扩展,然后提供默认实现:

extension ObservableType {
    // 省略...
    
    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            }
            
            #if DEBUG
                let synchronizationTracker = SynchronizationTracker()
            #endif
            
            let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
            
            let observer = AnonymousObserver<E> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
}

我们看这个方法的实现,get到如下几个点:

    1. 创建了AnonymousObserver实例,看着很眼熟,跟AnonymousObservable类似,创建了一个内部观察者,并传递了一个事件处理闭包,后面就能知道,信号发送就是通过这个闭包来回调的;
    1. 此处的self其实就是AnonymousObservable
    1. self.asObservable()是基类协议ObservableConvertibleType的能力,而ObservableType是继承自ObservableConvertibleType的,上文中有说到该方法的默认实现,这个方法就是把ObservableType转成Observable——可观察序列;
    1. self.asObservable().subscribe(observer),也是调用AnonymousObservable中的subscribe方法,如果记得上文中AnonymousObservable的继承和协议链的话(不记得话,回上文),就很清晰,其实这里调用的是Produce中的subscribe方法

那我们就到Produce中的subscribe方法来看具体操作了什么:

override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
    if !CurrentThreadScheduler.isScheduleRequired {
        // 忽略...
    }
    else {
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
            return disposer
        }
    }
}

else分支中,调用了self.runProducer只是定义,真正的实现是在AnonymousObservable中的重写:

override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
    let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
    let subscription = sink.run(self)
    return (sink: sink, subscription: subscription)
}

run方法的实现中,我们可以get到如下几个点:

    1. 创建了一个AnonymousObservableSink类型的sink管道,构造函数传入的参数是AnonymousObserver的实例observe
    1. sink管道调用了自身的run方法,把self(也就是AnonymousObservable)当做参数传入

然后我们看一下AnonymousObservableSink类中run方法的实现:

func run(_ parent: Parent) -> Disposable {
    return parent._subscribeHandler(AnyObserver(self))
}

这里是调用了AnonymousObservable中的_subscribeHandler闭包,这也就解释了为什么我们在订阅了序列之后会执行发送信号的闭包

同时new了一个新类AnyObserver,也就是说将AnonymousObservableSink实例当做参数构造了一个AnyObserver实例,先来看下AnyObserver的定义:

public struct AnyObserver<Element> : ObserverType {
    public typealias E = Element
    
    public typealias EventHandler = (Event<Element>) -> Void

    private let observer: EventHandler

    public init(eventHandler: @escaping EventHandler) {
        self.observer = eventHandler
    }
    
    public init<O : ObserverType>(_ observer: O) where O.E == Element {
        self.observer = observer.on
    }
    
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
    
    public func asObserver() -> AnyObserver<E> {
        return self
    }
}

注意看第二个构造函数,接收的是ObserverType类型的参数,将observer.on保存到自己的属性observer: EventHandler也就是说保存到observer属性的不是AnonymousObservableSink实例,而是AnonymousObservableSink实例中的on方法

看到这,其实也就明了了一件事,observer.onNext(666),其实就是AnyObserver.onNext(666)

上面我们贴出了AnyObserver的类定义,里面是没有onNext方法的,那自然想到去找父类、协议或协议的扩展,然后看ObserverType协议的定义和扩展:

public protocol ObserverType {
    associatedtype E
    
    func on(_ event: Event<E>)
}

extension ObserverType {
    public func onNext(_ element: E) {
        self.on(.next(element))
    }
    public func onCompleted() {
        self.on(.completed)
    }
    public func onError(_ error: Swift.Error) {
        self.on(.error(error))
    }
}

看着这些方法,是不是有种特别的熟悉的感觉,onNextonCompletedonError,我们在订阅时常用的方法,看方法的实现,也就意味着,observer.onNext(666)就变成了AnyObserver.on(.next(666)),然后我们回到AnyObserver中的on方法:

public func on(_ event: Event<Element>) {
    return self.observer(event)
}

上文中说过,AnyObserverself.observer其实就是AnonymousObservableSink实例中的on方法,那observer.onNext(666)就变成了AnonymousObservableSink.on(.next(666))

于是,于是,我们又回到了AnonymousObservableSink中,到这,不得不感慨RxSwift作者神来一笔的设计,大神就是大神。。。

然后来看一眼on方法的实现:

func on(_ event: Event<E>) {
    switch event {
    case .next:
        if load(self._isStopped) == 1 {
            return
        }
        self.forwardOn(event)
    case .error, .completed:
        if fetchOr(self._isStopped, 1) == 0 {
            self.forwardOn(event)
            self.dispose()
        }
    }
}

内部调用了forwardOn方法,AnonymousObservableSink本类是没有这个方法的,于是我们找父类或协议,在父类Sink中找到了该方法:

final func forwardOn(_ event: Event<O.E>) {
    self._observer.on(event)
}

self._observer是什么,上文中是不是说过创建AnonymousObservableSink时传入的参数就是我们订阅时创建的AnonymousObserver实例,所以observer.onNext(666)就变成了AnonymousObserver.on(.next(666)),但AnonymousObserver中是没有on方法的,但我们在父类ObserverBase中找到了方法的实现:

func on(_ event: Event<E>) {
    switch event {
    case .next:
        if load(self._isStopped) == 0 {
            self.onCore(event)
        }
    case .error, .completed:
        if fetchOr(self._isStopped, 1) == 0 {
            self.onCore(event)
        }
    }
}

最终执行self.onCore(event),其实还是回到了AnonymousObserver中:

override func onCore(_ event: Event<Element>) {
    return self._eventHandler(event)
}

self._eventHandler又是什么呢,是构造函数传入的闭包,于是就回到了订阅时的创建AnonymousObserver时的闭包了:

let observer = AnonymousObserver<E> { event in
                
    #if DEBUG
        synchronizationTracker.register(synchronizationErrorMessage: .default)
        defer { synchronizationTracker.unregister() }
    #endif
    
    switch event {
    case .next(let value):
        onNext?(value)
    case .error(let error):
        if let onError = onError {
            onError(error)
        }
        else {
            Hooks.defaultErrorHandler(callStack, error)
        }
        disposable.dispose()
    case .completed:
        onCompleted?()
        disposable.dispose()
    }
}

self._eventHandler调用时,就会回调到上述闭包中,然后判断不同的事件类型,通过我们在订阅时创建的闭包根据不同的闭包回调出去,onNextonErroronCompleted,这也就解释了我们在订阅后能接收到回调的原因。

总结

整个核心逻辑,看起来很复杂、很混乱,总结下来,如下几点:

  • 1.创建序列时,创建一个闭包A,同时创建了AnonymousObservable实例,将闭包A存在实例中
  • 2.订阅序列时,创建一个闭包B,同时创建了AnonymousObserve实例,将闭包B存在实例中
  • 3.创建一个AnonymousObservableSink实例,AnonymousObserve实例为AnonymousObservableSink构造函数的参数,AnonymousObserve实例存储在AnonymousObservableSink的父类Sink_observer属性中
  • 4.调用AnonymousObservableSink实例的run方法,执行闭包A,同时创建一个AnyObserver实例
  • 5.AnyObserver实例保存了Sink中的on方法
  • 6.闭包A中发送信号,其实调用的就是存在AnonymousObservableSink中的AnonymousObserve实例父类ObserverBase中的on方法,最后又回到AnonymousObserve实例的onCore方法
  • 7.onCore方法的执行,实际上就是在执行闭包B,而闭包B执行就会根据事件类型回调我们的订阅闭包

好了,整个核心逻辑就是这样了,再次膜拜大神的设计~

相关文章

网友评论

      本文标题:RxSwift 核心逻辑

      本文链接:https://www.haomeiwen.com/subject/ffwxuhtx.html