美文网首页RxSwift源码解析
RxSwift Combination Operators of

RxSwift Combination Operators of

作者: 狼性刀锋 | 来源:发表于2018-10-13 11:43 被阅读33次

    使用示例

    // Usage sample: zips a String subject with an Int subject and prints each combined string.
    example("zip") {
        let disposeBag = DisposeBag()
        
        let stringSubject = PublishSubject<String>()
        let intSubject = PublishSubject<Int>()
        
        // The trailing closure receives one element from each subject and builds the printed string.
        Observable.zip(stringSubject, intSubject) { stringElement, intElement in
            "\(stringElement) \(intElement)"
            }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        // Two strings are sent before any Int is available.
        stringSubject.onNext("🅰️")
        stringSubject.onNext("🅱️")
        
        // Each Int is matched with the oldest string not yet consumed (see the ZipSink queues discussed in this article).
        intSubject.onNext(1)
        
        intSubject.onNext(2)
        
        stringSubject.onNext("🆎")
        intSubject.onNext(3)
    }
    
    
    // output log
    
    --- zip example ---
    🅰️ 1
    🅱️ 2
    🆎 3
    
    

    实现原理

    Zip 有着一系列类簇,从 Zip2 到 Zip8,实现原理都是一样的,区别只在于 Observable 的数量。所以这里只重点关注 Zip2 的实现

    /// Producer that combines two source observables (`E1`, `E2`) into elements of type `R`.
    /// It only stores its inputs; the actual work is delegated to `ZipSink2_` in `run`.
    final class Zip2<E1, E2, R> : Producer<R> {
        /// Combines one element from each source into a result; may throw.
        typealias ResultSelector = (E1, E2) throws -> R
    
        let source1: Observable<E1>
        let source2: Observable<E2>
    
        let _resultSelector: ResultSelector
    
        /// Stores both sources and the result selector.
        init(source1: Observable<E1>, source2: Observable<E2>, resultSelector: @escaping ResultSelector) {
            self.source1 = source1
            self.source2 = source2
    
            _resultSelector = resultSelector
        }
    
        /// Creates a `ZipSink2_` for `observer`, starts it with `sink.run()`, and returns
        /// the sink together with the subscription that `run()` produced.
        override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == R {
            let sink = ZipSink2_(parent: self, observer: observer, cancel: cancel)
            let subscription = sink.run()
            return (sink: sink, subscription: subscription)
        }
    }
    
    

    和其他操作符一样,仍然通过Sink实现核心功能

    // ZipSink2_.run
    
        // One queue per source; the observers created in `run()` enqueue incoming values here.
        var _values1: Queue<E1> = Queue(capacity: 2)
        var _values2: Queue<E2> = Queue(capacity: 2)
        
        /// Subscribes to both parent sources, each through its own `ZipObserver`,
        /// and returns a disposable that groups the two subscriptions.
        func run() -> Disposable {
            // One disposable per source; each is also passed to its observer as `this`.
            let subscription1 = SingleAssignmentDisposable()
            let subscription2 = SingleAssignmentDisposable()
    
            // `index` identifies the source (0 or 1); `setNextValue` appends the value to the matching queue.
            let observer1 = ZipObserver(lock: _lock, parent: self, index: 0, setNextValue: { self._values1.enqueue($0) }, this: subscription1)
            let observer2 = ZipObserver(lock: _lock, parent: self, index: 1, setNextValue: { self._values2.enqueue($0) }, this: subscription2)
    
            // The real subscriptions are assigned after the observers exist.
            subscription1.setDisposable(_parent.source1.subscribe(observer1))
            subscription2.setDisposable(_parent.source2.subscribe(observer2))
    
            return Disposables.create([
               subscription1,
               subscription2
            ])
        }
    

    ZipSink2_ 创建两个 ZipObserver,分别订阅 observable1 和 observable2(即 _parent.source1 和 _parent.source2)

    // ZipObserver._synchronized_on
        /// Handles one event from this observer's source in two passes:
        /// first disposes `_this` on a terminal event, then forwards the event to the parent sink.
        /// Both passes are skipped when `_parent` is nil.
        func _synchronized_on(_ event: Event<E>) {
            // Pass 1: on `.error` / `.completed`, dispose `_this`; `.next` needs no cleanup.
            if let _ = _parent {
                switch event {
                case .next(_):
                    break
                case .error(_):
                    _this.dispose()
                case .completed:
                    _this.dispose()
                }
            }
            
            // Pass 2: hand the event over to the parent sink.
            if let parent = _parent {
                switch event {
                case .next(let value):
                    // Store the value first, then notify the parent which index received it.
                    _setNextValue(value)
                    parent.next(_index)
                case .error(let error):
                    parent.fail(error)
                case .completed:
                    parent.done(_index)
                }
            }
        }
    
    

    和 Merge 操作符的原理一样,ZipObserver 在接收到事件后,会将事件传递给 ZipSink2_。在 next 事件的时候会调用 _setNextValue,这会触发 self._values1.enqueue($0) 操作,即数据入列操作

    // ZipSink.next
     /// Reacts to a new value from the source at `index`.
     /// Emits a combined result when every source has an element available;
     /// otherwise completes the sequence if all sources other than `index` are already done.
     func next(_ index: Int) {
            var hasValueAll = true
            
            // Step 1: check that each of the `_arity` sources has at least one element.
            for i in 0 ..< _arity {
                if !hasElements(i) {
                    hasValueAll = false
                    break
                }
            }
            
            if hasValueAll {
                // Step 2: build the result and forward it; an error thrown by `getResult`
                // is forwarded as `.error` and the sink is disposed.
                do {
                    let result = try getResult()
                    self.forwardOn(.next(result))
                }
                catch let e {
                    self.forwardOn(.error(e))
                    dispose()
                }
            }
            else {
                // Step 3: some source has no element; check whether every source except `index` is done.
                var allOthersDone = true
                
                let arity = _isDone.count
                for i in 0 ..< arity {
                    if i != index && !_isDone[i] {
                        allOthersDone = false
                        break
                    }
                }
                
                if allOthersDone {
                    forwardOn(.completed)
                    self.dispose()
                }
            }
        }
    
    

    函数分为3个逻辑块:

    1. 检测是否所有队列都有值。_arity 是被 zip 的 Observable 的数量,在本例子中等于 2
    // ZipSink2_. hasElements
        /// Returns whether the queue for the source at `index` (0 or 1) holds at least one value.
        /// Any other index goes to `rxFatalError`.
        override func hasElements(_ index: Int) -> Bool {
            switch (index) {
            case 0: return _values1.count > 0
            case 1: return _values2.count > 0
    
            default:
                rxFatalError("Unhandled case (Function)")
            }
    
            // NOTE(review): presumably unreachable if rxFatalError never returns — confirm.
            return false
        }
    
    
    2. 如果都有值,便通过 forwardOn 发送 next 事件
    // ZipSink2_.getResult
        /// Dequeues one value from each queue and passes both to the parent's `_resultSelector`.
        /// The dequeued values are force-unwrapped, so both queues are expected to be non-empty;
        /// rethrows any error from the selector.
        override func getResult() throws -> R {
            return try _parent._resultSelector(_values1.dequeue()!, _values2.dequeue()!)
        }
    
    

    这里要注意的是 getResult 会触发数据的出列操作,也就是说调用 getResult 之前
    _values1.count = 1, _values2.count = 1

    调用之后
    _values1.count = 0, _values2.count = 0

    3. 如果并非所有队列都有值,则检测除当前 Observable 之外的其他 Observable 是否都已经 completed;如果是的话,通过 forwardOn 发送 completed
      简单验证一下
    // Verification sample: shows when `zip` emits `completed` after one source has finished early.
    example("zip") {
        let disposeBag = DisposeBag()
        
        let stringSubject = PublishSubject<String>()
        let intSubject = PublishSubject<Int>()
        
        Observable.zip(stringSubject, intSubject) { stringElement, intElement in
            "\(stringElement) \(intElement)"
            }
            // `debug()` produces the timestamped event lines in the output log.
            .debug()
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        stringSubject.onNext("🅰️")
        
        
        stringSubject.onNext("🅱️")
        
      //  stringSubject.onNext("🆎")
        
        // The string source finishes after only two values.
        stringSubject.onCompleted()
        
        intSubject.onNext(1)
        
        intSubject.onNext(2)
        
        // The third Int has no string to pair with; per the log, `completed` is emitted at this point.
        intSubject.onNext(3)
        
        
        
    }
    
    
    // output log
    --- zip example ---
    2018-10-13 11:27:37.532: Rx.playground:73 (__lldb_expr_7) -> subscribed
    2018-10-13 11:27:37.536: Rx.playground:73 (__lldb_expr_7) -> Event next(🅰️ 1)
    🅰️ 1
    2018-10-13 11:27:37.537: Rx.playground:73 (__lldb_expr_7) -> Event next(🅱️ 2)
    🅱️ 2
    2018-10-13 11:27:37.539: Rx.playground:73 (__lldb_expr_7) -> Event completed
    2018-10-13 11:27:37.539: Rx.playground:73 (__lldb_expr_7) -> isDisposed
    
    
    

    这里如果去掉 stringSubject.onCompleted() 或者 intSubject.onNext(3),都不会收到 completed 事件:前者使 stringSubject 不会被标记为已完成;后者使 next 不会再进入"并非都有值"的分支,也就不会触发完成检测

    ZipSink 是 ZipSink2_ 的父类

    相关文章

      网友评论

        本文标题:RxSwift Combination Operators of

        本文链接:https://www.haomeiwen.com/subject/tikoaftx.html