美文网首页
RxSwift (二)序列核心逻辑分析

RxSwift (二)序列核心逻辑分析

作者: 孔雨露 | 来源:发表于2019-08-09 23:56 被阅读0次

    @TOC

    RxSwift序列核心逻辑

    上一篇博客:Rxswift学习之(一)函数响应式编程思想只是简单的分析了序列的核心逻辑。本篇博客主要针对上一篇做一下更加深入的探讨,如果有那些地方分析有误,还请留言:QQ:282889543,让我们彼此提高,彼此成就。

    总的来说分析Rxswift的核心逻辑还是按照三部曲:创建序列,订阅序列,销毁序列。核心思想是万物皆序列。

    1. 序列的创建

    Observable可观察者序列

    我们先来看下创建Observable所涉及的类的继承关系:
    如下图:


    针对上面的类图,简单分析下类的关系和设计思想:
    首先分层实施的很彻底,每一层都只解决一件事情,一层层叠起来结构非常清晰:
    AnonymousObservable -> Producer -> Observable -> ObservableType -> ObservableConvertibleType

    其次我们简单分解一下每个类都做了些什么:

    • ObservableConvertibleType:顾名思义即可转换为Observable 类型协议,方法只有一个asObservable,这有什么好处呢?
    1. 用户不需要关注其具体是哪个类型对象
    2. 让用户更多的关注其核心功能
    • ObservableType:也是个协议,继承了ObservableConvertibleType协议的asObservable,它提供抽象方法subscribe,即我们常说的订阅,只有外部订阅了该对象,才能真正实现对该对象进行观察。
    • Observable:真正的类,可以称之为元类,对于用户来说Observable 的功能是完整的,因为它已经具备了所有的用户所需要的功能,尽管有些方法并没有得到实现仍是抽象方法。
      Producer: 它继承了Observable的所有方法,并实现subscribe 方法
    • AnonymousObservable:它继承了Producer的所有方法,并且增加了属性let _subscribeHandler: SubscribeHandler用来保存创建序列时传入的闭包,也就相对于拥有了调用这个序列的能力,此外它还实现run方法,这也是创建序列最核心关键的方法。在run()方法中它创建一个AnonymousObservableSink final private类的对象,而这个对象sink可以称之为管子,它类似于manager的角色,拥有序列和订阅,销毁能力。这里有两个疑惑:

    问题1. AnonymousObservableSink为什么要定义成final private类,不能被继承,也不能被外部访问?
    问题2. 创建的Observable是如何关联到订阅的?

    这两个问题我们后面再分析。

    最后,我们总结一下设计思想:

    事实上用户所使用的 Observable ,都是 Producer 的子类和AnonymousObservable平行的子类,只不过用户不需要关心其具体实现罢了
    每一个类似AnonymousObservable的类,还有一个与之相关的类AnonymousObservableSink,Sink即管道,所有这些组合起来才能让其真正运行起来,AnonymousObservableSink同时拥有序列,订阅功能,类似于我们项目中经常用的manager角色。
    整个设计向上通过组合协议的方式描述其特性,向下通过子类化的方式屏蔽其实现细节,类似于工厂模式,这样的类也可以叫类簇。

    序列创建的流程

    通过上面类继承关系,其实我们不难理解序列的创建流程,它确实也是只有比较简单的几部,寥寥几行代码就搞定了,难点是上面抛出的几个问题:

    下面我们将通过一个简单的Rxswift的实例来分析一下序列的创建,订阅,销毁直接的流程和关系。

    实例1

        //1. 创建序列
       let ob = Observable<Any>.create { (obserber) -> Disposable in
                // 3:发送信号
                obserber.onNext("kyl发送了信号")
                obserber.onCompleted()
                return Disposables.create()
            }
        
            // 2:订阅信号
            let _ = ob.subscribe(onNext: { (text) in
                print("订阅到:\(text)")
            }, onError: { (error) in
                print("error: \(error)")
            }, onCompleted: {
                print("完成")
            }) {
                print("销毁")
            }
    

    上面实例1 的这段代码可以用酷C老师的一个图来清晰的表达:

    从上面的代码和关系图,我们可能会产生这样一个疑惑:

    问题3: 创建的ob序列,仅仅只是通过ob.subscribe()订阅了一下,为什么我们在ob创建时的尾随闭包(我们这里给个名字叫闭包A)里面调用了obserber.onNext("kyl发送了信号")这个代码,我们就可以订阅到let _ = ob.subscribe(onNext: { (text) in print("订阅到:\(text)") } 这里会打印:”订阅到:kyl发送了信号“。我们没有看见他们之间有任何关联,怎么ob发送消息,subcribe()的onNext闭包就可以触发呢,这是为什么呢?

    我们可以这里可以简单推理下:ob.subscribe()这个订阅方法肯定做了一些事情,在某个地方调用了闭包A,才能实现这个功能。具体是怎么样实现的呢?下面我们将通过分析源码来解答这个疑惑。

    从上面的代码我们可以知道,创建序列就一行代码:let ob = Observable<Any>.create { (obserber) -> 而这一行代码其实是做了好多事情的。

    首先我们通过一个流程图来初略的了解一下序列创建流程:

    创建序列的Rxswift原码很简单,从上图可以看出,直接一行代码return AnonymousObservable(subscribe)就结束了,这里我们们并没有找到我们需要的答案,甚至我们有点越来越晕感觉。

    • AnonymousObservable类源码
    final private class AnonymousObservable<Element>: Producer<Element> {
        typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    
        let _subscribeHandler: SubscribeHandler
    
        init(_ subscribeHandler: @escaping SubscribeHandler) {
            self._subscribeHandler = subscribeHandler
        }
    
        override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            let subscription = sink.run(self)
            return (sink: sink, subscription: subscription)
        }
    }
    

    我们先做个深呼吸,放轻松,此路不通那我们来尝试分析其他方向,不能在一棵树上吊死。下面我们来分析一下订阅的流程。

    2.订阅

    回顾上面实例1中的订阅代码:let _ = ob.subscribe(onNext: { (text) in这行代码又做了些什么事情呢?下面我们通过源码来深入分析一下:

    • Rxswift订阅subscribe()的源码如下:
      public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
            -> Disposable {
                
                ... 上面代码不是我们要分析的重点,...表示忽略了此次的一段源码
                /*注意,此次定义了一个AnonymousObserver()对象,以参数的形式,
                构造方法里面传入了一个尾随闭包eventHandler,
                在这个闭包里面,当收到event的不同事件,
                会触发并调用,我们 `let _ = ob.subscribe(onNext: { (text) in` 这个方法传入闭包
                */
                let observer = AnonymousObserver<E> { event in
              
                    ...
                    
                    switch event {
                    case .next(let value):
                        onNext?(value) //调用订阅时传入的ob.subscribe(onNext:闭包
                    case .error(let error):
                        if let onError = onError {
                            onError(error)//调用订阅时传入的ob.subscribe(onError:闭包
                        }
                        else {
                            Hooks.defaultErrorHandler(callStack, error)
                        }
                        disposable.dispose()
                    case .completed:
                        onCompleted?()//调用订阅时传入的ob.subscribe(onCompleted:闭包
                        disposable.dispose()
                    }
                }
                return Disposables.create(
                    self.asObservable().subscribe(observer),
                    disposable
                )/*这里直接返回了Disposables对象,用来释放资源,
                在它的构造函数里面直接调用了self.asObservable().subscribe(observer),
                而asObservable()就是我们创建的序列ob,也就是ob.subscribe(),
                并传入了,在这段代码里面创建的局部变量let observer = AnonymousObserver<E>,*/
        }
    

    通过上面源码我们可以知道:subscribe()这个方法,以参数的形式传入了onNext()闭包,onError()闭包,onComplete()闭包,在函数里面创建了一个AnonymousObserver对象observer,这个对象创建的时候传入了一个闭包,当收到不同event事件时,会分别调用我们subscribe()传入的onNext,onError,onComplete这三个闭包。最重要一点是return Disposables.create( self.asObservable().subscribe(observer), disposable )这句代码调用了我们真正的subscribe()函数,并以参数的形式传入了AnonymousObserver对象,self.asObservable()就是我们create()函数创建的序列ob, 而到此处我们可以清晰的看到,我们订阅时传入参数闭包和我们创建的ob建立了一个链条。

    这里我们又有一个疑问:self.asObservable()为什么就是我们create()函数返回的ob呢?

    要解答这个问题,我需要回顾一下上面分析的Observable类的继承关系:Observable -> ObservableType -> ObservableConvertibleType 即Observable继承ObservableType协议,ObservableType又继承ObservableConvertibleType协议,而我们的ObservableConvertibleType提供了抽象方法asObservable(),我们Observable类中实现了asObservable()这个方法,它直接返回self就它自己。

    下面通过源码来证实:

    ///
    /// It represents a push style sequence.
    public class Observable<Element> : ObservableType {
    
        ...
        
        public func asObservable() -> Observable<E> {
            return self
        }
        
        ...
    }
    

    分析了Rxswift订阅subscribe()的源码感觉非常nice, 我们找到了我们ob 创建时传入的闭包和我们订阅时的闭包存在了一条链条关系,也就是只要ob发送了消息,那我们的订阅者一定可以按照这个链条收到消息。但是我们还是不知道到底是怎么调用的,怎么触发的。

    而且我们注意到self.asObservable().subscribe(observer)也就是AnonymousObservable调用了subscribe()方法,但是在AnonymousObservable类中我们并没有找到subscribe()的定义,所以我们需要来看他的父类Producer

    • Producer的源码如下:
    class Producer<Element> : Observable<Element> {
        override init() {
            super.init()
        }
    
        override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                // The returned disposable needs to release all references once it was disposed.
                let disposer = SinkDisposer()
                /*重点在这里了,这里调用了run()方法,一切疑惑都清晰了,我们知道了run()调用时传入了observer,并且创建了sink管子,而这个管子具备了序列的功能,可以调用on()方法。
                */
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                return disposer
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
            }
        }
    
        func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            rxAbstractMethod()
        }
    }
    
    

    果然不出我们所料,在Producer中我们找到了subscribe()的方法定义,到此我们可以总结出很清晰的几条线索了

    • (1)通过前面的类继承关系可以知道是Producer实现了ObservableType协议的subscribe()方法。在这个方法里面调用了self.run(observer, cancel: disposer)
    • (2) self.run()实际上就是AnonymousObservable.run(), 这这个方法里面做了三件事情:
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
    //1.创建了一个sink管道对象,并将observer也就create()创建
    //序列时传入的闭包传给了sink
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            //2. sink调用自己的run()方法,并把AnonymousObservable作为参数传入。
            let subscription = sink.run(self)
            //返回一个元组,包含sink管道信息。
            return (sink: sink, subscription: subscription)
        }
    
    • (3)AnonymousObservableSink类中run()方法中调用parent._subscribeHandler(AnyObserver(self)) 其中parent就是我们(2)中sink.run(self)传入的self,也就是AnonymousObservable对象;并且我们前面已经知道_subscribeHandler就是创建序列时保存的那个通过create()函数参数传入的 闭包:let ob = Observable<Any>.create { (obserber) -> Disposable in // 3:发送信号 obserber.onNext("kyl发送了信号") obserber.onCompleted() return Disposables.create() }。 现在已经很清晰了parent._subscribeHandler(AnyObserver(self)) 执行闭包,这行代码就会调用obserber.onNext("kyl发送了信号")这个行代码。
    • 现在我们可以通过一个流程图来总结我们代码执行的流程:

    上面的订阅序列流程分析:我们弄明白了从订阅序列到调用create()函数时传入的参数闭包调用的逻辑,但是这个闭包发送onNext()信号后,怎么到订阅消息的onNext()闭包我们还不是很清晰。因此我们需要分析AnonymousObserver

    我们先来看下AnonymousObserver

    • AnonymousObserver源码定义如下:
    final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
        typealias Element = ElementType
        
        typealias EventHandler = (Event<Element>) -> Void
        
        private let _eventHandler : EventHandler
        
        /*构造函数,保存了EventHandler尾随闭包*/
        init(_ eventHandler: @escaping EventHandler) {
    #if TRACE_RESOURCES
            _ = Resources.incrementTotal()
    #endif
            self._eventHandler = eventHandler
        }
    
        //覆写了onCore方法,调用了EventHandler闭包
        override func onCore(_ event: Event<Element>) {
            return self._eventHandler(event)
        }
        
    #if TRACE_RESOURCES
        deinit {
            _ = Resources.decrementTotal()
        }
    #endif
    }
    
    

    AnonymousObserver源码中我们并没有找到onNext()方法,那我们只能沿着它的继承链往上查找,这里需要了解一下类的继承关系:

    • AnonymousObserver的继承关系:

    通过分析类的继承关系,我们得知:这样一个关系链:

    AnonymousObserver对象的on()方法会调用onCore()方法,ObserverType里面有onNext,onError,onComplete方法。但是on()是如何调用的,何时调用的呢?

    要解决这个疑问,我们需要再次回到我们创建序列的代码:

    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
            return AnonymousObservable(subscribe)
        }
    

    创建序列的create()方法传入了一个subscribe闭包,并返回了AnonymousObservable对象。其中subscribe闭包就是我们序列创建时参数形式传入 闭包。并且AnonymousObservable初始化时将这个闭包保存起来了self._subscribeHandler = subscribeHandler AnonymousObservable 有一个run()方法,run方法里面创建了一个AnonymousObservableSink对象sink,具体源码如下:

    final private class AnonymousObservable<Element>: Producer<Element> {
        typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    
        let _subscribeHandler: SubscribeHandler
    
        init(_ subscribeHandler: @escaping SubscribeHandler) {
            self._subscribeHandler = subscribeHandler
        }
    
        override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            let subscription = sink.run(self)
            return (sink: sink, subscription: subscription)
        }
    }
    

    分析了这么久,绕了一圈,终于发现关键就在AnonymousObservableSink管子这个对象里面了。sink这是个神奇的管子。它就保存了序列,也保存了订阅,还保存了用于销毁的disposed 也就是同时拥有了创建序列,订阅序列,销毁序列功能。

    我们来分析下AnonymousObservableSink的源码:

    final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
        typealias E = O.E
        //这里的Parent就是我们上面分析的AnonymousObservable,非常重要
        typealias Parent = AnonymousObservable<E>
    
        // state
        private let _isStopped = AtomicInt(0)
    
        #if DEBUG
            fileprivate let _synchronizationTracker = SynchronizationTracker()
        #endif
    
    // 构造方法,传入了observer序列,和Cancelable
        override init(observer: O, cancel: Cancelable) {
            super.init(observer: observer, cancel: cancel)
        }
    
    //这里实现 了ObserverType协议的on()方法
        func on(_ event: Event<E>) {
            #if DEBUG
                self._synchronizationTracker.register(synchronizationErrorMessage: .default)
                defer { self._synchronizationTracker.unregister() }
            #endif
            switch event {
            case .next:
                if load(self._isStopped) == 1 {
                    return
                }
                //调用了父类的发布,self.forwardOn()会调用自己的on()方法
                self.forwardOn(event)
            case .error, .completed:
                if fetchOr(self._isStopped, 1) == 0 {
                    self.forwardOn(event)
                    self.dispose()
                }
            }
        }
    
        func run(_ parent: Parent) -> Disposable {
        /*调用了_subscribeHandler闭包,这个闭包就是我们之前创建序列时传入闭包。
        parent就是传入进来的序列,这里序列的闭包里传入了self并且强转为AnyObserver
        这里将self传给了闭包_subscribeHandler,这样_subscribeHandler也就具备了subcribe的能力。
        */
            return parent._subscribeHandler(AnyObserver(self))
        }
    }
    

    其中Sink类的源码如下:

    class Sink<O : ObserverType> : Disposable {
        fileprivate let _observer: O
        fileprivate let _cancel: Cancelable
        fileprivate let _disposed = AtomicInt(0)
    
        #if DEBUG
            fileprivate let _synchronizationTracker = SynchronizationTracker()
        #endif
    
        init(observer: O, cancel: Cancelable) {
    #if TRACE_RESOURCES
            _ = Resources.incrementTotal()
    #endif
            self._observer = observer
            self._cancel = cancel
        }
    
        final func forwardOn(_ event: Event<O.E>) {
            #if DEBUG
                self._synchronizationTracker.register(synchronizationErrorMessage: .default)
                defer { self._synchronizationTracker.unregister() }
            #endif
            if isFlagSet(self._disposed, 1) {
                return
            }
            //这里调用了传入observer.on()方法,
            self._observer.on(event)
        }
    
        final func forwarder() -> SinkForward<O> {
            return SinkForward(forward: self)
        }
    
        final var disposed: Bool {
            return isFlagSet(self._disposed, 1)
        }
    
        func dispose() {
            fetchOr(self._disposed, 1)
            self._cancel.dispose()
        }
    
        deinit {
    #if TRACE_RESOURCES
           _ =  Resources.decrementTotal()
    #endif
        }
    }
    

    从源码分析我们得知:

    • 我们的sink保存了我们的序列,当我们调用ob.onNext()发送信号时,由于我们的sink已经持有了ob, 这样sink会调用on()方法,在on()方法里面会调用self.forwardOn(event),而在fowardOn()里面会调用self._observer.on(event)。这样我的疑问就解决了,答案就是sink调用了on()方法。

    • 这里我们再来总结一下总的流程:

    1. 创建序列时create()返回了一个ob, 这个ob就是序列,创建的时候传递了一个闭包A。在闭包A中调用了ob.onNext()发送了信号。
    2. 订阅序列时调用ob.subscribe()方法,这个方法会创建一个AnonymousObserver对象,并调用了self.asObservable().subscribe(observer)
    3. self.asObservable()实际就是我们的ob, 也就是ob调用了subscribe().而AnonymousObserver中没有找到subscribe()。
    4. 我们在AnonymousObserver的父类中找到了subscribe(),发现subscribe()调用了AnonymousObserver的run()方法。
    5. 在AnonymousObserver的run()方法中,创建了一个管子sink,并调用了sink.run(self),sink是AnonymousObservableSink的对象,而在sink的run()方法中parent._subscribeHandler(AnyObserver(self))调用了创建序列时保存的闭包A (parent就是AnonymousObserver),这样就解释了订阅时,回调了A闭包的原因。
    6. 至于怎么调用onNext()方法也是通过sink来实现的。
    7. sink已经持有了ob ,当我们在A闭包里面调用ob.onNext()发送信号时,实际会通过sink.on()来调用。首先sink.on()会调用forwardOn().
    8. 在forwardOn()中会调用self._observer.on(event)。
    9. _observer.on()会调用_observer.onCore()
    10. _observer.onCore(event)会根据event的类型判断是调用onNext(),onError(),onComplete()中间一个,由于我们传递的是onNext事件,所以会调用onNext() ,而这个_observer.onNext()会调用我们订阅时传入闭包subscribe(onNext:).
    11. 为什么回调的原因是:
    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
            -> Disposable {
                
                ... 上面代码不是我们要分析的重点,...表示忽略了此次的一段源码
                /*注意,此次定义了一个AnonymousObserver()对象,以参数的形式,
                构造方法里面传入了一个尾随闭包eventHandler,
                在这个闭包里面,当收到event的不同事件,
                会触发并调用,我们 `let _ = ob.subscribe(onNext: { (text) in` 这个方法传入闭包
                */
                let observer = AnonymousObserver<E> { event in
              
                    ...
                    
                    switch event {
                    case .next(let value):
                        onNext?(value) //调用订阅时传入的
    

    这里调用ob.subscribe()的时候,我们创建了AnonymousObserver和我们subscribe()传入的onNext()闭包做了一个绑定,当AnonymousObserver.onNext()调用的时候必定会回调subscribe()传入的onNext()闭包。而10中的_observer对象指的就是let observer = AnonymousObserver

    • 还是通过这张图来解释最简洁:

    3. 销毁

    RxSwift给我们的展示的设计思维

    iOS 常用设计模式

    相关文章

      网友评论

          本文标题:RxSwift (二)序列核心逻辑分析

          本文链接:https://www.haomeiwen.com/subject/vefljctx.html