flatMap
将 Observable
的信号转换成其他的 Observable
,然后将这些 Observables
合并
flatMap 操作符将源 Observable
的每一个信号应用一个转换方法,将他们转换成 Observables
。 然后将这些 Observables
的信号合并之后再发送出来。
这个操作符是非常有用的,例如,当 Observable
的信号本身拥有其他的 Observable
时,你可以将所有子 Observables
的信号发送出来。
Demo
- 代码
let disposeBag = DisposeBag()
let first = BehaviorSubject(value: "👦🏻")
let second = BehaviorSubject(value: "🅰️")
let variable = Variable(first)
variable.asObservable()
.flatMap { $0 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
first.onNext("🐱")
variable.value = second
second.onNext("🅱️")
first.onNext("🐶")
- 输出结果:
👦🏻
🐱
🅰️
🅱️
🐶
核心逻辑
- 通过
flatMap方法
,创建一个flatMap序列
,将源序列和信号转序列Block
传入flatMap序列
中.
- 通过
public func flatMap<Source: ObservableConvertibleType>(_ selector: @escaping (Element) throws -> Source)
-> Observable<Source.Element> {
return FlatMap(source: self.asObservable(), selector: selector)
}
- 用户直接订阅flatMap序列, 当用户订阅时,会触发
flatMap
的run
方法.创建FlatMapSink
对象.并将FlatMapSink
对象作为源序列的订阅者.
- 用户直接订阅flatMap序列, 当用户订阅时,会触发
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == SourceSequence.Element {
let sink = FlatMapSink(selector: self._selector, observer: observer, cancel: cancel)
let subscription = sink.run(self._source)
return (sink: sink, subscription: subscription)
}
- 当源序列发出信号时,
FlatMapSink
会响应(触发on方法),然后调用信号转序列Block
将源序列的信号转为新序列,接着创建一个MergeSinkIter
对象,作为新序列的订阅者.
- 当源序列发出信号时,
func subscribeInner(_ source: Observable<Observer.Element>) {
let iterDisposable = SingleAssignmentDisposable()
if let disposeKey = self._group.insert(iterDisposable) {
let iter = MergeSinkIter(parent: self, disposeKey: disposeKey)
let subscription = source.subscribe(iter)
iterDisposable.setDisposable(subscription)
}
}
- 当新序列发出信号时,
MergeSinkIter
会响应(触发on方法),然后将新序列的信号作为flatMap序列
的信号发送出去self._parent.forwardOn(.next(value))
.这时因为用户在第2步订阅了flatMap序列,所以会收到新序列的信号.
- 当新序列发出信号时,
func on(_ event: Event<Element>) {
self._parent._lock.lock(); defer { self._parent._lock.unlock() } // lock {
switch event {
case .next(let value):
self._parent.forwardOn(.next(value))
case .error(let error):
self._parent.forwardOn(.error(error))
self._parent.dispose()
case .completed:
self._parent._group.remove(for: self._disposeKey)
self._parent._activeCount -= 1
self._parent.checkCompleted()
}
// }
}
网友评论