美文网首页
RxSwift CombineLatest 解析

RxSwift CombineLatest 解析

作者: zhongxiaoyue | 来源:发表于2019-08-02 10:53 被阅读0次

    CombineLatest

    CombineLatest 方法可以将多个Observables的信号合并成一个信号然后发送给订阅者.因此CombineLatest 序列发送信号的前提条件是每个Observables都发出过信号.

    当多个 Observables 中任何一个发出一个信号,就发出一个信号。这个信号是由这些Observables 中最新的信号,通过一个函数组合起来的

    combineLatest.png

    combineLatest 操作符将多个 Observables 中最新的信号通过一个函数组合起来,然后将这个组合的结果发出来。这些源 Observables 中任何一个发出一个信号,他都会发出一个信号(前提是,这些 Observables 曾经发出过信号)。

    Demo

    • 代码
    let disposeBag = DisposeBag()
    
    let first = PublishSubject<String>()
    let second = PublishSubject<String>()
    
    Observable.combineLatest(first, second) { $0 + $1 }
              .subscribe(onNext: { print($0) })
              .disposed(by: disposeBag)
    
    first.onNext("1")
    second.onNext("A")
    first.onNext("2")
    second.onNext("B")
    second.onNext("C")
    second.onNext("D")
    first.onNext("3")
    first.onNext("4")
    
    • 输出结果:

    1A
    2A
    2B
    2C
    2D
    3D
    4D

    核心逻辑

    注: 因为逻辑相似,这里只以合并2个序列信号,序列A和序列B为例.

      1. 创建CombineLatest 序列,将序列A序列B以及信号合并Block作为init参数初始化,并保持.
        public static func combineLatest<O1: ObservableType, O2: ObservableType>
            (_ source1: O1, _ source2: O2)
                -> Observable<(O1.E, O2.E)> {
            return CombineLatest2(
                source1: source1.asObservable(), source2: source2.asObservable(),
                resultSelector: { ($0, $1) }
            )
        }
    
      1. 用户订阅CombineLatest 序列,触发CombineLatest 序列的run方法.然后创建CombineLatestSink2_对象,并调用CombineLatestSink2_.run执行具体事务.
        override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == R {
            let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
            let subscription = sink.run()
            return (sink: sink, subscription: subscription)
        }
    
      1. 在CombineLatestSink2_.run中.会创建类型为CombineLatestObserver的两个订阅者observer1observer2并保存CombineLatestSink2_作为属性self._parent = CombineLatestSink2_,然后将observer1observer2作为序列A序列B的订阅者.
        func run() -> Disposable {
            let subscription1 = SingleAssignmentDisposable()
            let subscription2 = SingleAssignmentDisposable()
    
            let observer1 = CombineLatestObserver(lock: self._lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
            let observer2 = CombineLatestObserver(lock: self._lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
    
             subscription1.setDisposable(self._parent._source1.subscribe(observer1))
             subscription2.setDisposable(self._parent._source2.subscribe(observer2))
    
            return Disposables.create([
                    subscription1,
                    subscription2
            ])
        }
    
    • 4.当序列A或序列B发出信号时,触发CombineLatestObserver对象的on方法,在on方法中会调用self._parent.next(self._index),告诉sink哪些序列已经发送过信号了.在调用self._setLatestValue(value)将信号值保存到sink对象中.
        func on(_ event: Event<Element>) {
            self.synchronizedOn(event)
        }
    
        func _synchronized_on(_ event: Event<Element>) {
            switch event {
            case .next(let value):
                self._setLatestValue(value)
                self._parent.next(self._index)
            case .error(let error):
                self._this.dispose()
                self._parent.fail(error)
            case .completed:
                self._this.dispose()
                self._parent.done(self._index)
            }
        }
    
      1. sinknext方法判断所有序列(A和B)都曾发送过信号时,会调用信号合并Block将所有序列的最后一个信号合并成一个新信号,然后将这个信号作为CombineLatest 序列的信号发送出来.由于用户在第2步订阅了CombineLatest序列,所以会收到合并后的新信号.
        func next(_ index: Int) {
            if !self._hasValue[index] {
                self._hasValue[index] = true
                self._numberOfValues += 1
            }
    
            if self._numberOfValues == self._arity {
                do {
                    let result = try self.getResult()
                    self.forwardOn(.next(result))
                }
                catch let e {
                    self.forwardOn(.error(e))
                    self.dispose()
                }
            }
            else {
                var allOthersDone = true
    
                for i in 0 ..< self._arity {
                    if i != index && !self._isDone[i] {
                        allOthersDone = false
                        break
                    }
                }
                
                if allOthersDone {
                    self.forwardOn(.completed)
                    self.dispose()
                }
            }
        }
    

    思维导图

    相关文章

      网友评论

          本文标题:RxSwift CombineLatest 解析

          本文链接:https://www.haomeiwen.com/subject/cwikdctx.html