美文网首页
RxSwift(三)原理深入探究

RxSwift(三)原理深入探究

作者: Colbert_Z | 来源:发表于2019-08-11 17:32 被阅读0次

    在上一篇:RxSwift原理-执行流程中,简单分析了一下RxSwift的核心流程,本篇主要针对上一篇做一下更加深入的探究。

    继续上篇的demo:

        let disposbag: DisposeBag = DisposeBag()
        // 1、创建序列
        let ob:Observable<Any> = Observable.create { (observer) -> Disposable in
            
            // 3、发送信号
            observer.onNext("text")
            observer.onCompleted()
            return Disposables.create()
        }
        
        override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?) {
            
            // 2、订阅信号
            ob.subscribe(onNext: { (text) in
                print(text)
            }, onCompleted:{
                print("订阅完成")
            }).disposed(by: disposbag)  // 4、销毁
        }
    

    流程个人总结如下图,一次创建可以多次订阅,多次响应。


    还是按照经典的三部曲来分析:创建序列,订阅信号,发送信号。

    创建序列

    Observable.create:

        public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
            return AnonymousObservable(subscribe)
        }
    
    final private class AnonymousObservable<Element>: Producer<Element> {
        typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    
        let _subscribeHandler: SubscribeHandler
    
        init(_ subscribeHandler: @escaping SubscribeHandler) {
            self._subscribeHandler = subscribeHandler
        }
    }
    

    来一张创建Observable所涉及类的继承关系图:


    总结如下:

    • create方法创建AnonymousObservable对象,传入订阅执行的闭包
    • AnonymousObservable对象使用_subscribeHandler保存了订阅执行的闭包

    订阅信号

    ob.subscribe:

    // '......'表示省略了一些可略过的代码
        public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
            -> Disposable {
                let disposable: Disposable
                ......
                let observer = AnonymousObserver<E> { event in
                    ......
                    switch event {
                    case .next(let value):
                        onNext?(value)
                    case .error(let error):
                        if let onError = onError {
                            onError(error)
                        }
                        else {
                            Hooks.defaultErrorHandler(callStack, error)
                        }
                        disposable.dispose()
                    case .completed:
                        onCompleted?()
                        disposable.dispose()
                    }
                }
                return Disposables.create(
                    self.asObservable().subscribe(observer),
                    disposable
                )
        }
    }
    
    final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
        typealias Element = ElementType
        
        typealias EventHandler = (Event<Element>) -> Void
        
        private let _eventHandler : EventHandler
        
        init(_ eventHandler: @escaping EventHandler) {
            self._eventHandler = eventHandler
        }
    }
    

    可以看到,subscribe中创建了一个匿名观察者AnonymousObserver,匿名观察者AnonymousObserver保存了一个响应事件的闭包_eventHandler,咱们来看看AnonymousObserver类的继承关系图:

    在创建匿名观察者时有这样一句代码:self.asObservable().subscribe(observer)。这里的self就是咱们上面创建的AnonymousObservable对象obobserverAnonymousObserver对象,于是可以替换成ob.subscribe(observer)。根据AnonymousObservable的继承关系,在Producer类中找到了subscribe()的实现,相当于执行:Producer.subscribe(AnonymousObserver)

    class Producer<Element> : Observable<Element> {
    
        override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                // The returned disposable needs to release all references once it was disposed.
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                return disposer
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
            }
        }
    }
    

    在上面的代码中,self.run(observer, cancel: disposer)是关键,其他的暂时可忽略。self还是咱们的对象ob,于是接着调用AnonymousObservablerun 方法。

        override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            let subscription = sink.run(self)
            return (sink: sink, subscription: subscription)
        }
    

    run 方法中创建了一个重要角色:AnonymousObservableSink 管道。
    创建 sink对象(AnonymousObservableSink(observer: observer, cancel: cancel)):

        override init(observer: O, cancel: Cancelable) {
            super.init(observer: observer, cancel: cancel)
        }
    

    AnonymousObservableSink的父类Sink

        init(observer: O, cancel: Cancelable) {
            self._observer = observer
            self._cancel = cancel
        }
    

    于是sink对象保存了刚刚创建的匿名观察者observerProducer.subscribe(AnonymousObserver)中创建的销毁者disposer

    sink.run(self):

        func run(_ parent: Parent) -> Disposable {
            return parent._subscribeHandler(AnyObserver(self))
        }
    

    AnyObserver(self):

        public init<O : ObserverType>(_ observer: O) where O.E == Element {
            self.observer = observer.on
        }
    

    上述代码表明先将sink转换成AnyObserver类型,AnyObserversinkon方法保存起来,于是return parent._subscribeHandler(AnyObserver(self))相当于ob执行_subscribeHandler闭包返回一个Disposable

    发送信号

    observer.onNext("text")

    extension ObserverType {
        public func onNext(_ element: E) {
            self.on(.next(element))
        }
    }
    
    public struct AnyObserver<Element> : ObserverType {
        public func on(_ event: Event<Element>) {
            return self.observer(event)
        }
    }
    

    这里的observer(event)就是我们上面保存的sinkon方法,也就是AnonymousObservableSinkon方法。

    final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
        func on(_ event: Event<E>) {
            #if DEBUG
                self._synchronizationTracker.register(synchronizationErrorMessage: .default)
                defer { self._synchronizationTracker.unregister() }
            #endif
            switch event {
            case .next:
                if load(self._isStopped) == 1 {
                    return
                }
                self.forwardOn(event)
            case .error, .completed:
                if fetchOr(self._isStopped, 1) == 0 {
                    self.forwardOn(event)
                    self.dispose()
                }
            }
        }
    }
    
    class Sink<O : ObserverType> : Disposable {
        final func forwardOn(_ event: Event<O.E>) {
            #if DEBUG
                self._synchronizationTracker.register(synchronizationErrorMessage: .default)
                defer { self._synchronizationTracker.unregister() }
            #endif
            if isFlagSet(self._disposed, 1) {
                return
            }
            self._observer.on(event)
        }
    }
    

    self._observer.on(event)即为执行上面sink保存的匿名observeron事件方法,也就是AnonymousObserveron方法(AnonymousObserveron没有实现,执行父类ObserverBaseon)。

    class ObserverBase<ElementType> : Disposable, ObserverType {
        func on(_ event: Event<E>) {
            switch event {
            case .next:
                if load(self._isStopped) == 0 {
                    self.onCore(event)
                }
            case .error, .completed:
                if fetchOr(self._isStopped, 1) == 0 {
                    self.onCore(event)
                }
            }
        }
        func onCore(_ event: Event<E>) {
            rxAbstractMethod()
        }
    }
    
    final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
        override func onCore(_ event: Event<Element>) {
            return self._eventHandler(event)
        }
    }
    

    self._eventHandler(event)即为执行我们之前订阅信号时保存的响应事件闭包。

    sink在RxSwift中类似于一个管理者角色,管理序列、观察者和销毁者,将序列发送至观察者,并管理销毁者适时消耗序列,回收资源。

    最后一张图作为总结:


    相关文章

      网友评论

          本文标题:RxSwift(三)原理深入探究

          本文链接:https://www.haomeiwen.com/subject/lpjidctx.html