美文网首页RxSwift
关于RxSwift demo中的ActivityIndicator

关于RxSwift demo中的ActivityIndicator

作者: 令狐冲_ | 来源:发表于2022-03-27 11:10 被阅读0次

    在RxSwift官方demo的 class GithubSignupViewModel1 中,有一处发送网络请求并追踪请求状态的代码,如下:

            // Activity tracker for the sign-up request; its Bool state is exposed as `self.signingIn`.
            let signingIn = ActivityIndicator()
            self.signingIn = signingIn.asObservable()
    
            // Combine the latest username and password into one labeled tuple.
            let usernameAndPassword = Observable.combineLatest(input.username, input.password) { (username: $0, password: $1) }
    
            // On every login tap, take the latest credentials and start a sign-up request.
            signedIn = input.loginTaps.withLatestFrom(usernameAndPassword)
                .flatMapLatest { pair in
                    return API.signup(pair.username, password: pair.password)
                        .observe(on:MainScheduler.instance)
                        // A failed request is reported as `false` instead of an error.
                        .catchAndReturn(false)
                        // 1. Start tracking: track API.signup and feed the tracking signal into `signingIn`.
                        .trackActivity(signingIn)
                }
                .flatMapLatest { loggedIn -> Observable<Bool> in
                    let message = loggedIn ? "Mock: Signed in to GitHub." : "Mock: Sign in to GitHub failed"
                    return wireframe.promptFor(message, cancelAction: "OK", actions: [])
                        // propagate original value
                        .map { _ in
                            loggedIn
                        }
                }
                .share(replay: 1)
    

    ActivityIndicator源码如下

    import RxSwift
    import RxCocoa
    
    /// Disposable resource that pairs a tracked sequence with a clean-up action.
    ///
    /// `asObservable()` hands back the wrapped sequence untouched, and `dispose()`
    /// runs the action that was supplied at initialization.
    private struct ActivityToken<E> : ObservableConvertibleType, Disposable {
        private let _source: Observable<E>
        private let _dispose: Cancelable

        /// 4. Keeps the tracked sequence and wraps `disposeAction` in a disposable,
        /// so the action (the `self.decrement` passed in by the previous step)
        /// runs when the token is disposed.
        init(source: Observable<E>, disposeAction: @escaping () -> Void) {
            self._dispose = Disposables.create(with: disposeAction)
            self._source = source
        }

        /// Runs the stored dispose action.
        func dispose() {
            self._dispose.dispose()
        }

        /// 12. Returns the very sequence handed to `init` — in the sign-up demo
        /// that is the `Observable<Bool>` produced by `API.signup`.
        func asObservable() -> Observable<E> {
            return self._source
        }
    }
    
    /**
    Enables monitoring of sequence computation.

    If there is at least one sequence computation in progress, `true` will be sent.
    When all activities complete `false` will be sent.

    A counter of tracked sequences is kept in `_relay`; the exposed shared sequence
    is that counter mapped to `count > 0`, with consecutive duplicates removed.
    */
    public class ActivityIndicator : SharedSequenceConvertibleType {
        public typealias Element = Bool
        public typealias SharingStrategy = DriverSharingStrategy

        /// Guards the read-modify-write of `_relay` in `increment()` / `decrement()`.
        private let _lock = NSRecursiveLock()
        /// Number of sequences currently being tracked.
        private let _relay = BehaviorRelay(value: 0)
        /// `true` while `_relay` holds a value greater than zero.
        private let _loading: SharedSequence<SharingStrategy, Bool>

        public init() {
            // 7. Once `increment()` raises the relay's value above zero, `map` converts it
            //    to `true`, so `_loading` emits `true`.
            _loading = _relay.asDriver()
                .map { $0 > 0 }
                .distinctUntilChanged()
        }

        /// Wraps `source` so that the activity counter is incremented when the resource
        /// factory runs and decremented when the resulting token is disposed.
        ///
        /// - parameter source: Sequence to track.
        /// - returns: An observable created by `Observable.using` from the token's sequence.
        fileprivate func trackActivityOfObservable<Source: ObservableConvertibleType>(_ source: Source) -> Observable<Source.Element> {
            // 3. Call the `using` operator: create an ActivityToken resource, passing
            //    API.signup and `self.decrement` as its initialization arguments.
            return Observable.using({ () -> ActivityToken<Source.Element> in
                // 5. Resource factory for the token; it calls `self.increment()` first.
                self.increment()
                return ActivityToken(source: source.asObservable(), disposeAction: self.decrement)
            }) { t in
                // 10. Build the resulting observable from the token created in step 5.
                //     In the sign-up demo `t` is an `ActivityToken<Bool>`.
                return t.asObservable()
            }
        }

        /// 6. Adds one to the counter held by `_relay`.
        private func increment() {
            _lock.lock()
            // `defer` keeps lock and unlock visibly paired on every exit path.
            defer { _lock.unlock() }
            _relay.accept(_relay.value + 1)
        }

        /// Subtracts one from the counter held by `_relay`.
        private func decrement() {
            _lock.lock()
            defer { _lock.unlock() }
            _relay.accept(_relay.value - 1)
        }

        /// 9. Subscribing to the ActivityIndicator from outside is really subscribing to
        /// `_loading`, i.e. the `signingIn` used by the view model.
        public func asSharedSequence() -> SharedSequence<SharingStrategy, Element> {
            _loading
        }
    }
    
    extension ObservableConvertibleType {
        /// 2. Lets `activityIndicator` track this sequence (API.signup in the demo).
        ///
        /// - parameter activityIndicator: Indicator that should reflect this sequence's activity.
        /// - returns: The observable produced by the indicator's `trackActivityOfObservable`.
        public func trackActivity(_ activityIndicator: ActivityIndicator) -> Observable<Element> {
            // 13. What gets returned is the `Observable<Element>` (`Observable<Bool>` in the
            //     sign-up demo) that `trackActivityOfObservable` builds via `Observable.using`.
            let tracked = activityIndicator.trackActivityOfObservable(self)
            return tracked
        }
    }
    
    

    Using

    extension ObservableType {
        /**
         Builds an observable sequence around a resource object; the resource lives exactly as long as the resulting sequence does.

         - seealso: [using operator on reactivex.io](http://reactivex.io/documentation/operators/using.html)

         - parameter resourceFactory: Closure that produces the resource object.
         - parameter observableFactory: Closure that produces the observable sequence depending on that resource.
         - returns: An observable sequence whose lifetime governs the lifetime of the dependent resource object.
         */
        public static func using<Resource: Disposable>(_ resourceFactory: @escaping () throws -> Resource, observableFactory: @escaping (Resource) throws -> Observable<Element>) -> Observable<Element> {
            return Using(
                resourceFactory: resourceFactory,
                observableFactory: observableFactory
            )
        }
    }
    

    using操作符按照Reactivex的解释如下

    image.png
    它首先通过第一个λ参数resourceFactory创建一个resource,然后把resource传给第二个λ参数observableFactory创建一个Observable<Element>,最后调用Using构造函数构造出一个新的Observable<Element>,这个Observable牢牢控制着resource的生命期:
    一旦被返回的Observable发出completed或者error事件,或者订阅被提前dispose,resource的生命期也随之终止,换言之,resource的dispose()方法会被调用。
    Using继承自Producer<SourceType>
    
    /// Observable base class whose `subscribe` delegates the actual wiring to `run`,
    /// which subclasses are expected to override.
    class Producer<Element>: Observable<Element> {
        override init() {
            super.init()
        }

        /// Subscribes `observer`, either immediately or through `CurrentThreadScheduler`
        /// when `CurrentThreadScheduler.isScheduleRequired` is true.
        override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            guard CurrentThreadScheduler.isScheduleRequired else {
                // No scheduling required: wire up the sink right here.
                // The returned disposable needs to release all references once it was disposed.
                let sinkDisposer = SinkDisposer()
                let (sink, subscription) = self.run(observer, cancel: sinkDisposer)
                sinkDisposer.setSinkAndSubscription(sink: sink, subscription: subscription)

                return sinkDisposer
            }

            // Scheduling required: perform the same wiring inside the scheduled action.
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let sinkDisposer = SinkDisposer()
                let (sink, subscription) = self.run(observer, cancel: sinkDisposer)
                sinkDisposer.setSinkAndSubscription(sink: sink, subscription: subscription)

                return sinkDisposer
            }
        }

        /// Abstract hook: the base implementation only calls `rxAbstractMethod()`.
        func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
            rxAbstractMethod()
        }
    }
    

    Producer在subscribe的时候,会根据当前线程调度器(CurrentThreadScheduler)是否需要调度来创建disposer,并调用子类的run函数。
    回到子类Using的run函数:

        /// Creates a `UsingSink` for `observer`, runs it, and returns the sink together
        /// with the subscription that running it produced.
        override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
            // Build the sink first; `sink.run()` is evaluated while the result tuple is formed.
            let usingSink = UsingSink(parent: self, observer: observer, cancel: cancel)
            return (sink: usingSink, subscription: usingSink.run())
        }
    

    也即是

    /// Sink for the `Using` operator: it creates the resource, subscribes itself to the
    /// observable built from that resource, and forwards every event it receives.
    final private class UsingSink<ResourceType: Disposable, Observer: ObserverType>: Sink<Observer>, ObserverType {
        typealias SourceType = Observer.Element 
        typealias Parent = Using<SourceType, ResourceType>
    
        /// The `Using` instance that owns the two factory closures.
        private let parent: Parent
        
        init(parent: Parent, observer: Observer, cancel: Cancelable) {
            self.parent = parent
            super.init(observer: observer, cancel: cancel)
        }
        
        /// Creates the resource and the source observable, then subscribes to the source.
        ///
        /// - returns: A disposable combining the source subscription with `disposable`,
        ///   which is the resource once it has been created.
        func run() -> Disposable {
            // Placeholder until the resource exists; also what gets returned if the resource factory throws.
            var disposable = Disposables.create()
            
            do {
                // 1. Call resourceFactory to create the resource, i.e. the ActivityToken<E>.
                let resource = try self.parent.resourceFactory()
                // Assigned before the next `try`, so the catch branch below also returns the resource.
                disposable = resource
                // 2. Pass the token in to obtain the source observable, i.e. the result of
                //     func asObservable() -> Observable<E> {
                //        //  the original API.signup
                //        _source
                //      }
                let source = try self.parent.observableFactory(resource)
                
                return Disposables.create(
                    // The source is subscribed with this sink as its observer.
                    source.subscribe(self),
                    disposable
                )
            } catch let error {
                // A factory threw: deliver the error to this sink and still return `disposable`.
                return Disposables.create(
                    Observable.error(error).subscribe(self),
                    disposable
                )
            }
        }
        
        /// Forwards each event; `error` and `completed` are followed by `self.dispose()`.
        func on(_ event: Event<SourceType>) {
            // Re-emit whatever the source observable sends.
            switch event {
            case let .next(value):
                self.forwardOn(.next(value))
            case let .error(error):
                self.forwardOn(.error(error))
                self.dispose()
            case .completed:
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
    

    至此,我们明白了,Observable.using返回的Observable<Bool>和API.signup发射的元素一致。
    而API.signup在发出completed事件后(注:事件流见附日志),即代表这个observable序列终止,经由它创建的ActivityToken生命期也随之终止,调用了自身的dispose()函数,也顺带调用前一步传入的self.decrement,也即

        /// Lowers the activity counter held by `_relay` by one while holding `_lock`.
        private func decrement() {
            _lock.lock()
            defer { _lock.unlock() }

            let remaining = _relay.value - 1
            _relay.accept(remaining)
        }
    

    而此时_relay.value变成了0,_loading也变成了false

            // Turn the counter into a Bool ("is anything being tracked?") and drop
            // consecutive duplicate values.
            let activityCount = _relay.asDriver()
            let isActive = activityCount.map { $0 > 0 }
            _loading = isActive.distinctUntilChanged()
    

    而_loading正是signingIn(ActivityIndicator)的asSharedSequence()返回值,也即它对外暴露的observable序列本身,
    详见SharedSequence的定义:

    import RxSwift
    
    /**
        Trait describing an observable sequence that shares computation resources and has these properties:

        - it never fails
        - it delivers events on `SharingStrategy.scheduler`
        - its sharing behavior is customizable through `SharingStrategy.share`

        `SharedSequence<Element>` can be thought of as a builder pattern for observable sequences that share computation resources.

        To find out more about units and how to use them, please visit `Documentation/Traits.md`.
    */
    public struct SharedSequence<SharingStrategy: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType, ObservableConvertibleType {
        /// The underlying observable sequence.
        let source: Observable<Element>

        /// Wraps `source` after passing it through `SharingStrategy.share`.
        init(_ source: Observable<Element>) {
            let shared = SharingStrategy.share(source)
            self.source = shared
        }

        /// Wraps `raw` as-is, without applying `SharingStrategy.share`.
        init(raw: Observable<Element>) {
            self.source = raw
        }

        #if EXPANDABLE_SHARED_SEQUENCE
        /**
         Extension hook for cases where this unit has to be extended from outside the library.

         By defining `EXPANDABLE_SHARED_SEQUENCE` one agrees that it's up to them to ensure shared sequence
         properties are preserved after extension.
        */
        public static func createUnsafe<Source: ObservableType>(source: Source) -> SharedSequence<SharingStrategy, Source.Element> {
            let underlying = source.asObservable()
            return SharedSequence<SharingStrategy, Source.Element>(raw: underlying)
        }
        #endif

        /**
        - returns: The built observable sequence.
        */
        public func asObservable() -> Observable<Element> {
            return self.source
        }

        /**
        - returns: `self`
        */
        public func asSharedSequence() -> SharedSequence<SharingStrategy, Element> {
            return self
        }
    }
    
    

    完成了_loading对当前追踪的计算序列的追踪。
    值得肯定的是官方对ActivityIndicator的设计并没有简单地设计为只追踪一个流的computation,而是通过 map { $0 > 0 } 这个写法,让它有了追踪多个Observable序列的能力:每多追踪一个序列则_relay.accept(_relay.value + 1),每有一个序列终止则_relay.accept(_relay.value - 1),直到减为0,_loading才会由true变为false。
    非常类似自动引用计数(Automatic Reference Counting)的思想,值得我们借鉴和反复琢磨。

    附:

    // 追踪Observable的debug日志
    2022-03-27 11:06:07.371: signup -> subscribed
    2022-03-27 11:06:08.386: signup -> Event next(true)
    2022-03-27 11:06:08.425: signup -> Event completed
    2022-03-27 11:06:08.425: signup -> isDisposed
    

    本文参考资料:

    operators-using
    ActivityIndicator

    相关文章

      网友评论

        本文标题:关于RxSwift demo中的ActivityIndicator

        本文链接:https://www.haomeiwen.com/subject/jpysjrtx.html