美文网首页selector
RxSwift FlatMap 解析

RxSwift FlatMap 解析

作者: zhongxiaoyue | 来源:发表于2019-08-02 10:11 被阅读0次

    flatMap

    Observable 的信号转换成其他的 Observable,然后将这些 Observables 合并

    flatMap 操作符将源 Observable 的每一个信号应用一个转换方法,将他们转换成 Observables。 然后将这些 Observables 的信号合并之后再发送出来。

    这个操作符是非常有用的,例如,当 Observable 的信号本身拥有其他的 Observable 时,你可以将所有 Observables 的信号发送出来。

    Demo

    • 代码
    let disposeBag = DisposeBag()
    let first = BehaviorSubject(value: "👦🏻")
    let second = BehaviorSubject(value: "🅰️")
    let variable = Variable(first)
    
    variable.asObservable()
            .flatMap { $0 }
            .subscribe(onNext: { print($0) })
            .disposed(by: disposeBag)
    
    first.onNext("🐱")
    variable.value = second
    second.onNext("🅱️")
    first.onNext("🐶")
    
    • 输出结果:

    👦🏻
    🐱
    🅰️
    🅱️
    🐶

    核心逻辑

      1. 通过flatMap方法,创建一个flatMap序列,将源序列和信号转序列Block传入flatMap序列中.
        public func flatMap<Source: ObservableConvertibleType>(_ selector: @escaping (Element) throws -> Source)
            -> Observable<Source.Element> {
                return FlatMap(source: self.asObservable(), selector: selector)
        }
    
      1. 用户直接订阅flatMap序列, 当用户订阅时,会触发flatMaprun方法.创建FlatMapSink对象.并将FlatMapSink对象作为源序列的订阅者.
        override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == SourceSequence.Element {
            let sink = FlatMapSink(selector: self._selector, observer: observer, cancel: cancel)
            let subscription = sink.run(self._source)
            return (sink: sink, subscription: subscription)
        }
    
      1. 当源序列发出信号时,FlatMapSink会响应(触发on方法),然后调用信号转序列Block将源序列的信号转为新序列,接着创建一个MergeSinkIter对象,作为新序列的订阅者.
        func subscribeInner(_ source: Observable<Observer.Element>) {
            let iterDisposable = SingleAssignmentDisposable()
            if let disposeKey = self._group.insert(iterDisposable) {
                let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
                let subscription = source.subscribe(iter)
                iterDisposable.setDisposable(subscription)
            }
        }
    
      1. 当新序列发出信号时,MergeSinkIter会响应(触发on方法),然后将新序列的信号作为flatMap序列的信号发送出去self._parent.forwardOn(.next(value)).这时因为用户在第2步订阅了flatMap序列,所以会收到新序列的信号.
        func on(_ event: Event<Element>) {
            self._parent._lock.lock(); defer { self._parent._lock.unlock() } // lock {
                switch event {
                case .next(let value):
                    self._parent.forwardOn(.next(value))
                case .error(let error):
                    self._parent.forwardOn(.error(error))
                    self._parent.dispose()
                case .completed:
                    self._parent._group.remove(for: self._disposeKey)
                    self._parent._activeCount -= 1
                    self._parent.checkCompleted()
                }
            // }
        }
    

    思维导图

    相关文章

      网友评论

        本文标题:RxSwift FlatMap 解析

        本文链接:https://www.haomeiwen.com/subject/ugukdctx.html