美文网首页Rx
RxSwift源码分析(9)——combineLatest

RxSwift源码分析(9)——combineLatest

作者: 无悔zero | 来源:发表于2020-10-12 11:02 被阅读0次

    今天开始一起分析一些常用的高阶函数,今天先分析combineLatest。有时候我们需要同时监听多个响应,或者合并多个响应时,就可以用combineLatest
    我们先看下面的例子,在登录界面,只有同时输入了账号和密码才能点击登录:

    let userNameVaild = userNameTf.rx.text.orEmpty
    .map { (text) -> Bool in
         return text.count > 0
    }
    let passwordVaild = passwordTf.rx.text.orEmpty
    .map { (text) -> Bool in
         return text.count > 0
    }
    Observable.combineLatest(userNameVaild,passwordVaild) { $0 && $1 }
    .bind(to: loginBtn.rx.isEnabled)
    .disposed(by: disposeBag)
    
    没有密码不能点击 没有账号不能点击 可以点击
    1. 前面只是创建序列,我们直接来看combineLatest,返回CombineLatest2序列(CombineLatest2继承了Producer):
    extension ObservableType {
        public static func combineLatest<O1: ObservableType, O2: ObservableType>
            (_ source1: O1, _ source2: O2, resultSelector: @escaping (O1.E, O2.E) throws -> E)
                -> Observable<E> {
            return CombineLatest2(
                source1: source1.asObservable(), source2: source2.asObservable(),
                resultSelector: resultSelector
            )
        }
    }
    
    final class CombineLatest2<E1, E2, R> : Producer<R> {
        ...
        init(source1: Observable<E1>, source2: Observable<E2>, resultSelector: @escaping ResultSelector) {
            self._source1 = source1  //保存两个源序列
            self._source2 = source2
    
            self._resultSelector = resultSelector
        }
        
        override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == R {
            let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
            let subscription = sink.run()
            return (sink: sink, subscription: subscription)
        }
    }
    
    1. CombineLatest2run函数里创建CombineLatestSink2_(业务下沉,CombineLatestSink2_继承了CombineLatestSink),调用了非常熟悉的sink.run(),我们马上进入CombineLatestSink2_里看看:
    final class CombineLatestSink2_<E1, E2, O: ObserverType> : CombineLatestSink<O> {
        ...
        func run() -> Disposable {
            let subscription1 = SingleAssignmentDisposable()
            let subscription2 = SingleAssignmentDisposable()
            //分别创建观察者
            let observer1 = CombineLatestObserver(lock: self._lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
            let observer2 = CombineLatestObserver(lock: self._lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
            //先订阅再setDisposable
            subscription1.setDisposable(self._parent._source1.subscribe(observer1))
            subscription2.setDisposable(self._parent._source2.subscribe(observer2))
    
            return Disposables.create([
                    subscription1,
                    subscription2
            ])
        }
    }
    
    1. CombineLatestSink2_创建了两个CombineLatestObserver观察者分别用源序列进行订阅,根据RxSwift核心逻辑,最终来到CombineLatestObserveron函数里:
    final class CombineLatestObserver<ElementType>
        : ObserverType
        , LockOwnerType
        , SynchronizedOnType {
        ...
        init(lock: RecursiveLock, parent: CombineLatestProtocol, index: Int, setLatestValue: @escaping ValueSetter, this: Disposable) {
            self._lock = lock
            self._parent = parent //保存CombineLatestSink2_
            self._index = index
            self._this = this
            self._setLatestValue = setLatestValue
        }
        
        func on(_ event: Event<Element>) {
            self.synchronizedOn(event)
        }
    
        func _synchronized_on(_ event: Event<Element>) {
            switch event {
            case .next(let value):
                self._setLatestValue(value)
                self._parent.next(self._index)
            case .error(let error):
                self._this.dispose()
                self._parent.fail(error)
            case .completed:
                self._this.dispose()
                self._parent.done(self._index)
            }
        }
    }
    
    extension SynchronizedOnType {
        func synchronizedOn(_ event: Event<E>) {
            self.lock(); defer { self.unlock() }
            self._synchronized_on(event)
        }
    }
    

    CombineLatestObserver.on调用后,最终调用了self._parent.next(self._index)self._parent就是初始化保存下来的CombineLatestSink2_

    1. CombineLatestSink2_没有实现具体的方法,我们看父类CombineLatestSink
    class CombineLatestSink<O: ObserverType>
        : Sink<O>
        , CombineLatestProtocol {
        ...
        func next(_ index: Int) {
            if !self._hasValue[index] {
                self._hasValue[index] = true
                self._numberOfValues += 1  //标记
            }
            //判断是否能够响应
            if self._numberOfValues == self._arity {
                do {
                    let result = try self.getResult()
                    self.forwardOn(.next(result))
                }
                catch let e {
                    self.forwardOn(.error(e))
                    self.dispose()
                }
            }
            else {
                var allOthersDone = true
    
                for i in 0 ..< self._arity {
                    if i != index && !self._isDone[i] {
                        allOthersDone = false
                        break
                    }
                }
                
                if allOthersDone {
                    self.forwardOn(.completed)
                    self.dispose()
                }
            }
        }
    
    • 我们看到了self.forwardOn(.next())发送响应,非常熟悉,但是在这之前还有一些步骤,就是标记和判断,还有获取结果:
    if !self._hasValue[index] {
        self._hasValue[index] = true
        self._numberOfValues += 1  //标记
    }
    //判断是否能够响应
    if self._numberOfValues == self._arity {
        ...
    }
    else {
        ...
    }
    

    首先将序列进行标记self._numberOfValues += 1,当两个序列或n个序列都进行了订阅时,self._numberOfValues等于self._arity,便可执行下一步self.getResult()获取结果。

    1. self.getResult()里面调用了self._parent._resultSelector
    final class CombineLatestSink2_<E1, E2, O: ObserverType> : CombineLatestSink<O> {
        ...
        override func getResult() throws -> R {
            return try self._parent._resultSelector(self._latestElement1, self._latestElement2)
        }
    }
    

    self._parent._resultSelector是一开始就保存的外面的闭包,然后返回一个值:

    Observable.combineLatest(userNameVaild,passwordVaild) { $0 && $1 }
    
    1. 最后才调用了self.forwardOn(.next(result))发送响应,实现通过两个序列来控制loginBtn是否能够点击。

    相关文章

      网友评论

        本文标题:RxSwift源码分析(9)——combineLatest

        本文链接:https://www.haomeiwen.com/subject/iasepktx.html