美文网首页
RxSwift(2)-核心逻辑

RxSwift(2)-核心逻辑

作者: BoxJing | 来源:发表于2019-07-28 18:49 被阅读0次

    今天详细分解一下RxSwift的核心逻辑

    序列的创建
            let ob = Observable<Any>.create { (obserber) -> Disposable in
                obserber.onNext(“Hello RxSwift")
                obserber.onCompleted()
                obserber.onError(NSError.init(domain: "coocieeror", code: 10087, userInfo: nil))
                return Disposables.create()
            }
    

    从Create.swift类文件进入。在Create.swift类中,其实就是返回了一个匿名序列AnonymousObservable对象。
    继续进入

    final private class AnonymousObservable<Element>: Producer<Element> {
        typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    
        let _subscribeHandler: SubscribeHandler
    
        init(_ subscribeHandler: @escaping SubscribeHandler) {
            self._subscribeHandler = subscribeHandler
        }
    
        override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            let subscription = sink.run(self)
            return (sink: sink, subscription: subscription)
        }
    }
    

    上面就是AnonymousObservable类的定义,继承了Producer,保存了闭包的代码块。

    订阅序列

    订阅者AnonymousObserver的创建。后边对event作为一个闭包接收并转发相应的事件:onNext、onError、onCompleted、onDisposed,后续发送响应时会直接传入该event事件。

    创建AnonymousObserver对象,在其初始化AnonymousObserver时创建并保存了一个_eventHandler:self._eventHandler = eventHandler,作为所有onNextonErroronCompleted的回调,用于在第3部分发送响应时将onNextonErroronComplete回调至ViewController.swift中的sucribe块。
    只要有event出现便进入switch判断,判断是.next.error.completed等,并执行对应的闭包方法onNext、onError、onCompleted。但具体是如何传递数据到ViewController.swift的.subscribe闭包中的呢?我们继续分析。
    在最后的return中调用create,执行self.asObservable().subscribe(observer) 。将创建的observer传递下去。
      subscribe( ):父类Producer.swift中定义了subscribe 函数

    override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                // The returned disposable needs to release all references once it was disposed.
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                return disposer
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
            }
        }
    
    发送响应

    onNext命令会调用起Create.swift中AnonymousObservableSinkon函数:

    func on(_ event: Event<E>) {
            #if DEBUG
                self._synchronizationTracker.register(synchronizationErrorMessage: .default)
                defer { self._synchronizationTracker.unregister() }
            #endif
            switch event {
            case .next:
                if load(self._isStopped) == 1 {
                    return
                }
                self.forwardOn(event)
            case .error, .completed:
                if fetchOr(self._isStopped, 1) == 0 {
                    self.forwardOn(event)
                    self.dispose()
                }
            }
        }
    

    执行self.fowardOn(event)。forwardOn函数定义于AnonymousObservableSink的父类Sink中。

    执行Sink中on函数:

    final func forwardOn(_ event: Event<O.E>) {
            #if DEBUG
                self._synchronizationTracker.register(synchronizationErrorMessage: .default)
                defer { self._synchronizationTracker.unregister() }
            #endif
            if isFlagSet(self._disposed, 1) {
                return
            }
            self._observer.on(event)
        }
    

    回调至subscribe的 AnonymousObserver中的event闭包块执行响应的条件.onNext

    let observer = AnonymousObserver<E> { event in
                    
                    #if DEBUG
                        synchronizationTracker.register(synchronizationErrorMessage: .default)
                        defer { synchronizationTracker.unregister() }
                    #endif
                    
                    switch event {
                    case .next(let value):
                        onNext?(value)
                    case .error(let error):
                        if let onError = onError {
                            onError(error)
                        }
                        else {
                            Hooks.defaultErrorHandler(callStack, error)
                        }
                        disposable.dispose()
                    case .completed:
                        onCompleted?()
                        disposable.dispose()
                    }
                }
    

    RxSwift实在太强大,下面上一张完整的思维导图帮助大家理解:


    RxSwift核心逻辑.png

    相关文章

      网友评论

          本文标题:RxSwift(2)-核心逻辑

          本文链接:https://www.haomeiwen.com/subject/twuprctx.html