flatMap
官方示例
example("flatMap and flatMapLatest") {
    // A player whose score is itself an observable value.
    struct Player {
        var score: Variable<Int>
    }

    let bag = DisposeBag()
    let boy = Player(score: Variable(80))
    let girl = Player(score: Variable(90))
    let currentPlayer = Variable(boy)

    // Map the currently selected player to that player's score sequence.
    let scores = currentPlayer.asObservable()
        .flatMap { selected in selected.score.asObservable() } // Change flatMap to flatMapLatest and observe change in printed output
    scores
        .subscribe(onNext: { score in print(score) })
        .disposed(by: bag)

    boy.score.value = 85
    currentPlayer.value = girl
    boy.score.value = 95 // Will be printed when using flatMap, but will not be printed when using flatMapLatest
    girl.score.value = 100
}
// Output log
--- flatMap and flatMapLatest example ---
ℹ️ [DEPRECATED] `Variable` is planned for future deprecation. Please consider `BehaviorRelay` as a replacement. Read more at: https://git.io/vNqvx
80
85
90
95
100
函数原型
extension ObservableType {
    /// Maps every element of the source sequence to an inner observable sequence
    /// and merges the resulting inner sequences into one observable sequence.
    ///
    /// - seealso: [flatMap operator on reactivex.io](http://reactivex.io/documentation/operators/flatmap.html)
    /// - parameter selector: Transform applied to each source element; it returns the inner sequence for that element.
    /// - returns: An observable sequence whose elements come from the inner sequences produced by `selector`.
    public func flatMap<O: ObservableConvertibleType>(_ selector: @escaping (E) throws -> O)
        -> Observable<O.E> {
        let upstream = asObservable()
        return FlatMap(source: upstream, selector: selector)
    }
}
FlatMap 实现
/// Producer behind `flatMap`: it stores the upstream observable and the selector,
/// and builds a `FlatMapSink` whenever `run` is invoked.
fileprivate final class FlatMap<SourceElement, SourceSequence: ObservableConvertibleType>: Producer<SourceSequence.E> {
    typealias Selector = (SourceElement) throws -> SourceSequence

    private let _selector: Selector
    private let _source: Observable<SourceElement>

    /// - parameter source: The upstream sequence whose elements are handed to `selector`.
    /// - parameter selector: Transform that turns one source element into an inner sequence.
    init(source: Observable<SourceElement>, selector: @escaping Selector) {
        self._selector = selector
        self._source = source
    }

    /// Creates the sink for `observer`, runs it against the stored source, and
    /// returns the sink together with the subscription that `sink.run` produced.
    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == SourceSequence.E {
        let flatMapSink = FlatMapSink(selector: _selector, observer: observer, cancel: cancel)
        return (sink: flatMapSink, subscription: flatMapSink.run(_source))
    }
}
核心实现仍然在 Sink 之中
/// Sink used by `FlatMap`. It subclasses `MergeSink` and overrides only
/// `performMap`, which hands each source element to the stored selector.
final fileprivate class FlatMapSink<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType>: MergeSink<SourceElement, SourceSequence, Observer> where Observer.E == SourceSequence.E {
    typealias Selector = (SourceElement) throws -> SourceSequence

    private let _selector: Selector

    /// - parameter selector: Transform that turns one source element into an inner sequence.
    /// - parameter observer: Forwarded unchanged to `MergeSink`.
    /// - parameter cancel: Forwarded unchanged to `MergeSink`.
    init(selector: @escaping Selector, observer: Observer, cancel: Cancelable) {
        self._selector = selector
        super.init(observer: observer, cancel: cancel)
    }

    /// Returns the inner sequence for `element`; any error thrown by the selector propagates.
    override func performMap(_ element: SourceElement) throws -> SourceSequence {
        let innerSequence = try _selector(element)
        return innerSequence
    }
}
// NOTE(review): this declaration is a verbatim duplicate of the `FlatMapSink`
// class shown immediately above; presumably a different listing (e.g. `MergeSink`)
// was intended here — TODO confirm against the original article.
/// Sink used by `FlatMap`: a `MergeSink` subclass that stores a selector and
/// returns the selector's result from `performMap`.
fileprivate final class FlatMapSink<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType> : MergeSink<SourceElement, SourceSequence, Observer> where Observer.E == SourceSequence.E {
typealias Selector = (SourceElement) throws -> SourceSequence
private let _selector: Selector
/// Stores `selector` and passes `observer` and `cancel` on to `MergeSink`.
init(selector: @escaping Selector, observer: Observer, cancel: Cancelable) {
_selector = selector
super.init(observer: observer, cancel: cancel)
}
/// Applies the stored selector to `element`; errors thrown by the selector propagate.
override func performMap(_ element: SourceElement) throws -> SourceSequence {
return try _selector(element)
}
}
FlatMapSink 继承于 MergeSink，那么 FlatMap 操作只是个皮，Merge 操作才是骨。Original Observable 每次产生的 Next 事件，都会被 _selector 转换成一个新的 Observable，然后由 MergeSinkIter 订阅该 Observable，最后传递给 FlatMapSink。简单验证一下。
/// Demonstrates `flatMap` with three subjects: the outer subject picks an inner
/// subject per element, and elements later sent by the inner subjects are printed.
func testFlatMap() {
    let bag = DisposeBag()
    let aPublishSubject = PublishSubject<String>()
    let bPublishSubject = PublishSubject<String>()
    let cPublishSubject = PublishSubject<String>()

    // "1" selects bPublishSubject; any other value selects cPublishSubject.
    let merged = aPublishSubject.asObservable()
        .flatMap { key in key == "1" ? bPublishSubject : cPublishSubject }
    merged
        .subscribe(onNext: { value in print(value) })
        .disposed(by: bag)

    // Outer elements: the first maps to bPublishSubject, the second to cPublishSubject.
    aPublishSubject.onNext("1")
    aPublishSubject.onNext("2")

    // Inner elements; the log shown with this example prints "b" then "c".
    bPublishSubject.onNext("b")
    cPublishSubject.onNext("c")
}
// Output log
b
c
简析：在 aPublishSubject 依次发出 onNext("1") 和 onNext("2") 之后，实际上就已经分别订阅了 bPublishSubject 和 cPublishSubject，这两个 PublishSubject 发出的 next 事件最终都会被用户订阅到。所以 flatMap 不仅仅是将事件转换为新的 Observable，而且还隐含着 Merge 操作。
flatMapLatest
代码示例就不写了，只需要简单地将 flatMap 替换成 flatMapLatest 即可。
/// Wraps this sequence and `selector` in a `FlatMapLatest` producer.
///
/// - parameter selector: Transform applied to each source element; it returns the inner sequence for that element.
/// - returns: An observable sequence of the inner sequences' element type, backed by `FlatMapLatest`.
public func flatMapLatest<O: ObservableConvertibleType>(_ selector: @escaping (E) throws -> O)
    -> Observable<O.E> {
    let upstream = asObservable()
    return FlatMapLatest(source: upstream, selector: selector)
}
// FlatMapLatest
/// Producer behind `flatMapLatest`: it stores the upstream observable and the
/// selector, and builds a `MapSwitchSink` whenever `run` is invoked.
fileprivate final class FlatMapLatest<SourceType, S: ObservableConvertibleType>: Producer<S.E> {
    typealias Selector = (SourceType) throws -> S

    fileprivate let _selector: Selector
    fileprivate let _source: Observable<SourceType>

    /// - parameter source: The upstream sequence whose elements are handed to `selector`.
    /// - parameter selector: Transform that turns one source element into an inner sequence.
    init(source: Observable<SourceType>, selector: @escaping Selector) {
        self._selector = selector
        self._source = source
    }

    /// Creates the sink for `observer`, runs it against the stored source, and
    /// returns the sink together with the subscription that `sink.run` produced.
    override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == S.E {
        let switchSink = MapSwitchSink<SourceType, S, O>(selector: _selector, observer: observer, cancel: cancel)
        return (sink: switchSink, subscription: switchSink.run(_source))
    }
}
// MapSwitchSink
/// Sink used by `FlatMapLatest`. It subclasses `SwitchSink` and overrides only
/// `performMap`, which hands each source element to the stored selector.
fileprivate final class MapSwitchSink<SourceType, S: ObservableConvertibleType, O: ObserverType>: SwitchSink<SourceType, S, O> where O.E == S.E {
    typealias Selector = (SourceType) throws -> S

    fileprivate let _selector: Selector

    /// - parameter selector: Transform that turns one source element into an inner sequence.
    /// - parameter observer: Forwarded unchanged to `SwitchSink`.
    /// - parameter cancel: Forwarded unchanged to `SwitchSink`.
    init(selector: @escaping Selector, observer: O, cancel: Cancelable) {
        self._selector = selector
        super.init(observer: observer, cancel: cancel)
    }

    /// Returns the inner sequence for `element`; any error thrown by the selector propagates.
    override func performMap(_ element: SourceType) throws -> S {
        let innerSequence = try _selector(element)
        return innerSequence
    }
}
MapSwitchSink 继承于 SwitchSink，与 SwitchIdentitySink 唯一不一样的地方在于 performMap。所以 FlatMapLatest 其实和 SwitchLatest 是一样的机制，永远只订阅最新的 Observable。
网友评论