combineLatest的使用
// Demo: combine a String subject and an Int subject into a single sequence.
// `disposeBag` is expected to be declared by the surrounding code.
let stringSub = PublishSubject<String>()
let intSub = PublishSubject<Int>()

Observable
    .combineLatest(stringSub, intSub) { strElement, intElement in
        return "\(strElement) \(intElement)"
    }
    .subscribe(onNext: { combined in
        print(combined)
    })
    .disposed(by: disposeBag)

// A combined element is only produced once both subjects have emitted.
stringSub.onNext("J")        // no output yet: intSub has not emitted
intSub.onNext(1)             // prints "J 1"
intSub.onNext(2)             // prints "J 2"
stringSub.onNext("E")        // prints "E 2"
stringSub.onNext("JENSEN")   // prints "JENSEN 2"
combineLatest序列创建
/// Builds the two-source combine-latest sequence.
///
/// Both sources are converted with `asObservable()` and wrapped, together
/// with `resultSelector`, in a `CombineLatest2` instance.
///
/// - Parameters:
///   - source1: First source sequence.
///   - source2: Second source sequence.
///   - resultSelector: Closure that maps one element from each source to the
///     combined element; it may throw.
/// - Returns: The `CombineLatest2` sequence, typed as `Observable<Element>`.
public static func combineLatest<O1: ObservableType, O2: ObservableType>
(_ source1: O1, _ source2: O2, resultSelector: @escaping (O1.Element, O2.Element) throws -> Element)
-> Observable<Element> {
return CombineLatest2(
source1: source1.asObservable(), source2: source2.asObservable(),
resultSelector: resultSelector
)
}
combineLatest
实际返回的是CombineLatest2
序列
进入CombineLatest2
序列
/// Stores both source sequences and the result selector; nothing is
/// subscribed here.
///
/// - Parameters:
///   - source1: First source sequence.
///   - source2: Second source sequence.
///   - resultSelector: Closure used to combine the elements of both sources.
init(source1: Observable<E1>, source2: Observable<E2>, resultSelector: @escaping ResultSelector) {
self._source1 = source1
self._source2 = source2
self._resultSelector = resultSelector
}
保存_source1
和_source2
,以及外界的闭包_resultSelector
combineLatest序列订阅
CombineLatest2
被订阅时,将会来到CombineLatest2
下run
/// Creates a `CombineLatestSink2_` for `observer` and starts it.
///
/// - Parameters:
///   - observer: Observer that receives the combined elements.
///   - cancel: Cancelable handed through to the sink.
/// - Returns: The sink together with the disposable returned by `sink.run()`.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Result {
let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
1.CombineLatestSink2_
保存_parent(CombineLatest2)
,观察者observer
/// Keeps a reference to the parent operator and forwards the observer and
/// cancelable to the superclass with an arity of 2 (two sources).
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self._parent = parent
super.init(arity: 2, observer: observer, cancel: cancel)
}
2.调用sink.run()
/// Subscribes to both parent sources through per-source
/// `CombineLatestObserver`s.
///
/// - Returns: A disposable grouping the two source subscriptions.
func run() -> Disposable {
// Created before subscribing so each observer can be handed its own
// subscription handle (`this:`) up front.
let subscription1 = SingleAssignmentDisposable()
let subscription2 = SingleAssignmentDisposable()
// Both observers share this sink's lock and report back to this sink with
// their source index; `setLatestValue` stores the newest element of the
// corresponding source.
let observer1 = CombineLatestObserver(lock: self._lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: self._lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
// Subscribe to source 1 first, then source 2.
subscription1.setDisposable(self._parent._source1.subscribe(observer1))
subscription2.setDisposable(self._parent._source2.subscribe(observer2))
return Disposables.create([
subscription1,
subscription2
])
}
(1).创建类型CombineLatestObserver
观察者observer1
,observer2
.保存参数_lock
,_parent
,_index
,_this
,_setLatestValue
_parent
是CombineLatestSink2_
,_index
是观察者观察的序列的索引值,_setLatestValue
是闭包{ (e: E1) -> Void in self._latestElement1 = e }(observer2对应的是{ (e: E2) -> Void in self._latestElement2 = e })
,存储该序列最后一次发送响应的值到对应的属性。
(2).self._parent._source1.subscribe(observer1)
,self._parent._source2.subscribe(observer2)
,分别对_source1
和_source2
进行订阅。
对源序列_source1
,_source2
发送响应
当对_source1
发送响应,调用到CombineLatestObserver.on
/// Entry point for events from the observed source; delegates to
/// `synchronizedOn(_:)`.
func on(_ event: Event<Element>) {
self.synchronizedOn(event)
}
调用SynchronizedOnType.synchronizedOn
extension SynchronizedOnType {
/// Calls `_synchronized_on(_:)` between `lock()` and `unlock()`.
func synchronizedOn(_ event: Event<Element>) {
// `defer` runs the unlock when this scope exits.
self.lock(); defer { self.unlock() }
self._synchronized_on(event)
}
}
调用CombineLatestObserver._synchronized_on
/// Handles one event from the observed source and reports it to the parent
/// sink.
func _synchronized_on(_ event: Event<Element>) {
switch event {
case .next(let value):
// Store the newest element first, then tell the parent which source
// (by index) produced a value.
self._setLatestValue(value)
self._parent.next(self._index)
case .error(let error):
// Dispose this source's subscription and pass the error to the parent.
self._this.dispose()
self._parent.fail(error)
case .completed:
// Dispose this source's subscription and mark this index as done on the
// parent.
self._this.dispose()
self._parent.done(self._index)
}
}
_synchronized_on
首先调用_setLatestValue
保存序列最后一次响应的值
调用CombineLatestSink.next(self._index)
/// Called after the source at `index` has stored a new latest element.
///
/// Once every source has produced at least one element, the combined result
/// is computed via `getResult()` and forwarded downstream. Until then, the
/// sink only checks whether all *other* sources are already done.
///
/// - Parameter index: Index of the source that has just emitted.
func next(_ index: Int) {
    // Count each source only once, on its first element.
    if !self._hasValue[index] {
        self._hasValue[index] = true
        self._numberOfValues += 1
    }

    if self._numberOfValues == self._arity {
        // Every source has a latest element: combine and forward.
        do {
            let result = try self.getResult()
            self.forwardOn(.next(result))
        }
        catch let e {
            // The result selector threw: forward the error and tear down.
            self.forwardOn(.error(e))
            self.dispose()
        }
    }
    else {
        // At least one source has not emitted yet. If every other source is
        // already marked done, complete and tear down (presumably because the
        // missing value can then never arrive).
        var allOthersDone = true

        for i in 0 ..< self._arity {
            if i != index && !self._isDone[i] {
                allOthersDone = false
                break
            }
        }

        if allOthersDone {
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}
1.首先判断self._hasValue[index]
判断该序列之前是否响应过,未响应过将标志设置为已响应self._hasValue[index] = true
,并且将已响应序列的数量+1:self._numberOfValues += 1
2.self._numberOfValues == self._arity
判断是否已全部响应
如果已经全部响应:
/// Applies the parent's `_resultSelector` to the latest element of each
/// source.
///
/// - Returns: The combined result.
/// - Throws: Whatever the result selector throws.
override func getResult() throws-> Result {
return try self._parent._resultSelector(self._latestElement1, self._latestElement2)
}
(1)获取调用外界combineLatest
时,保存的_resultSelector
闭包,并调用,将结果保存至result
(2)Sink.forwardOn(.next(result))
-->AnonymousObserver.on(.next(result))
,最后调用到外界的闭包中。
这里用到了一个算法:就是判断所有序列是不是已经全部响应过,这个算法可以学习一下。
网友评论