美文网首页
解析RxSwift核心流程

解析RxSwift核心流程

作者: 瀚_ | 来源:发表于2019-07-30 17:23 被阅读0次

    RxSwift的核心流程可以简化为三个步骤:

    • 创建序列
    • 订阅序列
    • 发送信号
    // 创建序列
    Observable<Int>.create { (anyObserver) -> Disposable in
        // 发送信号
        anyObserver.onNext(2)
        return Disposables.create()
        }
        // 订阅序列
        .subscribe(onNext: { (element) in
            print("订阅到: \(element)")
        })
        .disposed(by: DisposeBag())
    

    在执行这行代码得到的结果是: 订阅到: 2 , 那么在RxSwift内部是在什么时候开始发送信号(其实就是create(_ subscribe:)中的 subscribe闭包 什么时候执行),又是什么时候订阅到结果(就是subscribe(onNext:)中的 onNext闭包 什么时候执行)。

    创建序列

    因为 Observable 继承于 ObservableType, 所以点击 create 方法可以看到 ObservableType 的扩展方法

    extension ObservableType {
        public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
            return AnonymousObservable(subscribe)
        }
    }
    

    create 方法中,其实是初始化了 AnonymousObservable 类的对象,而在初始化的时候保存了 SubscribeHandler 闭包。

    订阅序列

    • subscribe(onNext: onError: onCompleted: onDisposed:) 方法也是 ObservableType 的扩展方法,实现如下(忽略不关心的代码):
    public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
            -> Disposable {
    
                let observer = AnonymousObserver<Element> { event in
                    
                    switch event {
                    case .next(let value):
                        onNext?(value)
                    case .error(let error):
                        if let onError = onError {
                            onError(error)
                        }
                        else {
                            Hooks.defaultErrorHandler(callStack, error)
                        }
                        disposable.dispose()
                    case .completed:
                        onCompleted?()
                        disposable.dispose()
                    }
                }
                return Disposables.create(
                    self.asObservable().subscribe(observer),
                    disposable
                )
        }
    
    • 在这个方法中我们关注两个方法:

      • 初始化 AnonymousObserver(匿名观察者) 对象 observer,保存了 EventHandler 闭包
      • self.asObservable() 调用了 subscribe(observer)
    • 在前面创建序列分析 create 方法时知道这里的 self.asObservable() 其实就是 AnonymousObservable() 对象,那么self.asObservable().subscribe(observer) => AnonymousObservable().subscribe(observer)

    • 由于 AnonymousObservable 继承于 Producer, 在 Producer 类中找到 subscribe(observer)的实现如下:

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                // 省略部分代码
                ...
                return disposer
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    // 省略部分代码
                    ...
                    return disposer
                }
            }
        }
    
    • subscribe(_ observer:) 方法中其实调用的就是 self.run(observer, ...), run(observer, ...) 的具体实现是在 AnonymousObservable 中:
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            let subscription = sink.run(self)
            return (sink: sink, subscription: subscription)
        }
    
    • 我们可以看到在这里初始化了 AnonymousObservableSink 类的对象,保存在外面创建的属于AnonymousObserver 类的 observer 对象。
    • sink.run 中的实现为:
    typealias Parent = AnonymousObservable<Element>
    func run(_ parent: Parent) -> Disposable {
            return parent._subscribeHandler(AnyObserver(self))
    }
    
    • 这里的 parent 就是 AnonymousObservable()对象
    • 到此前面 AnonymousObservable(subscribe) 保存的那份闭包会开始执行,也就是开始发送信号。
    • 捋一下整个流程为:
      其中 let ob = AnonymousObservable(subscribe)
      let sink = AnonymousObservableSink(observer: observer, ...),
      let observer = AnonymousObserver(event)

      self.asObservable().subscribe(observer) -> ob.subscribe(observer) -> ob.run(observer, ...) -> sink.run(ob) -> ob._subscribeHandler(AnyObserver(sink))

    发送信号

    • 在前面执行 AnonymousObservable(subscribe)._subscribeHandler(AnyObserver(sink)) 中传递 AnyObserver() 对象出去。
    public struct AnyObserver<Element> : ObserverType {
        
        public typealias EventHandler = (Event<Element>) -> Void
        private let observer: EventHandler
        ...
        public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
            self.observer = observer.on
        }
        
         public func on(_ event: Event<Element>) {
            return self.observer(event)
        }    
        ...
    }
    
    • 从初始化方法中可以看到 AnyObserver(sink) 保存的 sink.on的闭包。
    • 当执行 anyObserver.onNext(2) 时,因为 AnyObserverObserverType的类型,所以会走到 ObserverType 的扩展方法
    extension ObserverType {
        public func onNext(_ element: Element) {
            self.on(.next(element))
        }    
        ...
      }
    
    • 继续走到 AnyObserveron(_ event:) 方法,传递 .next 事件, 从上面的 on(_ event:) 方法中看到它会执行之前保存的闭包,因为我们之前保存的是 sink.on 闭包,所以最终会走到 sink.on 方法里:
    func on(_ event: Event<Element>) {
        ...
        switch event {
        case .next:
            if load(self._isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
    
    • 来到这里会找到父类的 forwardOn(_ event:) 方法:
    final func forwardOn(_ event: Event<Observer.Element>) {
        ...
        self._observer.on(event)
    }
    
    • 还记得之前在创建 AnonymousObservableSink 对象时,保存了 AnonymousObserver 对象吗? self._observer.on(event) 执行 AnonymousObserver 的父类 ObserverBaseon(event)方法:
    func on(_ event: Event<Element>) {
        switch event {
        case .next:
            if load(self._isStopped) == 0 {
                self.onCore(event)
            }
        case .error, .completed:
            if fetchOr(self._isStopped, 1) == 0 {
                self.onCore(event)
            }
        }
    }
    
    • onCore(_ event:) 方法:
    override func onCore(_ event: Event<Element>) {
        return self._eventHandler(event)
    }
    
    • 执行了在创建 AnonymousObserver 对象时保存的闭包。

    • 到这里我们也继续捋一下整个流程:
      let observer = AnonymousObserver(_ event:)

    anyObserver.onNext(2) -> anyObserver.on(.next(value)) -> sink.on(.next(value)) -> sink.forwardOn(.next(value)) -> sink._observer.on(.next(value)) -> observer.on(.next(value)) -> observer.onCore(.next(value)) -> observer._eventHandler(.next(value)) -> onNext?(value)

    最后附上一张相关类继承图以帮助分析
    RxSwift.png

    相关文章

      网友评论

          本文标题:解析RxSwift核心流程

          本文链接:https://www.haomeiwen.com/subject/xmanrctx.html