美文网首页
RxSwift_核心原理

RxSwift_核心原理

作者: LeeWkai | 来源:发表于2019-08-16 10:45 被阅读0次

    基本流程

    1. 创建序列
    2. 订阅序列
    3. 发送信号
           // 1:创建序列
            let observable = Observable<Any>.create { (obserber) -> Disposable in
                // 3:发送信号
                obserber.onNext("发送信号")
                obserber.onCompleted()
                return Disposables.create()
            }
            
            // 2:订阅信号
            _ = observable.subscribe { (event) in
                print(event)
            }
    

    带着问题去思考底层的实现

    屏幕快照 2019-08-15 下午4.33.45.png

    涉及到的几个主要的类的继承关系

    屏幕快照 2019-08-15 下午4.37.20.png

    流程分析

    RxSwift核心流程.png

    源码分析

    啰说一句

    1. RxSwift中经常会用父类声明方法,子类extension重写,传入当前子类的数据OC里没这么玩的
    2. 看到的subscribe不一定是当前类中的方法,也可能调的父类,父父类,父父父类已经习惯了的当我没说

    当前流程所在文件Create.swift

    • create 方法的时候创建了一个内部对象 AnonymousObservable
    • AnonymousObservable(匿名序列) 保存了外界的闭包
    extension ObservableType {
        public func subscribe(onNext: ((E) -> Void)? = nil, ...) -> Disposable {
        .
        .
        .
                let observer = AnonymousObserver<E> { event in                
                    switch event {
                    case .next(let value):
                        onNext?(value)
                    case .error(let error):
                        if let onError = onError {
                            onError(error)
                        }
                        else {
                            Hooks.defaultErrorHandler(callStack, error)
                        }
                        disposable.dispose()
                    case .completed:
                        onCompleted?()
                        disposable.dispose()
                    }
                }
                return Disposables.create(
                    self.asObservable().subscribe(observer),
                    disposable
                )
        }
    }
    

    中间的流程是最后的外界闭包调用还没到

    • 创建了一个 AnonymousObserver (匿名内部观察者) 保存了外界的 onNext, onError , onCompleted , onDisposed

    Disposables.create()是RxSwift的自己的销毁机制,不用管,先看内部

    • self.asObservable().subscribe(observer),就是前文啰说的地方,调用了父类的subscribe,调用了父类Producer的subscribe()

    当前流程所在文件Producer.swift

    override func subscribe(_ observer: O) -> Disposable where O.E == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                ·
                ·
                ·
                return disposer
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                  ·
                  ·
                  ·
                    return disposer
                }
            }
        }
    
    • CurrentThreadScheduler.instance.schedule(())是RxSwift中的线程管理,不议

    • 关键代码self.run(observer, cancel: disposer)由于子类AnonymousObservable重写了run()又会回到子类重写的方法中

    • 极度恶心的结构

    当前流程所在文件Create.swift

    final private class AnonymousObservable<Element>: Producer<Element> {
        typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
     ·
     ·
     ·
        override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            let subscription = sink.run(self)
            return (sink: sink, subscription: subscription)
        }
    }
    
    final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
        ·
        ·
        ·
    
        func run(_ parent: Parent) -> Disposable {
            return parent._subscribeHandler(AnyObserver(self))
        }
    }
    

    parent 是传过来的AnonymousObservable对象,AnonymousObservable._subscribeHandler()完成了对外部生成序列时代码块的调用 create和subscribe怎么关联的问题
    然后去执行 发送响应,回到最外部 3:送onNext()信号

    let observable = Observable<Any>.create { (obserber) -> Disposable in
                // 3:发送信号
                obserber.onNext("发送信号")
                obserber.onCompleted()
                return Disposables.create()
            }
    

    当前流程所在文件Create.swift 再次回来

    final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
        ·
        ·
        ·
    
        func run(_ parent: Parent) -> Disposable {
            return parent._subscribeHandler(AnyObserver(self))
        }
    }
    
    • AnyObserver(self)把AnonymousObservableSink传入

    当前流程所在文件AnyObserver

    public struct AnyObserver<Element> : ObserverType {
        ·
        ·
        /// Construct an instance whose `on(event)` calls `observer.on(event)`
        ///
        /// - parameter observer: Observer that receives sequence events.
        public init<O : ObserverType>(_ observer: O) where O.E == Element {
            self.observer = observer.on
        }
        ·
        ·
    }
    
    • self.observer 保存了observer.on翻译一下
    • self.observer = AnonymousObservableSink.on

    当obserber.onNext("发送信号")就会找到AnyObserver父类

    let observable = Observable<Any>.create { (obserber) -> Disposable in
                // 3:发送信号
                obserber.onNext("发送信号")
                obserber.onCompleted()
                return Disposables.create()
            }
    

    当前流程所在文件ObserverType.swift

    public protocol ObserverType {
        associatedtype E
        func on(_ event: Event<E>)
    }
    extension ObserverType {
        public func onNext(_ element: E) {
            self.on(.next(element))
        }
        public func onCompleted() {
            self.on(.completed)
        }
        public func onError(_ error: Swift.Error) {
            self.on(.error(error))
        }
    }
    
    • 前边提到的,self.observer 初始化就是AnonymousObservableSink .on()
    • 最终变成了self.observer(event) -> AnonymousObservableSink .on(event)
    • 再次回到AnonymousObservableSink中

    当前流程所在文件Create.swift

    class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
        func on(_ event: Event<E>) {
            switch event {
            case .next:
                if load(self._isStopped) == 1 {
                    return
                }
                self.forwardOn(event)
            case .error, .completed:
                if fetchOr(self._isStopped, 1) == 0 {
                    self.forwardOn(event)
                    self.dispose()
                }
            }
        }
    }
    
    • self.forwardOn(event)做事件调用
    • 然而forwardOn又是AnonymousObservableSink父类Sink中的方法,又要找父类

    当前流程所在文件Sink.swift

    class Sink<O : ObserverType> : Disposable {
        ·
        ·
        final func forwardOn(_ event: Event<O.E>) {
            if isFlagSet(self._disposed, 1) {
                return
            }
            self._observer.on(event)
        }
    }
    
    • self._observer 又双叒叕 保存的是AnonymousObserver
      -又双叒叕 回到AnonymousObserver的on()

    当前流程所在文件Create.swift

    let observer = AnonymousObserver<E> { event in
        switch event {
        case .next(let value):
            onNext?(value)
        case .error(let error):
            if let onError = onError {
                onError(error)
            }
            else {
                Hooks.defaultErrorHandler(callStack, error)
            }
            disposable.dispose()
        case .completed:
            onCompleted?()
            disposable.dispose()
        }
    }
    
    • 最后,判断 event 调用 onNext?(value) ,消息发送结束,收到订阅的消息
      // 2:订阅信号
            _ = observable.subscribe { (event) in
                print(event)
            }
    

    相关文章

      网友评论

          本文标题:RxSwift_核心原理

          本文链接:https://www.haomeiwen.com/subject/ygcfsctx.html