美文网首页
Rxswift核心逻辑之sink

Rxswift核心逻辑之sink

作者: Emma_虫儿 | 来源:发表于2019-08-13 20:12 被阅读0次

    你好,我是Emma。今天我们来追踪sink,查看一下信号具体是如何让观察这进行on(event)操作的。

    RX基本订阅的使用方法:

    let ob = Observable<Any>.create { (obserber) -> Disposable in
        // 发送信号
        obserber.onNext("111")
        obserber.onNext("222")
        obserber.onCompleted()
        obserber.onError(NSError.init(domain: "error", code: 10087, userInfo: nil))
        return Disposables.create()
    }
    
    // 订阅信号
    let _ = ob.subscribe(onNext: { (text) in
        print("订阅到:\(text)")
    }, onError: { (error) in
        print("error: \(error)")
    }, onCompleted: {
        print("完成")
    }) {
        print("销毁")
    }
    

    1.创建序列和观察者,订阅事件,销毁者之间的关联

    我们订阅创建AnonymousObservable的内部类,传递的是一个逃逸闭包subscribe,和匿名函数用法相似。create方法的返回值是一个任意类型的序列。这个逃逸闭包我的解释是参数是任意类型的观察者,返回的是可以销毁的序列。

    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {  
        return AnonymousObservable(subscribe)  
    }  
    

    AnonymousObservable函数的继承关系链:

    2.进入AnonymousObservable这个内部类。

    具体分析:AnonymousObservable

    1. 存储了订阅者
    2. 构建了订阅者和AnyObserver的关联
    3. 执行run方法
    

    走进run方法:

    1.AnonymousObservableSink继承自Sink,保存了观察者和销毁者。
    2.AnonymousObservableSink.run(AnonymousObservable)意思是用run方法将AnonymousObservableSink和AnonymousObservable关联。
    3.run方法具体操作是:
    AnonymousObservable._subscribeHandler((AnyObserver(AnonymousObservableSink)))_subscribeHandler是在AnonymousObservable中构建的一个逃逸闭包。
    AnyObserver就是继承自ObserverType的结构体,这是个功能性的结构体,是一个U型钩子。
        - 有发送发送给观察者方法,
        - 规范观察者的方法,
        - 构建了`on(event)`调用`observer.on(event)的实例`,
        - 以及`on(event)`调用`eventhandler(event)`的实例。
    
    final private class AnonymousObservable<Element>: Producer<Element> {
        typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    
        let _subscribeHandler: SubscribeHandler
    
        init(_ subscribeHandler: @escaping SubscribeHandler) {
            self._subscribeHandler = subscribeHandler
        }
    
        override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            let subscription = sink.run(self)
            return (sink: sink, subscription: subscription)
        }
    }
    
        func run(_ parent: Parent) -> Disposable {
            return parent._subscribeHandler(AnyObserver(self))
        }
    

    3.AnyObserver结构体的理解

    AnyObserver是一个结构体这个结构体在我的理解中就是一个U型钩子,为什么要创建这么一个结构体呢,我们直接在AnonymousObservableSink中写出?欢迎给出宝贵的见解。
    具体端点步骤如下:
    1.通过AnyObserver初始化将观察者添加一个on(event)属性。

    /// Construct an instance whose `on(event)` calls `observer.on(event)`  
    ///  
    /// - parameter observer: Observer that receives sequence events.  
    public init<O : ObserverType>(_ observer: O) where O.E == Element {  
        self.observer = observer.on  
    }  
    

    那个这个事件是什么呢?这个事件就是闭包中的这些信息。并且首先是onNext方法。

    let ob = Observable<Any>.create { (obserber) -> Disposable in  
        // 发送信号  
        obserber.onNext("111")  
        obserber.onCompleted()  
        return Disposables.create()  
    }  
    
    /// Send `event` to this observer.  
    ///  
    /// - parameter event: Event instance.  
    public func on(_ event: Event<Element>) {  
        return self.observer(event)  
    }  
    

    //发送“111”给观察者,此时的观察者是什么呢?此时的观察者是管子中保存的那个观察者。

    
    class Sink<O : ObserverType> : Disposable {
    //这个类还保存了观察者和销毁者
    ...
    final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        然后走到管子中的这个方法。
        self._observer.on(event)
    }
     ...   
    }
    

    通过观察者调用到外部那个ob.subscribe中的闭包。

    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
    ...   
            let observer = AnonymousObserver<E> { event in
                
                #if DEBUG
                    synchronizationTracker.register(synchronizationErrorMessage: .default)
                    defer { synchronizationTracker.unregister() }
                #endif
                
                switch event {
                case .next(let value):
                    onNext?(value)
                    //这个地方的value,从.next中传到onNext中。
                case .error(let error):
                    if let onError = onError {
                        onError(error)
                    }
                    else {
                        Hooks.defaultErrorHandler(callStack, error)
                    }
                    disposable.dispose()
                case .completed:
                    onCompleted?()
                    disposable.dispose()
                }
            }
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
    }
    
    

    订阅到调用到上面提到的订阅实现。将实现中的值value形参相当于text,然后调用sink中的forwardOn方法。

    let _ = ob.subscribe(onNext: { (text) in
    //订阅到的信息传导这个text中。
        print("订阅到:\(text)")
    }, onError: { (error) in
        print("error: \(error)")
    }, onCompleted: {
        print("完成")
    }) {
        print("销毁")
    }
    
    
    class Sink<O : ObserverType> : Disposable {
    //这个类还保存了观察者和销毁者,继承了ObserverType协议,ObserverType协议的扩展方法进行onNext和on方法的关联。Sink进行了观察者和on方法之间的关联。
    ...
    final func forwardOn(_ event: Event<O.E>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        if isFlagSet(self._disposed, 1) {
            return
        }
        然后走到管子中的这个方法。
        self._observer.on(event)
    }
     ...   
    }
    
    0x10389770c <+508>: callq  0x103897720               ; 
    $defer #1 <A where A: RxSwift.ObserverType>() -> () in RxSwift.(AnonymousObservableSink in _95EBF5692819D58425EC2DD0512D115A).on(RxSwift.Event<A.E>) -> ()
    

    那么on方法时如何来让观察者和事件关联的,见下面的操作

    /// Send `event` to this observer.
    ///
    /// - parameter event: Event instance.
    public func on(_ event: Event<Element>) {
        return self.observer(event)
    }
    

    那这个onNext方法为什么可以调用到on方法呢?这就源于ObserverType协议的扩展。

    /// Convenience API extensions to provide alternate next, error, completed events
    extension ObserverType {
        
        /// Convenience method equivalent to `on(.next(element: E))`
        ///
        /// - parameter element: Next element to send to observer(s)
        public func onNext(_ element: E) {
            self.on(.next(element))
        }
    ...
    }
    
    let ob = Observable<Any>.create { (obserber) -> Disposable in
        //然后回到这个方法,再进行onCompleted()方法,重复上面的步骤不过在ObserverType的扩展协议中调用的是下面这个方法:
        obserber.onNext("111")
        obserber.onCompleted()
    //            obserber.onError(NSError.init(domain: "coocieeror", code: 10087, userInfo: nil))
        return Disposables.create()
    }
    
    extension ObserverType {
        public func onCompleted() {
            self.on(.completed)
        }
    }
    
    class Producer<Element> : Observable<Element> {
    ...
        override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                // The returned disposable needs to release all references once it was disposed.
                ...
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
            }
        }
    ...
    }
    
    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
    ...
        let observer = AnonymousObserver<E> { event in    
            #if DEBUG
                synchronizationTracker.register(synchronizationErrorMessage: .default)
                defer { synchronizationTracker.unregister() }
            #endif
            
            switch event {
            case .next(let value):
                onNext?(value)
            case .error(let error):
                if let onError = onError {
                    onError(error)
                }
                else {
                    Hooks.defaultErrorHandler(callStack, error)
                }
                disposable.dispose()
            case .completed:
                onCompleted?()
                disposable.dispose()
            }
        }
        return Disposables.create(
        //然后走这一步序列去订阅观察者
            self.asObservable().subscribe(observer),
            disposable
        )
    }
    }
    

    总结:
    此分类是从外部使用为基点分段。

    1.创建到初始化观察者的on方法。
    Observable<Any>.create -> ob.subscribe() ->return Disposables.create(self.asObservable().subscribe(observer),disposable) -> AnyObserver中初始化self.observer = observer.on ->
    
    2.观察者的onNext通过sink到内部订阅具体方法中去区分。
    obserber.onNext("111") -> AnyObserver中的on方法return self.observer(event) -> Sink中的forwardOn方法self._observer.on(event) -> subscribe 中的 case .next -> 
    
    3.外部的订阅,通过onNext进行获取值,如何获取的步骤如下,和2的方向可以理解成相反的但是都是汇合到sink中来操作。sink是中间者。
    ob.subscribe(onNext: { (text) in print("订阅到:\(text)")} -> onNext?(value) -> AnonymousObserver 中的onCore方法return self._eventHandler(event) -> ObserverBase中on方法中的 case .next:self.onCore(event) ->Sink中的forwardOn方法self._observer.on(event)
    
    4.最后这个就是通过sink找到真正的观察者创建时的obserber.onNext("111")方法的过程了。
    ObserverType中的onNext{ self.on(.next(element)) } -> let ob = Observable<Any>.create { (obserber) -> Disposable in
                obserber.onNext("111")
            }
            
     5.如果观察者还有onNext方法,那么就同2。
     obserber.onNext("222") -> AnyObserver中的on方法return self.observer(event) -> Sink中的forwardOn方法self._observer.on(event)。。。。类比2步骤及之后的步骤。
    

    通过框架结构的分析总结图:

    参考文献:

    RxSwift核心逻辑-源码分析

    RxSwift之管道——AnonymousObservableSink

    相关文章

      网友评论

          本文标题:Rxswift核心逻辑之sink

          本文链接:https://www.haomeiwen.com/subject/iaqijctx.html