美文网首页
RxSwift - 核心逻辑 序列的创建与订阅

RxSwift - 核心逻辑 序列的创建与订阅

作者: 恍然如梦_b700 | 来源:发表于2021-04-21 01:58 被阅读0次

    今天梳理一下RxSwift核心逻辑
    想要理解RxSwift核心逻辑首先要熟悉swift语言的基本用法
    RxSwift使用函数式编程思想,一些基本使用我这里就不赘述了,大家可以看一下github上的Demo,仿写一下
    我们以下面的代码来展开

    //创建序列
           let ob = Observable<String>.create { (obserber) -> Disposable in
                obserber.onNext("帅的一匹")
                obserber.onCompleted()
                return Disposables.create()
            }
            //订阅
            let _ = ob.subscribe(onNext: { (text) in
                print("订阅到:\(text)")
            }, onError: { (error) in
                print("error: \(error)")
            }, onCompleted: {
                print("完成")
            }) {
                print("销毁")
            }
    

    首先进入到create方法里

    /**
             Creates an observable sequence from a specified subscribe method implementation.
        
             - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
        
             - parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
             - returns: The observable sequence with the specified implementation for the `subscribe` method.
             */
        public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> RxSwift.Disposable) -> RxSwift.Observable<Self.E>
    

    继续点进定义,发现点不进去,那我们就看一下注释被。从指定的订阅方法实现中创建一个可观察序列。http://reactivex.io/documentation/operators/create.html这里就是通过一层一层的路由找到create文件

    image.png

    返回一个AnonymousObservable对象


    image.png

    逃逸闭包不理解可以翻看我之前的闭包的相关文章
    这里AnonymousObservable包存了闭包也就是我们create传入的闭包,先做一个标记

    AnonymousObservable 的继承关系是什么样的呢
    AnonymousObservable<Element> 继承自 Producer<Element> ,Produce继承自 Observable<Element> , Observable 遵循 ObservableType : ObservableConvertibleType协议
    我们从ObservableConvertibleType往下捋

    public protocol ObservableConvertibleType {
        /// Type of elements in sequence.
        associatedtype E
    
        /// Converts `self` to `Observable` sequence.
        ///
        /// - returns: Observable sequence that represents `self`.
        func asObservable() -> Observable<E>
    }
    

    ObservableConvertibleType 首先有个关联类型E,然后有个函数asObservable声明
    ObservableType协议:

    public protocol ObservableType : ObservableConvertibleType {
        func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
    }
    extension ObservableType {
        
        /// Default implementation of converting `ObservableType` to `Observable`.
        public func asObservable() -> Observable<E> {
            // temporary workaround
            //return Observable.create(subscribe: self.subscribe)
            return Observable.create { o in
                return self.subscribe(o)
            }
        }
    }
    
    

    ObservableType声明了一个函数subscribe
    asObservable默认实现,这就有点类似于 as 的用法,也就意味着任何遵循这个协议的对象或者结构体都可以转换为Observable<E>

    create做的事情也就是保存了闭包,那么我们再看看对可观察序列subscribe时都做了什么:

    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
            -> Disposable {
                let disposable: Disposable
                
                if let disposed = onDisposed {
                    disposable = Disposables.create(with: disposed)
                }
                else {
                    disposable = Disposables.create()
                }
                
                #if DEBUG
                    let synchronizationTracker = SynchronizationTracker()
                #endif
                
                let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
                
                let observer = AnonymousObserver<E> { event in
                    
                    #if DEBUG
                        synchronizationTracker.register(synchronizationErrorMessage: .default)
                        defer { synchronizationTracker.unregister() }
                    #endif
                    
                    switch event {
                    case .next(let value):
                        onNext?(value)
                    case .error(let error):
                        if let onError = onError {
                            onError(error)
                        }
                        else {
                            Hooks.defaultErrorHandler(callStack, error)
                        }
                        disposable.dispose()
                    case .completed:
                        onCompleted?()
                        disposable.dispose()
                    }
                }
                return Disposables.create(
                    self.asObservable().subscribe(observer),
                    disposable
                )
        }
    

    首先创建了一个AnonymousObserver<E>类型的对象observer,这个泛型E也就是示例代码 let ob = Observable<String>.create...中的 String类型,我们看最后retrun, self.asObservable().subscribe(observer)中做了什么,此时self是AnonymousObservable类型的对象,所以找AnonymousObservable中的subscribe函数,但是里面并没有实现,而是在父类Producer中实现了

    class Producer<Element> : Observable<Element> {
        override init() {
            super.init()
        }
    
        override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                // The returned disposable needs to release all references once it was disposed.
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                return disposer
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
            }
        }
    
        func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            rxAbstractMethod()
        }
    }
    

    为什么这么做呢,其实也可以理解,父类这里处于一个OB视角,拥有统一的订阅能力,线程调度能力,销毁能力。具有公平原则。
    调用self.run(observer, cancel: disposer)就会来到AnonymousObservable的run函数

    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            let subscription = sink.run(self)
            return (sink: sink, subscription: subscription)
        }
    

    AnonymousObservableSink是一个管子,看看他的结构(非常重要)


    image.png
    image.png

    我们看下面这张图,这个管理既包含了观察者,又包含了销毁者,同时sink.run将self传了进去,self也就是当前的可观察序列,所以这个管子包含了就将观察者和序列关联了起来
    sink : 订阅者 + 销毁者 + 序列 + 调度环境,流通序列与订阅者,处理业务逻辑。


    image.png

    我们再看sink.run


    image.png image.png

    可以看到,AnyObserver保存了可观察序列的on函数


    image.png

    从上面代码可以看出,当实例代码调用onNext的时候就会来到管子里的on函数:


    image.png
    又会调到sink的forwardOn
    image.png
    又会调用_observer.on, 前面我们说过 _observer不就是之前传进来的let observer = AnonymousObserver<E>这个东西吗?
    image.png

    通过模式匹配最终调用到我们传进来的闭包onNext,将value传递给外部


    image.png

    我们再来看看subscribe(event)的时候又是什么流程呢?流程也差不多,我把调用流程标注出来,你可以按照这个流程调试一下

     public func subscribe(_ on: @escaping (Event<E>) -> Void)
            -> Disposable {
                let observer = AnonymousObserver { e in
                    on(e)
                }
                return self.asObservable().subscribe(observer)
        }
    
    image.png image.png image.png image.png
    image.png

    所以这一系列流程并不是我们想像中的那么简单,其实如果看的框架比较多了,你自然会发现很多框架的思维都是类似的,所以好的框架是值得我们去好好研究与借鉴的。

    相关文章

      网友评论

          本文标题:RxSwift - 核心逻辑 序列的创建与订阅

          本文链接:https://www.haomeiwen.com/subject/qrtalltx.html