美文网首页
RxSwift-combineLatest源码解析

RxSwift-combineLatest源码解析

作者: king_jensen | 来源:发表于2019-08-13 17:06 被阅读0次

combineLatest的使用

 let stringSub = PublishSubject<String>()
        let intSub = PublishSubject<Int>()
        Observable.combineLatest(stringSub, intSub) { strElement, intElement in
                "\(strElement) \(intElement)"
            }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
        
        stringSub.onNext("J")
        intSub.onNext(1)
        intSub.onNext(2)
        stringSub.onNext("E")
        stringSub.onNext("JENSEN") 

combineLatest序列创建

 public static func combineLatest<O1: ObservableType, O2: ObservableType>
        (_ source1: O1, _ source2: O2, resultSelector: @escaping (O1.Element, O2.Element) throws -> Element)
            -> Observable<Element> {
        return CombineLatest2(
            source1: source1.asObservable(), source2: source2.asObservable(),
            resultSelector: resultSelector
        )
    }

combineLatest实际返回的是CombineLatest2序列
进入CombineLatest2序列

init(source1: Observable<E1>, source2: Observable<E2>, resultSelector: @escaping ResultSelector) {
        self._source1 = source1
        self._source2 = source2

        self._resultSelector = resultSelector
    }

保存_source1_source2,已经外界的闭包_resultSelector

combineLatest序列订阅

CombineLatest2被订阅时,将会来到CombineLatest2run

 override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Result {
        let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }

1.CombineLatestSink2_保存_parent(CombineLatest2),观察者observer

 init(parent: Parent, observer: Observer, cancel: Cancelable) {
        self._parent = parent
        super.init(arity: 2, observer: observer, cancel: cancel)
    }

2.调用sink.run()

func run() -> Disposable {
        let subscription1 = SingleAssignmentDisposable()
        let subscription2 = SingleAssignmentDisposable()

        let observer1 = CombineLatestObserver(lock: self._lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
        let observer2 = CombineLatestObserver(lock: self._lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)

         subscription1.setDisposable(self._parent._source1.subscribe(observer1))
         subscription2.setDisposable(self._parent._source2.subscribe(observer2))

        return Disposables.create([
                subscription1,
                subscription2
        ])
    }

(1).创建类型CombineLatestObserver观察者observer1,observer2.保存参数_lock,_parent,_index,_this,_setLatestValue
_parentCombineLatestSink2_,_index是观察者观察的序列的索引值,_setLatestValue是闭包{ (e: E2) -> Void in self._latestElement1 = e },存储该序列最后一次发送响应的值到对应的属性。

(2).self._parent._source1.subscribe(observer1),self._parent._source2.subscribe(observer2),分别对_source1_source2进行订阅。

对源序列_source1,_source2发送响应

当对_source1发送响应,调用到CombineLatestObserver.on

func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }

调用SynchronizedOnType.synchronizedOn

extension SynchronizedOnType {
    func synchronizedOn(_ event: Event<Element>) {
        self.lock(); defer { self.unlock() }
        self._synchronized_on(event)
    }
}

调用CombineLatestObserver._synchronized_on

 func _synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next(let value):
            self._setLatestValue(value)
            self._parent.next(self._index)
        case .error(let error):
            self._this.dispose()
            self._parent.fail(error)
        case .completed:
            self._this.dispose()
            self._parent.done(self._index)
        }
    }

_synchronized_on首先调用_setLatestValue保存序列最后一次响应的值
调用CombineLatestSink.next(self_index)

func next(_ index: Int) {
        if !self._hasValue[index] {
            self._hasValue[index] = true
            self._numberOfValues += 1
        }

        if self._numberOfValues == self._arity {
            do {
                let result = try self.getResult()
                self.forwardOn(.next(result))
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        }
        else {
            var allOthersDone = true

            for i in 0 ..< self._arity {
                if i != index && !self._isDone[i] {
                    allOthersDone = false
                    break
                }
            }
            
            if allOthersDone {
                self.forwardOn(.completed)
                self.dispose()
            }
        }

1.首先判断self._hasValue[index]判断改序列之前是否响应过,未响应过将标志设置为已响应self._hasValue[index] = true,并且将已响应序列的数量+1self._numberOfValues += 1

2.self._numberOfValues == self._arity判断是否已全部响应
如果已经全部响应:

  override func getResult() throws-> Result {
        return try self._parent._resultSelector(self._latestElement1, self._latestElement2)
    }

(1)获取调用外界combineLatest时,保存的_resultSelector闭包,并调用,将结果保存至result
(2)Sink.forwardOn(.next(result))-->AnonymousObservableSink.on(.next(result)),最后调用到外界的闭包中。
这里用到了一个算法:就是判断所有序列是不是已经全部响应过,这个算法可以学习一下。

总结

combineLatest.png

相关文章

网友评论

      本文标题:RxSwift-combineLatest源码解析

      本文链接:https://www.haomeiwen.com/subject/osgvjctx.html