今天开始一起分析一些常用的高阶函数,今天先分析combineLatest。有时候我们需要同时监听多个响应,或者合并多个响应时,就可以用combineLatest。
我们先看下面的例子,在登录界面,只有同时输入了账号和密码才能点击登录:
let userNameVaild = userNameTf.rx.text.orEmpty
.map { (text) -> Bool in
return text.count > 0
}
let passwordVaild = passwordTf.rx.text.orEmpty
.map { (text) -> Bool in
return text.count > 0
}
Observable.combineLatest(userNameVaild,passwordVaild) { $0 && $1 }
.bind(to: loginBtn.rx.isEnabled)
.disposed(by: disposeBag)
没有密码不能点击
没有账号不能点击
可以点击
- 前面只是创建序列,我们直接来看
combineLatest
,返回CombineLatest2
序列(CombineLatest2
继承了Producer
):
extension ObservableType {
public static func combineLatest<O1: ObservableType, O2: ObservableType>
(_ source1: O1, _ source2: O2, resultSelector: @escaping (O1.E, O2.E) throws -> E)
-> Observable<E> {
return CombineLatest2(
source1: source1.asObservable(), source2: source2.asObservable(),
resultSelector: resultSelector
)
}
}
final class CombineLatest2<E1, E2, R> : Producer<R> {
...
init(source1: Observable<E1>, source2: Observable<E2>, resultSelector: @escaping ResultSelector) {
self._source1 = source1 //保存两个源序列
self._source2 = source2
self._resultSelector = resultSelector
}
override func run<O: ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == R {
let sink = CombineLatestSink2_(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
- 根据之前说的RxSwift核心逻辑和bind函数,
CombineLatest2
序列订阅后便来到run
函数。
- 在
CombineLatest2
的run
函数里创建CombineLatestSink2_
(业务下沉,CombineLatestSink2_
继承了CombineLatestSink
),调用了非常熟悉的sink.run()
,我们马上进入CombineLatestSink2_
里看看:
final class CombineLatestSink2_<E1, E2, O: ObserverType> : CombineLatestSink<O> {
...
func run() -> Disposable {
let subscription1 = SingleAssignmentDisposable()
let subscription2 = SingleAssignmentDisposable()
//分别创建观察者
let observer1 = CombineLatestObserver(lock: self._lock, parent: self, index: 0, setLatestValue: { (e: E1) -> Void in self._latestElement1 = e }, this: subscription1)
let observer2 = CombineLatestObserver(lock: self._lock, parent: self, index: 1, setLatestValue: { (e: E2) -> Void in self._latestElement2 = e }, this: subscription2)
//先订阅再setDisposable
subscription1.setDisposable(self._parent._source1.subscribe(observer1))
subscription2.setDisposable(self._parent._source2.subscribe(observer2))
return Disposables.create([
subscription1,
subscription2
])
}
}
-
CombineLatestSink2_
创建了两个CombineLatestObserver
观察者分别用源序列进行订阅,根据RxSwift核心逻辑,最终来到CombineLatestObserver
的on
函数里:
final class CombineLatestObserver<ElementType>
: ObserverType
, LockOwnerType
, SynchronizedOnType {
...
init(lock: RecursiveLock, parent: CombineLatestProtocol, index: Int, setLatestValue: @escaping ValueSetter, this: Disposable) {
self._lock = lock
self._parent = parent //保存CombineLatestSink2_
self._index = index
self._this = this
self._setLatestValue = setLatestValue
}
func on(_ event: Event<Element>) {
self.synchronizedOn(event)
}
func _synchronized_on(_ event: Event<Element>) {
switch event {
case .next(let value):
self._setLatestValue(value)
self._parent.next(self._index)
case .error(let error):
self._this.dispose()
self._parent.fail(error)
case .completed:
self._this.dispose()
self._parent.done(self._index)
}
}
}
extension SynchronizedOnType {
func synchronizedOn(_ event: Event<E>) {
self.lock(); defer { self.unlock() }
self._synchronized_on(event)
}
}
CombineLatestObserver.on
调用后,最终调用了self._parent.next(self._index)
,self._parent
就是初始化保存下来的CombineLatestSink2_
。
-
CombineLatestSink2_
没有实现具体的方法,我们看父类CombineLatestSink
:
class CombineLatestSink<O: ObserverType>
: Sink<O>
, CombineLatestProtocol {
...
func next(_ index: Int) {
if !self._hasValue[index] {
self._hasValue[index] = true
self._numberOfValues += 1 //标记
}
//判断是否能够响应
if self._numberOfValues == self._arity {
do {
let result = try self.getResult()
self.forwardOn(.next(result))
}
catch let e {
self.forwardOn(.error(e))
self.dispose()
}
}
else {
var allOthersDone = true
for i in 0 ..< self._arity {
if i != index && !self._isDone[i] {
allOthersDone = false
break
}
}
if allOthersDone {
self.forwardOn(.completed)
self.dispose()
}
}
}
- 我们看到了
self.forwardOn(.next())
发送响应,非常熟悉,但是在这之前还有一些步骤,就是标记和判断,还有获取结果:
if !self._hasValue[index] {
self._hasValue[index] = true
self._numberOfValues += 1 //标记
}
//判断是否能够响应
if self._numberOfValues == self._arity {
...
}
else {
...
}
首先将序列进行标记self._numberOfValues += 1
,当两个序列或n个序列都进行了订阅时,self._numberOfValues
等于self._arity
,便可执行下一步self.getResult()
获取结果。
-
self.getResult()
里面调用了self._parent._resultSelector
:
final class CombineLatestSink2_<E1, E2, O: ObserverType> : CombineLatestSink<O> {
...
override func getResult() throws -> R {
return try self._parent._resultSelector(self._latestElement1, self._latestElement2)
}
}
self._parent._resultSelector
是一开始就保存的外面的闭包,然后返回一个值:
Observable.combineLatest(userNameVaild,passwordVaild) { $0 && $1 }
- 最后才调用了
self.forwardOn(.next(result))
发送响应,实现通过两个序列来控制loginBtn是否能够点击。
网友评论