美文网首页RxSwift源码解析
Rx 构造操作符分类

Rx 构造操作符分类

作者: 狼性刀锋 | 来源:发表于2018-09-28 15:48 被阅读13次

    前言

    这里的分类指的是按照实现原理分类,而不是按照功能进行分类,针对每一个分类选择一个具体类型,进行分析

    Override subscribe operator

    • 操作符:never,empty,justerror
    • 特点:
      1. 通过简单重载subscribe方法达到目的,
      2. 没有Sink这个概念,因为太简单了,都不需要Sink
    • 示例:
    // class Just
        override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        // 无需经过Sink,直接发出信号
            observer.on(.next(_element))
            observer.on(.completed)
            return Disposables.create()
        }
    
    
    
    

    Recursive scheduling operator

    • 操作符: of,from,range,repeatElement, generate
    • 特点:
      1. 输入源都是一个Sequence, 所以其核心是一个迭代器
      2. 通过递归调度输出元素
    • 示例:
    // ObservableSequenceSink Class
          return _parent._scheduler.scheduleRecursive((_parent._elements.makeIterator(), _parent._elements)) { (iterator, recurse) in
    
                var mutableIterator = iterator
    
                if let next = mutableIterator.0.next() {
                    print("scheduleRecursive: \(next)")
                    self.forwardOn(.next(next))
                    recurse(mutableIterator)
                }
                else {
                    self.forwardOn(.completed)
                    self.dispose()
                }
            }
    
    

    scheduleRecursive: 线程递归调度方法
    let next = mutableIterator.0.next(): 迭代器迭代元素

    create

    这个之前详细讲过,创建一个AnonymousObservable 作为承载闭包的实体

        public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
            return AnonymousObservable(subscribe)
        }
    

    do

    这个很重要,重点讲一下

    // simple Example
     Observable.of("🍎", "🍐", "🍊", "🍋")
            .do(onNext: { print("Intercepted:", $0) }, onError: { print("Intercepted error:", $0) }, onCompleted: { print("Completed")  })
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    
    

    do这个操作符是专门用来处理副作用的,什么是副作用呢,打个简单的比方实现: 1 + 2 + 3 + 4 + 5, 我要在+3的时候,做一个额外的操作,我要改变一下ui的背景色,但是这一步对最终的结果没有任何影响,就可以使用do操作符,这么做的好处能够提高代码可读性同时也良好体现了函数的单一性职责。好了现在看看实现原理。

    // Do Class
        override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            _onSubscribe?()
            let sink = DoSink(eventHandler: _eventHandler, observer: observer, cancel: cancel)
            let subscription = _source.subscribe(sink)
            _onSubscribed?()
            let onDispose = _onDispose
            let allSubscriptions = Disposables.create {
                subscription.dispose()
                onDispose?()
            }
            return (sink: sink, subscription: allSubscriptions)
        }
    
    

    这里的_source即 original Observable,而这里的sink 指的就是DoSink

    // DoSink Class
        func on(_ event: Event<Element>) {
            do {
                try _eventHandler(event)
                forwardOn(event)
                if event.isStopEvent {
                    dispose()
                }
            }
            catch let error {
                forwardOn(.error(error))
                dispose()
            }
        }
    
    

    DoSink 会在执行on事件的时候执行_eventHandler,也就是最开始用户传进来的那个闭包。

    Rx最核心的是什么就是响应式的编程, A -> B -> C -> D -> E,A事件的发生最终导致E事件的发生。那么本来相互孤立的事件如何建立联系呢? 答案就在 let subscription = _source.subscribe(sink) 这一句, 通过subscribe将本来孤立的事情紧密的联系在一起,并且Rx隐藏了所以的细节,用户无需为此做大量的额外操作就能获得该功能。 每一个Observer 都不需要了解具体有多少个Observable,它只需要上个Observable是谁就可以了。整个事件看起来是这样:E subscribe D subscribe C subscribe B subscribe A,但是细分一点其实是这样:

    E subscribe D 
                  D subscribe C
                                 C subscribe B 
                                                B subscribe A
    

    不管整个事件流到底有多长,其核心构建就是 B和A,用算法归纳就是如下:

    var e
    while(e.hasObservable) {
      e.observer(e.Observable)
      e = e.Observable
    }
    

    这个很像单车的链条,不管链条多长,其最小的组成单元都是一个小链,有头尾两端,可以与其他的链在一起。

    Deferred

    推迟执行,它只在被订阅的时候才去创建Observable,而create是在一开始就创建ObservableDeferred 每次被订阅都会创建一个新的Observable,而create被多次订阅都是同一个Observable

      let disposeBag = DisposeBag()
        var count = 1
        
        let deferredSequence = Observable<String>.deferred {
            print("Creating \(count)")
            count += 1
            
            return Observable.create { observer in
                print("Emitting...")
                observer.onNext("🐶")
                observer.onNext("🐱")
                observer.onNext("🐵")
                return Disposables.create()
            }
        }
        
        deferredSequence
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        deferredSequence
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    
    
    // output log 
    Creating 1
    Emitting...
    🐶
    🐱
    🐵
    Creating 2
    Emitting...
    🐶
    🐱
    🐵
    
    
    

    由于每次创建都是通过闭包创建新的Observable,而闭包捕获的值count在更新,所以两次运行结果不一样

    实现原理

    // DeferredSink Class
        func run() -> Disposable {
            do {
                let result = try _observableFactory()
                return result.subscribe(self)
            }
            catch let e {
                forwardOn(.error(e))
                dispose()
                return Disposables.create()
            }
        }
    
    

    这里可以看到每次订阅的时候都会通过_observableFactory产生新的Observable

    相关文章

      网友评论

        本文标题:Rx 构造操作符分类

        本文链接:https://www.haomeiwen.com/subject/kwspoftx.html