透视RxSwift核心逻辑

作者: silasjs | 来源:发表于2019-07-21 03:23 被阅读286次

    透视RxSwift核心逻辑

    篇幅稍微有点长,了解程度不同,可以跳过某些部分。

    1. 如果对源码比较熟悉的,建议直接看图就行了,时序图更加清晰。第一次摸索有必要阅读文字内容。
    2. 贴出来的代码省略了不必要的部分,用省略号代替。

    示例

    RxSwift的基础用法就是很简单的几步

    1. 创建可观察序列
    2. 监听序列(订阅信号)
    3. 销毁序列
    //创建序列
    let ob = Observable<Any>.create { (observer) -> Disposable in
        //发送信号
        observer.onNext("今日份麻酱凉皮")
        observer.onCompleted()
    
        return Disposables.create()
    }
    //订阅信号
    let _ = ob.subscribe(onNext: { (text) in
        print("订阅到:\(text)")
    }, onError: { (error) in
        print(error)
    }, onCompleted: {
        print("完成")
    }) {
        print("销毁")
    }
    
    控制台输出:
    
    订阅到:今日份麻酱凉皮
    完成
    销毁
    

    探究

    在看源码之前,应该对接触到的类和协议有些认识,方便之后的理解。下面的关系图在需要的时候回头熟悉一下就行:

    类关系图.png

    到底是什么在支撑如此便捷的调用?

    第一句Observable<Any>.create创建了一个可观察序列Observable对象,第二句就是这个Observable序列对象订阅了消息。

    从输出可以看出,都是订阅到的消息。那么订阅时传入subscribe的闭包是什么时候调用的呢?

    单从现在的几句代码,也能猜出是第一句代码的闭包中的observer.onNext的调用引起的。但是,我们也没有看到这个create函数中的闭包是在哪里执行的?

    为了能够清晰的描述,暂且称第一句create中的闭包为create闭包,第二句subscribe中的几个闭包为subscribe闭包

    外面看不出来,那我们只能进去RxSwift里面探索下createsubscribe到底做了什么?

    create函数的实现

    extension ObservableType {
        public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
            return AnonymousObservable(subscribe)
        }
    }
    

    原来这函数内部实际上是创建了一个AnonymousObservable匿名可观察序列对象。而之前的闭包也是给AnonymousObservable对象初始化用了。

    AnonymousObservable

    final private class AnonymousObservable<Element>: Producer<Element> {
        typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    
        let _subscribeHandler: SubscribeHandler
    
        init(_ subscribeHandler: @escaping SubscribeHandler) {
            self._subscribeHandler = subscribeHandler
        }
    
        override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            let subscription = sink.run(self)
            return (sink: sink, subscription: subscription)
        }
    }
    

    这里在初始化的时候把create闭包赋值给了_subscribeHandler属性。

    到此为止,Observable<Any>.create函数实际上创建了一个AnonymousObservable匿名可观察序列对象,并保存了create闭包

    一图概千言1.png

    。。。貌似这条不是主线啊!没有找到任何一个问题的答案。

    再来翻翻subscribe函数

    extension ObservableType {
        public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
            -> Disposable {
                let disposable: Disposable
                
                ......
                
                let observer = AnonymousObserver<E> { event in
                    ......
                    
                    switch event {
                    case .next(let value):
                        onNext?(value)
                    case .error(let error):
                        if let onError = onError {
                            onError(error)
                        }
                        else {
                            Hooks.defaultErrorHandler(callStack, error)
                        }
                        disposable.dispose()
                    case .completed:
                        onCompleted?()
                        disposable.dispose()
                    }
                }
                return Disposables.create(
                    self.asObservable().subscribe(observer),
                    disposable
                )
        }
    }
    

    这也是对定义在ObservableType协议中的函数的实现,返回了一个Disposable。这个Disposable就是用来管理订阅的生命周期的,示例代码中并没有体现出来,实际是在订阅信号的内部处理的。前面都没什么,后面创建了AnonymousObserver,并且在AnonymousObserver初始化时传入了闭包,并赋值给_eventHandler属性。

    final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
        typealias Element = ElementType
        typealias EventHandler = (Event<Element>) -> Void
        
        private let _eventHandler : EventHandler
        
        init(_ eventHandler: @escaping EventHandler) {
            self._eventHandler = eventHandler
        }
    
        override func onCore(_ event: Event<Element>) {
            return self._eventHandler(event)
        }
    }
    

    之前,AnonymousObservable匿名序列对象,保存了create闭包
    此时,创建了AnonymousObserver匿名观察者对象,保存了对subscribe闭包的回调执行的EventHandler闭包

    又一条支线。。。一路走来,都是在创建对象,保存闭包。两个主线疑问还是无迹可寻。难道一开始就走上了歧路?非也!继续看下去就明白了什么叫「柳暗花明又一村」。

    AnonymousObservablesubscribe函数中,在创建了AnonymousObserver对象后,还返回了一个新建的Disposable对象。重点就在这里的第一个参数:self.asObservable().subscribe(observer)中。asObservable还是返回了self,后面贴上的ObserverType中可以看到。剩下的就是AnonymousObservable的父类Producer中的subscribe了:

    class Producer<Element> : Observable<Element> {
        override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                ......
            } else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
            }
        }
    }
    

    由于是在当前线程中执行的,只看else那部分。disposer相关的不用关心。关键是observer参数,这个参数中有对subscribe闭包的处理的EventHandler闭包observer传入了self.run(observer, cancel)。所以,还要回头再看看AnonymousObservable类中的run函数

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
    

    这里又创建了一个AnonymousObservableSink对象,observercancel继续往初始化函数中丢:

    final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
        typealias E = O.E
        typealias Parent = AnonymousObservable<E>
        // state
        private let _isStopped = AtomicInt(0)
        
        override init(observer: O, cancel: Cancelable) {
            super.init(observer: observer, cancel: cancel)
        }
        
        func on(_ event: Event<E>) {
            ......
            
            switch event {
            case .next:
                if load(self._isStopped) == 1 {
                    return
                }
                self.forwardOn(event)
            case .error, .completed:
                if fetchOr(self._isStopped, 1) == 0 {
                    self.forwardOn(event)
                    self.dispose()
                }
            }
        }
    
        func run(_ parent: Parent) -> Disposable {
            return parent._subscribeHandler(AnyObserver(self))
        }
    }
    

    前面sink.run(self)self传了进来,又对AnonymousObservable起了别名Parentparent._subscribeHandler不就是AnonymousObservable在调用它最开始保存的那个create闭包么?AnyObserver(self)则把AnonymousObservableSink作为AnyObserver初始化的参数。

    public struct AnyObserver<Element> : ObserverType {
        public typealias E = Element
        public typealias EventHandler = (Event<Element>) -> Void
    
        private let observer: EventHandler
    
        public init(eventHandler: @escaping EventHandler) {
            self.observer = eventHandler
        }
        
        public init<O : ObserverType>(_ observer: O) where O.E == Element {
            self.observer = observer.on
        }
        public func on(_ event: Event<Element>) {
            return self.observer(event)
        }
        public func asObserver() -> AnyObserver<E> {
            return self
        }
    }
    

    其中还把AnonymousObservableSinkon函数赋值给了AnyObserver的属性observerobserver就是EventHandler。这个EventHandlercreate闭包中会用到。

    这不就是第二个主线疑问(create函数中的闭包何时调用)的答案么!

    整理一下:

    • subscribe(onNext, onError, onCompleted, onDisposed) -> Disposable函数中创建Disposable开始
    • AnonymousObservable调用subscribe(observer)
    • AnonymousObservable调用run(observer, cancel)
    • 创建AnonymousObservableSink(observer: observer, cancel: cancel),并且sink.run(self)
    • parent._subscribeHandler(AnyObserver(self))

    这是一条从subscribe闭包-->create闭包的线。

    一图概千言2.png

    还没完,还有个create闭包中怎么触发subscribe闭包的?

    又臭又长的写了这么多,这里就只看observer.onNext("今日份麻酱凉皮")吧。点进去看:
    observer.onNext("今日份麻酱凉皮")

    extension ObserverType {
        public func onNext(_ element: E) {
            self.on(.next(element))
        }
        
        public func onCompleted() {
            self.on(.completed)
        }
        
        public func onError(_ error: Swift.Error) {
            self.on(.error(error))
        }
    }
    

    onNext中调用了on,在前面AnyObserver结构体定义中可以看出,on函数中返回了self.observer(event),之前是把AnonymousObservableSinkon赋值给了这个self.observer,所以,此时会走到AnonymousObservableSinkon函数中。这里面又调用了self.forwardOn(event),看下AnonymousObservableSink的父类Sink中定义的forwardOn

    class Sink<O : ObserverType> : Disposable {
        fileprivate let _observer: O
        fileprivate let _cancel: Cancelable
        fileprivate let _disposed = AtomicInt(0)
    
        ......
    
        init(observer: O, cancel: Cancelable) {
            ......
            
            self._observer = observer
            self._cancel = cancel
        }
    
        final func forwardOn(_ event: Event<O.E>) {
            ......
            
            self._observer.on(event)
        }
    }
    

    forwardOn中走了一句self._observer.on(event)。这里的_observer属性不就是AnonymousObservableSink初始化时传入的AnonymousObserver对象么!

    继续跟AnonymousObserveron

    final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
        typealias Element = ElementType
        typealias EventHandler = (Event<Element>) -> Void
        
        private let _eventHandler : EventHandler
        
        init(_ eventHandler: @escaping EventHandler) {
            ......
            self._eventHandler = eventHandler
        }
    
        override func onCore(_ event: Event<Element>) {
            return self._eventHandler(event)
        }
    }
    

    这里没有on,看父类ObserverBase:

    class ObserverBase<ElementType> : Disposable, ObserverType {
        typealias E = ElementType
        
        private let _isStopped = AtomicInt(0)
    
        func on(_ event: Event<E>) {
            switch event {
            case .next:
                if load(self._isStopped) == 0 {
                    self.onCore(event)
                }
            case .error, .completed:
                if fetchOr(self._isStopped, 1) == 0 {
                    self.onCore(event)
                }
            }
        }
    }
    

    这里的on函数中,有个.next分支,调用了self.onCore(event)。子类AnonymousObserver实现的onCore中又调用了self._eventHandler(event)

    这个_eventHandler是什么?不就是AnonymousObserver初始化时保存的对subscribe闭包处理的闭包么!所以create闭包中的observer.onNext("今日份麻酱凉皮")就能触发subscribe闭包了。

    这是一条从AnonymousObservable调用_subscribeHandler(也就是create闭包)时的参数 AnyObserver-->subscribe闭包的线。

    一图概千言3.png

    现在我们看清楚了响应式的数据流:

    1. 在订阅信号时创建了observer并执行创建序列时的闭包
    2. 在创建序列的闭包中有回调observer,监听序列的变动而触发订阅信号的闭包

    图解

    看清楚了么?好像清楚的不太明显。毕竟好几个类、协议,又那么多函数调来调去的。加把劲再撸一撸。既然这么五花八门的调用流程搞清楚了,那就来弄清楚它主要都做了什么?

    一图概千言,既然画了图,就少敲点键盘吧!

    RxSwift核心逻辑图解四.jpg

    仔细看了流程图就会发现,出现的几个类中,干活的主要是Anonymous开头的那三个类。我们在外面调用的操作,其实是在使用RxSwift内部封装的一些类。

    • 创建可观察序列AnonymousObservable,并保存create闭包
    • 订阅信号
      • 首先创建了一个AnonymousObserver,并且把对subscribe闭包的操作封装成了EventHandler
      • 苦力活还是AnonymousObservable来干。
        • 在创建返回值Disposable中,由subscribe(observer),把AnonymousObserver传给了AnonymousObservableSink
          • AnonymousObservableSink才是信息处理的核心,因为他知道的太多了
          • AnonymousObservableSinkAnonymousObserver观察者,AnonymousObserver持有着EventHandler
          • AnonymousObservableSink在调用run函数时也传入了AnonymousObservable序列,AnonymousObservable就是create闭包的持有者。
          • AnonymousObservableSink初始化的时候,除了观察者外,还有个管理序列生命周期的Disposable
        • AnonymousObservableSink作为一个内部类,在被create闭包当做参数回调给外界时需要转换为AnyObserver,在这里AnyObserver则是以闭包属性的形式保留了AnonymousObservableSinkon函数
        • 后面在信号发生改变时就可以让AnyObserver通过这个属性值联系到AnonymousObservableSink

    订阅信号:Observable-->AnonymousObservable-->AnonymousObserver-->AnonymousObservableSink-->AnyObserver-->create闭包

    • 发出信号
      • 这个过程基本就是和订阅信号时相反的
      • create闭包中调用AnyObserveronNext开始
      • 通过AnyObserver.observer访问闭包中的AnonymousObservableSink
      • AnonymousObservableSink拥有AnonymousObserver
      • AnonymousObserver掌控EventHandler
      • 句号

    发出信号:create闭包-->AnyObserver-->AnonymousObservableSink-->AnonymousObserver-->subscribe闭包

    相关文章

      网友评论

        本文标题:透视RxSwift核心逻辑

        本文链接:https://www.haomeiwen.com/subject/ktoplctx.html