在Rxswift官方demo,class GithubSignupViewModel1中有一处发送网络请求并追踪请求状态的代码如下
let signingIn = ActivityIndicator()
self.signingIn = signingIn.asObservable()
let usernameAndPassword = Observable.combineLatest(input.username, input.password) { (username: $0, password: $1) }
signedIn = input.loginTaps.withLatestFrom(usernameAndPassword)
.flatMapLatest { pair in
return API.signup(pair.username, password: pair.password)
.observe(on:MainScheduler.instance)
.catchAndReturn(false)
// 1. 调用追踪,追踪API.signup,并把追踪信号放到signingIn
.trackActivity(signingIn)
}
.flatMapLatest { loggedIn -> Observable<Bool> in
let message = loggedIn ? "Mock: Signed in to GitHub." : "Mock: Sign in to GitHub failed"
return wireframe.promptFor(message, cancelAction: "OK", actions: [])
// propagate original value
.map { _ in
loggedIn
}
}
.share(replay: 1)
ActivityIndicator源码如下
import RxSwift
import RxCocoa
private struct ActivityToken<E> : ObservableConvertibleType, Disposable {
private let _source: Observable<E>
private let _dispose: Cancelable
init(source: Observable<E>, disposeAction: @escaping () -> Void) {
// 4. 设置source,设置Disposables,在资源回收的时候调用disposeAction
// 也即上一步传入的self.decrement
_source = source
_dispose = Disposables.create(with: disposeAction)
}
func dispose() {
_dispose.dispose()
}
func asObservable() -> Observable<E> {
// 12.ActivityToken<Bool>返回的也即是传入的source本身,也即
// Observable<Bool>,也即外面传入的API.Signup
_source
}
}
/**
Enables monitoring of sequence computation.
If there is at least one sequence computation in progress, `true` will be sent.
When all activities complete `false` will be sent.
*/
public class ActivityIndicator : SharedSequenceConvertibleType {
public typealias Element = Bool
public typealias SharingStrategy = DriverSharingStrategy
private let _lock = NSRecursiveLock()
private let _relay = BehaviorRelay(value: 0)
private let _loading: SharedSequence<SharingStrategy, Bool>
public init() {
// 7.relay.value+1 经过map转换为Bool类型,使_loading的值变为true
_loading = _relay.asDriver()
.map { $0 > 0 }
.distinctUntilChanged()
}
fileprivate func trackActivityOfObservable<Source: ObservableConvertibleType>(_ source: Source) -> Observable<Source.Element> {
// 3. 调用using operator,创建一个ActivityToken Source,并把API.Signup
和 self.decrement传入作为初始化参数
return Observable.using({ () -> ActivityToken<Source.Element> in
// 5. 构建Token Source的工厂函数,内部调用self.increment()
self.increment()
return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
}) { t in
// 10. 通过5创建的Token Source创建一个新的observerable
// 这里t的类型是ActivityToken<Bool>
return t.asObservable()
}
}
private func increment() {
// 6. increment让_relay接收元素_relay.value+1
_lock.lock()
_relay.accept(_relay.value + 1)
_lock.unlock()
}
private func decrement() {
_lock.lock()
_relay.accept(_relay.value - 1)
_lock.unlock()
}
public func asSharedSequence() -> SharedSequence<SharingStrategy, Element> {
// 9.外界subscribe ActivityIndicator,实际是在subscribe _loading,也就是外
// 面的signingIn
_loading
}
}
extension ObservableConvertibleType {
public func trackActivity(_ activityIndicator: ActivityIndicator) -> Observable<Element> {
// 2. indicator追踪传入的信号API.signup
activityIndicator.trackActivityOfObservable(self)
// 13. return的是ActivityIndicator调用trackActivityOfObservable函数返回的
// Observable<Bool>类型,也即通过Observable.using创建出来的
}
}
USing
extension ObservableType {
/**
Constructs an observable sequence that depends on a resource object, whose lifetime is tied to the resulting observable sequence's lifetime.
- seealso: [using operator on reactivex.io](http://reactivex.io/documentation/operators/using.html)
- parameter resourceFactory: Factory function to obtain a resource object.
- parameter observableFactory: Factory function to obtain an observable sequence that depends on the obtained resource.
- returns: An observable sequence whose lifetime controls the lifetime of the dependent resource object.
*/
public static func using<Resource: Disposable>(_ resourceFactory: @escaping () throws -> Resource, observableFactory: @escaping (Resource) throws -> Observable<Element>) -> Observable<Element> {
Using(resourceFactory: resourceFactory, observableFactory: observableFactory)
}
}
using操作符按照Reactivex的解释如下
它首先通过第一个λ参数resourceFactory创建了一个resource,然后又把resource传给第二个λ参数observableFactory创建了一个Observable<Element>,最后再调用Using构造函数构造了一个新的Observable<Element>,这个Observable牢牢控制着resource的生命期,
一旦被返回的Observable发出complete或者error事件,resource的生命期也会终止,换言之,resource会调用自己的dispose()事件。
Using继承自Producer<SourceType>
class Producer<Element>: Observable<Element> {
override init() {
super.init()
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
rxAbstractMethod()
}
}
Producer在subscribe的时候,会根据当先调度器创建disposer,并调用子类的
run函数,回到子类的run函数
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
// 构建sink
let sink = UsingSink(parent: self, observer: observer, cancel: cancel)
// 调用sink.run
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
也即是
final private class UsingSink<ResourceType: Disposable, Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias SourceType = Observer.Element
typealias Parent = Using<SourceType, ResourceType>
private let parent: Parent
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self.parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
var disposable = Disposables.create()
do {
// 1.拿到resourceFactory创建resource,也即ActivityToken<E>
let resource = try self.parent.resourceFactory()
disposable = resource
// 2. 传入token,拿到新的source observable,也即
// func asObservable() -> Observable<E> {
// // 最开始的API.signup
// _source
// }
let source = try self.parent.observableFactory(resource)
return Disposables.create(
// source的订阅换成了前面返回的Using
source.subscribe(self),
disposable
)
} catch let error {
return Disposables.create(
Observable.error(error).subscribe(self),
disposable
)
}
}
func on(_ event: Event<SourceType>) {
// 把sourceObservable发送的元素转发一遍
switch event {
case let .next(value):
self.forwardOn(.next(value))
case let .error(error):
self.forwardOn(.error(error))
self.dispose()
case .completed:
self.forwardOn(.completed)
self.dispose()
}
}
}
至此,我们明白了,observabel.using返回的Observabel<Bool>和API.signup发射的元素一致。
而API.signup在发出complete事件后(注:事件流见附日志),即代表这个observable序列终止,经由它创建的ActivityToken生命期也随之终止,调用了自身的dispose()函数,也顺带调用前一步传入的self.decrement,也即
private func decrement() {
_lock.lock()
_relay.accept(_relay.value - 1)
_lock.unlock()
}
而此时_relay.value变成了0,_loading也变成了false
_loading = _relay.asDriver()
.map { $0 > 0 }
.distinctUntilChanged()
而_loading作为signingIn的asSharedSequence(),也即是自身的observable
详见
import RxSwift
/**
Trait that represents observable sequence that shares computation resources with following properties:
- it never fails
- it delivers events on `SharingStrategy.scheduler`
- sharing strategy is customizable using `SharingStrategy.share` behavior
`SharedSequence<Element>` can be considered a builder pattern for observable sequences that share computation resources.
To find out more about units and how to use them, please visit `Documentation/Traits.md`.
*/
public struct SharedSequence<SharingStrategy: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType, ObservableConvertibleType {
let source: Observable<Element>
init(_ source: Observable<Element>) {
self.source = SharingStrategy.share(source)
}
init(raw: Observable<Element>) {
self.source = raw
}
#if EXPANDABLE_SHARED_SEQUENCE
/**
This method is extension hook in case this unit needs to extended from outside the library.
By defining `EXPANDABLE_SHARED_SEQUENCE` one agrees that it's up to them to ensure shared sequence
properties are preserved after extension.
*/
public static func createUnsafe<Source: ObservableType>(source: Source) -> SharedSequence<SharingStrategy, Source.Element> {
SharedSequence<SharingStrategy, Source.Element>(raw: source.asObservable())
}
#endif
/**
- returns: Built observable sequence.
*/
public func asObservable() -> Observable<Element> {
self.source
}
/**
- returns: `self`
*/
public func asSharedSequence() -> SharedSequence<SharingStrategy, Element> {
self
}
}
完成了_loading对当前追踪的计算序列的追踪。
值得肯定的是官方对ActivityIndicator的设计并没有简单的设计为追踪一个流的computation,而是通过map ($0 > 0)这个写法,让它有了追踪多个Observable序列的能力,每追踪多一个则_relay.accept(_relay.value + 1)
,一个序列终止则_relay.accept(_relay.value - 1)
,直到减为0,_loading才会由true变为false。
非常类似自动引用计数(Automatic Reference Counting)的思想,值得我们借鉴和反复琢磨。
附:
// 追踪Observabel debug日志
2022-03-27 11:06:07.371: signup -> subscribed
2022-03-27 11:06:08.386: signup -> Event next(true)
2022-03-27 11:06:08.425: signup -> Event completed
2022-03-27 11:06:08.425: signup -> isDisposed
网友评论